001/* 002 * (C) Copyright 2012-2018 Nuxeo (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * Antoine Taillefer <[email protected]> 018 */ 019package org.nuxeo.drive.service.impl; 020 021import java.util.ArrayList; 022import java.util.Date; 023import java.util.HashMap; 024import java.util.List; 025import java.util.Map; 026import java.util.Set; 027 028import org.apache.logging.log4j.LogManager; 029import org.apache.logging.log4j.Logger; 030import org.nuxeo.drive.adapter.FileSystemItem; 031import org.nuxeo.drive.adapter.RootlessItemException; 032import org.nuxeo.drive.adapter.impl.AbstractFileSystemItem; 033import org.nuxeo.drive.service.FileSystemChangeFinder; 034import org.nuxeo.drive.service.FileSystemItemAdapterService; 035import org.nuxeo.drive.service.FileSystemItemChange; 036import org.nuxeo.drive.service.NuxeoDriveEvents; 037import org.nuxeo.drive.service.NuxeoDriveManager; 038import org.nuxeo.drive.service.SynchronizationRoots; 039import org.nuxeo.drive.service.TooManyChangesException; 040import org.nuxeo.ecm.core.api.CoreSession; 041import org.nuxeo.ecm.core.api.DocumentModel; 042import org.nuxeo.ecm.core.api.DocumentRef; 043import org.nuxeo.ecm.core.api.IdRef; 044import org.nuxeo.ecm.core.storage.sql.RepositoryDescriptor; 045import org.nuxeo.ecm.core.storage.sql.coremodel.SQLRepositoryService; 046import org.nuxeo.ecm.platform.audit.api.AuditReader; 047import org.nuxeo.ecm.platform.audit.api.ExtendedInfo; 048import org.nuxeo.ecm.platform.audit.api.LogEntry; 049import org.nuxeo.runtime.api.Framework; 050 051/** 052 * Implementation of {@link FileSystemChangeFinder} using the {@link AuditReader}. 053 * 054 * @author Antoine Taillefer 055 */ 056public class AuditChangeFinder implements FileSystemChangeFinder { 057 058 private static final Logger log = LogManager.getLogger(AuditChangeFinder.class); 059 060 protected Map<String, String> parameters = new HashMap<>(); 061 062 @Override 063 public void handleParameters(Map<String, String> parameters) { 064 this.parameters.putAll(parameters); 065 } 066 067 @Override 068 public List<FileSystemItemChange> getFileSystemChanges(CoreSession session, Set<IdRef> lastActiveRootRefs, 069 SynchronizationRoots activeRoots, Set<String> collectionSyncRootMemberIds, long lowerBound, long upperBound, 070 int limit) { 071 String principalName = session.getPrincipal().getName(); 072 List<FileSystemItemChange> changes = new ArrayList<>(); 073 074 // Note: lastActiveRootRefs is not used: we could remove it from the 075 // public API 076 // and from the client as well but it might be useful to optimize future 077 // alternative implementations FileSystemChangeFinder component so it 078 // might 079 // be better to leave it part of the public API as currently. 080 081 // Find changes from the log under active roots or events that are 082 // linked to the un-registration or deletion of formerly synchronized 083 // roots 084 List<LogEntry> entries = queryAuditEntries(session, activeRoots, collectionSyncRootMemberIds, lowerBound, 085 upperBound, limit); 086 087 // First pass over the entries to check if a "NuxeoDrive" event has 088 // occurred during that period. 089 // This event can be: 090 // - a root registration 091 // - a root unregistration 092 // - a "deleted" transition / documentTrashed event 093 // - an "undeleted" transition 094 // - a removal 095 // - a move to an non synchronization root 096 // - a security update 097 // Thus the list of active roots may have changed and the cache might 098 // need to be invalidated: let's make sure we perform a 099 // query with the actual active roots. 100 for (LogEntry entry : entries) { 101 if (NuxeoDriveEvents.EVENT_CATEGORY.equals(entry.getCategory())) { 102 log.debug("Detected sync root change for user '{}' in audit log:" 103 + " invalidating the root cache and refetching the changes.", principalName); 104 NuxeoDriveManager driveManager = Framework.getService(NuxeoDriveManager.class); 105 driveManager.invalidateSynchronizationRootsCache(principalName); 106 driveManager.invalidateCollectionSyncRootMemberCache(principalName); 107 Map<String, SynchronizationRoots> synchronizationRoots = driveManager.getSynchronizationRoots( 108 session.getPrincipal()); 109 SynchronizationRoots updatedActiveRoots = synchronizationRoots.get(session.getRepositoryName()); 110 Set<String> updatedCollectionSyncRootMemberIds = driveManager.getCollectionSyncRootMemberIds( 111 session.getPrincipal()).get(session.getRepositoryName()); 112 entries = queryAuditEntries(session, updatedActiveRoots, updatedCollectionSyncRootMemberIds, lowerBound, 113 upperBound, limit); 114 break; 115 } 116 } 117 118 if (entries.size() >= limit) { 119 throw new TooManyChangesException("Too many changes found in the audit logs."); 120 } 121 for (LogEntry entry : entries) { 122 log.debug("Handling log entry {}", entry); 123 FileSystemItemChange change = null; 124 DocumentRef docRef = new IdRef(entry.getDocUUID()); 125 ExtendedInfo fsIdInfo = entry.getExtendedInfos().get("fileSystemItemId"); 126 if (fsIdInfo != null) { 127 // This document has been deleted, moved, is an unregistered synchronization root or its security has 128 // been updated, we just know the FileSystemItem id and name. 129 log.debug("Found extended info in audit log entry: document has been deleted, moved," 130 + " is an unregistered synchronization root or its security has been updated," 131 + " we just know the FileSystemItem id and name."); 132 boolean isChangeSet = false; 133 // First try to adapt the document as a FileSystemItem to provide it to the FileSystemItemChange entry, 134 // only in the case of a move or a security update. 135 // This can succeed if this is a move to a synchronization root or a security update after which the 136 // current user still has access to the document. 137 if (!"deleted".equals(entry.getEventId()) && session.exists(docRef)) { 138 change = getFileSystemItemChange(session, docRef, entry, fsIdInfo.getValue(String.class)); 139 if (change != null) { 140 if (NuxeoDriveEvents.MOVED_EVENT.equals(entry.getEventId())) { 141 // A move to a synchronization root also fires a documentMoved event, don't propagate the 142 // virtual event. 143 log.debug( 144 "Document {} ({}) has been moved to another synchronzation root, not adding entry to the change summary.", 145 entry::getDocPath, () -> docRef); 146 continue; 147 } 148 isChangeSet = true; 149 } 150 } 151 if (!isChangeSet) { 152 // If the document has been deleted, is a regular unregistered synchronization root, has been moved 153 // to a non synchronization root, if its security has been updated denying access to the current 154 // user, or if it is not adaptable as a FileSystemItem for any other reason only provide the 155 // FileSystemItem id and name to the FileSystemItemChange entry. 156 log.debug( 157 "Document {} ({}) doesn't exist or is not adaptable as a FileSystemItem, only providing the FileSystemItem id and name to the FileSystemItemChange entry.", 158 entry::getDocPath, () -> docRef); 159 String fsId = fsIdInfo.getValue(String.class); 160 String eventId; 161 if (NuxeoDriveEvents.MOVED_EVENT.equals(entry.getEventId())) { 162 // Move to a non synchronization root 163 eventId = NuxeoDriveEvents.DELETED_EVENT; 164 } else { 165 // Deletion, unregistration or security update 166 eventId = entry.getEventId(); 167 } 168 change = new FileSystemItemChangeImpl(eventId, entry.getEventDate().getTime(), 169 entry.getRepositoryId(), entry.getDocUUID(), fsId, null); 170 } 171 log.debug("Adding FileSystemItemChange entry to the change summary: {}", change); 172 changes.add(change); 173 } else { 174 // No extended info in the audit log entry, this should not be a deleted document, a moved document, an 175 // unregistered synchronization root nor a security update denying access to the current user. 176 log.debug( 177 "No extended info found in audit log entry {} ({}): this is not a deleted document, a moved document," 178 + " an unregistered synchronization root nor a security update denying access to the current user.", 179 entry::getDocPath, () -> docRef); 180 if (!session.exists(docRef)) { 181 log.debug("Document {} ({}) doesn't exist, not adding entry to the change summary.", 182 entry::getDocPath, () -> docRef); 183 // Deleted or non accessible documents are mapped to 184 // deleted file system items in a separate event: no need to 185 // try to propagate this event. 186 continue; 187 } 188 // Let's try to adapt the document as a FileSystemItem to 189 // provide it to the FileSystemItemChange entry. 190 change = getFileSystemItemChange(session, docRef, entry, null); 191 if (change == null) { 192 // Non-adaptable documents are ignored 193 log.debug( 194 "Document {} ({}) is not adaptable as a FileSystemItem, not adding any entry to the change summary.", 195 entry::getDocPath, () -> docRef); 196 } else { 197 log.debug("Adding FileSystemItemChange entry to the change summary: {}", change); 198 changes.add(change); 199 } 200 } 201 } 202 return changes; 203 } 204 205 /** 206 * Returns the last available log id in the audit log table (primary key) to be used as the upper bound of the event 207 * log id range clause in the change query. 208 */ 209 @Override 210 @SuppressWarnings("unchecked") 211 public long getUpperBound() { 212 AuditReader auditService = Framework.getService(AuditReader.class); 213 String auditQuery = "from LogEntry log order by log.id desc"; 214 log.debug("Querying audit log for greatest id: {}", auditQuery); 215 216 List<LogEntry> entries = (List<LogEntry>) auditService.nativeQuery(auditQuery, 1, 1); 217 if (entries.isEmpty()) { 218 log.debug("Found no audit log entries, returning -1"); 219 return -1; 220 } 221 return entries.get(0).getId(); 222 } 223 224 /** 225 * Returns the last available log id in the audit log table (primary key) considering events older than the last 226 * clustering invalidation date if clustering is enabled for at least one of the given repositories. This is to make 227 * sure the {@code DocumentModel} further fetched from the session using the audit entry doc id is fresh. 228 */ 229 @Override 230 @SuppressWarnings("unchecked") 231 public long getUpperBound(Set<String> repositoryNames) { 232 long clusteringDelay = getClusteringDelay(repositoryNames); 233 AuditReader auditService = Framework.getService(AuditReader.class); 234 Map<String, Object> params = new HashMap<>(); 235 StringBuilder auditQuerySb = new StringBuilder("from LogEntry log"); 236 if (clusteringDelay > -1) { 237 // Double the delay in case of overlapping, see https://jira.nuxeo.com/browse/NXP-14826 238 long lastClusteringInvalidationDate = System.currentTimeMillis() - 2 * clusteringDelay; 239 params.put("lastClusteringInvalidationDate", new Date(lastClusteringInvalidationDate)); 240 auditQuerySb.append(" where log.logDate < :lastClusteringInvalidationDate"); 241 } 242 auditQuerySb.append(" order by log.id desc"); 243 String auditQuery = auditQuerySb.toString(); 244 log.debug("Querying audit log for greatest id: {} with params: {}", auditQuery, params); 245 246 List<LogEntry> entries = (List<LogEntry>) auditService.nativeQuery(auditQuery, params, 1, 1); 247 if (entries.isEmpty()) { 248 if (clusteringDelay > -1) { 249 // Check for existing entries without the clustering invalidation date filter to not return -1 in this 250 // case and make sure the lower bound of the next call to NuxeoDriveManager#getChangeSummary will be >= 251 // 0 252 List<LogEntry> allEntries = (List<LogEntry>) auditService.nativeQuery("from LogEntry", 1, 1); 253 if (!allEntries.isEmpty()) { 254 log.debug("Found no audit log entries matching the criterias but some exist, returning 0"); 255 return 0; 256 } 257 } 258 log.debug("Found no audit log entries, returning -1"); 259 return -1; 260 } 261 return entries.get(0).getId(); 262 } 263 264 /** 265 * Returns the longest clustering delay among the given repositories for which clustering is enabled. 266 */ 267 protected long getClusteringDelay(Set<String> repositoryNames) { 268 long clusteringDelay = -1; 269 SQLRepositoryService repositoryService = Framework.getService(SQLRepositoryService.class); 270 for (String repositoryName : repositoryNames) { 271 RepositoryDescriptor repositoryDescriptor = repositoryService.getRepositoryDescriptor(repositoryName); 272 if (repositoryDescriptor == null) { 273 // Not a VCS repository` 274 continue; 275 } 276 if (repositoryDescriptor.getClusteringEnabled()) { 277 clusteringDelay = Math.max(clusteringDelay, repositoryDescriptor.getClusteringDelay()); 278 } 279 } 280 return clusteringDelay; 281 } 282 283 @SuppressWarnings("unchecked") 284 protected List<LogEntry> queryAuditEntries(CoreSession session, SynchronizationRoots activeRoots, 285 Set<String> collectionSyncRootMemberIds, long lowerBound, long upperBound, int limit) { 286 AuditReader auditService = Framework.getService(AuditReader.class); 287 // Set fixed query parameters 288 Map<String, Object> params = new HashMap<>(); 289 params.put("repositoryId", session.getRepositoryName()); 290 291 // Build query and set dynamic parameters 292 StringBuilder auditQuerySb = new StringBuilder("from LogEntry log where "); 293 auditQuerySb.append("log.repositoryId = :repositoryId"); 294 auditQuerySb.append(" and "); 295 auditQuerySb.append("("); 296 if (!activeRoots.getPaths().isEmpty()) { 297 // detect changes under the currently active roots for the 298 // current user 299 auditQuerySb.append("("); 300 auditQuerySb.append("log.category = 'eventDocumentCategory'"); 301 // TODO: don't hardcode event ids (contribute them?) 302 auditQuerySb.append( 303 " and (log.eventId = 'documentCreated' or log.eventId = 'documentModified' or log.eventId = 'documentMoved' or log.eventId = 'documentCreatedByCopy' or log.eventId = 'documentRestored' or log.eventId = 'addedToCollection' or log.eventId = 'documentProxyPublished' or log.eventId = 'documentLocked' or log.eventId = 'documentUnlocked' or log.eventId = 'documentUntrashed')"); 304 auditQuerySb.append(" or "); 305 auditQuerySb.append("log.category = 'eventLifeCycleCategory'"); 306 auditQuerySb.append(" and log.eventId = 'lifecycle_transition_event' and log.docLifeCycle != 'deleted' "); 307 auditQuerySb.append(") and ("); 308 auditQuerySb.append("("); 309 auditQuerySb.append(getCurrentRootFilteringClause(activeRoots.getPaths(), params)); 310 auditQuerySb.append(")"); 311 if (collectionSyncRootMemberIds != null && !collectionSyncRootMemberIds.isEmpty()) { 312 auditQuerySb.append(" or ("); 313 auditQuerySb.append(getCollectionSyncRootFilteringClause(collectionSyncRootMemberIds, params)); 314 auditQuerySb.append(")"); 315 } 316 auditQuerySb.append(") or "); 317 } 318 // Detect any root (un-)registration changes for the roots previously 319 // seen by the current user. 320 // Exclude 'rootUnregistered' since root unregistration is covered by a 321 // "deleted" virtual event. 322 auditQuerySb.append("("); 323 auditQuerySb.append("log.category = '"); 324 auditQuerySb.append(NuxeoDriveEvents.EVENT_CATEGORY); 325 auditQuerySb.append("' and log.eventId != 'rootUnregistered'"); 326 auditQuerySb.append(")"); 327 auditQuerySb.append(") and ("); 328 auditQuerySb.append(getJPARangeClause(lowerBound, upperBound, params)); 329 // we intentionally sort by eventDate even if the range filtering is 330 // done on the log id: eventDate is useful to reflect the ordering of 331 // events occurring inside the same transaction while the 332 // monotonic behavior of log id is useful for ensuring that consecutive 333 // range queries to the audit won't miss any events even when long 334 // running transactions are logged after a delay. 335 auditQuerySb.append(") order by log.repositoryId asc, log.eventDate desc"); 336 String auditQuery = auditQuerySb.toString(); 337 338 log.debug("Querying audit log for changes: {} with params: {}", auditQuery, params); 339 340 List<LogEntry> entries = (List<LogEntry>) auditService.nativeQuery(auditQuery, params, 1, limit); 341 342 // Post filter the output to remove (un)registration that are unrelated 343 // to the current user. 344 List<LogEntry> postFilteredEntries = new ArrayList<>(); 345 String principalName = session.getPrincipal().getName(); 346 for (LogEntry entry : entries) { 347 ExtendedInfo impactedUserInfo = entry.getExtendedInfos().get("impactedUserName"); 348 if (impactedUserInfo != null && !principalName.equals(impactedUserInfo.getValue(String.class))) { 349 // ignore event that only impact other users 350 continue; 351 } 352 log.debug("Change detected: {}", entry); 353 postFilteredEntries.add(entry); 354 } 355 return postFilteredEntries; 356 } 357 358 protected String getCurrentRootFilteringClause(Set<String> rootPaths, Map<String, Object> params) { 359 StringBuilder rootPathClause = new StringBuilder(); 360 int rootPathCount = 0; 361 for (String rootPath : rootPaths) { 362 rootPathCount++; 363 String rootPathParam = "rootPath" + rootPathCount; 364 if (rootPathClause.length() > 0) { 365 rootPathClause.append(" or "); 366 } 367 rootPathClause.append(String.format("log.docPath like :%s", rootPathParam)); 368 params.put(rootPathParam, rootPath + '%'); 369 370 } 371 return rootPathClause.toString(); 372 } 373 374 protected String getCollectionSyncRootFilteringClause(Set<String> collectionSyncRootMemberIds, 375 Map<String, Object> params) { 376 String paramName = "collectionMemberIds"; 377 params.put(paramName, collectionSyncRootMemberIds); 378 return String.format("log.docUUID in (:%s)", paramName); 379 } 380 381 /** 382 * Using event log id to ensure consistency, see https://jira.nuxeo.com/browse/NXP-14826. 383 */ 384 protected String getJPARangeClause(long lowerBound, long upperBound, Map<String, Object> params) { 385 params.put("lowerBound", lowerBound); 386 params.put("upperBound", upperBound); 387 return "log.id > :lowerBound and log.id <= :upperBound"; 388 } 389 390 protected FileSystemItemChange getFileSystemItemChange(CoreSession session, DocumentRef docRef, LogEntry entry, 391 String expectedFileSystemItemId) { 392 DocumentModel doc = session.getDocument(docRef); 393 // TODO: check the facet, last root change and list of roots 394 // to have a special handling for the roots. 395 FileSystemItem fsItem = null; 396 try { 397 // NXP-19442: Avoid useless and costly call to DocumentModel#getLockInfo 398 fsItem = Framework.getService(FileSystemItemAdapterService.class).getFileSystemItem(doc, false, false, 399 false); 400 } catch (RootlessItemException e) { 401 // Can happen for an unregistered synchronization root that cannot 402 // be adapted as a FileSystemItem: nothing to do. 403 log.debug("RootlessItemException thrown while trying to adapt document {} ({}) as a FileSystemItem.", 404 entry::getDocPath, () -> docRef); 405 } 406 if (fsItem == null) { 407 log.debug("Document {} ({}) is not adaptable as a FileSystemItem, returning null.", entry::getDocPath, 408 () -> docRef); 409 return null; 410 } 411 if (expectedFileSystemItemId != null 412 && !fsItem.getId() 413 .endsWith(AbstractFileSystemItem.FILE_SYSTEM_ITEM_ID_SEPARATOR + expectedFileSystemItemId)) { 414 log.debug( 415 "Id {} of FileSystemItem adapted from document {} ({}) doesn't match expected FileSystemItem id {}, returning null.", 416 fsItem::getId, entry::getDocPath, () -> docRef, () -> expectedFileSystemItemId); 417 return null; 418 } 419 log.debug("Document {} ({}) is adaptable as a FileSystemItem, providing it to the FileSystemItemChange entry.", 420 entry::getDocPath, () -> docRef); 421 // EventDate is able to reflect the ordering of the events 422 // inside a transaction (e.g. when several documents are 423 // created, updated, deleted at once) hence it's useful 424 // to pass that info to the client even though the change 425 // detection filtering is using the log id to have a 426 // guaranteed monotonic behavior that evenDate cannot 427 // guarantee when facing long transactions. 428 return new FileSystemItemChangeImpl(entry.getEventId(), entry.getEventDate().getTime(), entry.getRepositoryId(), 429 entry.getDocUUID(), fsItem); 430 } 431 432}