001/* 002 * (C) Copyright 2015-2018 Nuxeo (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * Mariana Cedica <[email protected]> 018 * Antoine Taillefer <[email protected]> 019 * Kevin Leturc <[email protected]> 020 */ 021package org.nuxeo.drive.elasticsearch; 022 023import java.io.IOException; 024import java.time.ZonedDateTime; 025import java.time.temporal.ChronoUnit; 026import java.util.ArrayList; 027import java.util.HashSet; 028import java.util.List; 029import java.util.Set; 030 031import org.apache.logging.log4j.LogManager; 032import org.apache.logging.log4j.Logger; 033import org.elasticsearch.action.search.SearchRequest; 034import org.elasticsearch.action.search.SearchResponse; 035import org.elasticsearch.action.search.SearchType; 036import org.elasticsearch.index.query.BoolQueryBuilder; 037import org.elasticsearch.index.query.QueryBuilder; 038import org.elasticsearch.index.query.QueryBuilders; 039import org.elasticsearch.index.query.RangeQueryBuilder; 040import org.elasticsearch.index.query.TermsQueryBuilder; 041import org.elasticsearch.search.SearchHit; 042import org.elasticsearch.search.SearchHits; 043import org.elasticsearch.search.builder.SearchSourceBuilder; 044import org.elasticsearch.search.sort.SortOrder; 045import org.nuxeo.drive.service.SynchronizationRoots; 046import org.nuxeo.drive.service.impl.AuditChangeFinder; 047import org.nuxeo.ecm.core.api.CoreSession; 048import org.nuxeo.ecm.core.api.repository.RepositoryManager; 049import org.nuxeo.ecm.platform.audit.api.ExtendedInfo; 050import org.nuxeo.ecm.platform.audit.api.LogEntry; 051import org.nuxeo.ecm.platform.audit.impl.LogEntryImpl; 052import org.nuxeo.elasticsearch.ElasticSearchConstants; 053import org.nuxeo.elasticsearch.api.ESClient; 054import org.nuxeo.elasticsearch.api.ElasticSearchAdmin; 055import org.nuxeo.runtime.api.Framework; 056 057import com.fasterxml.jackson.databind.ObjectMapper; 058 059/** 060 * Override the JPA audit based change finder to execute query in ES. 061 * <p> 062 * The structure of the query executed by the {@link AuditChangeFinder} is: 063 * 064 * <pre> 065 * from LogEntry log where log.repositoryId = :repositoryId 066 * 067 * + AND if ActiveRoots (activeRoots) NOT empty 068 * 069 * from LogEntry log where log.repositoryId = :repositoryId and ( 070 * LIST_DOC_EVENTS_IDS_QUERY and ( ROOT_PATHS or COLECTIONS_PATHS) or 071 * (log.category = 'NuxeoDrive' and log.eventId != 'rootUnregistered') ) 072 * 073 * 074 * if ActiveRoots EMPTY: 075 * 076 * from LogEntry log where log.repositoryId = :repositoryId and ((log.category = 077 * 'NuxeoDrive' and log.eventId != 'rootUnregistered')) 078 * 079 * + AND (log.id > :lowerBound and log.id <= :upperBound) + order by 080 * log.repositoryId asc, log.eventDate desc 081 * </pre> 082 * 083 * @since 7.3 084 */ 085public class ESAuditChangeFinder extends AuditChangeFinder { 086 087 private static final Logger log = LogManager.getLogger(ESAuditChangeFinder.class); 088 089 protected static final String EVENT_ID = "eventId"; 090 091 protected List<LogEntry> queryESAuditEntries(CoreSession session, SynchronizationRoots activeRoots, 092 Set<String> collectionSyncRootMemberIds, long lowerBound, long upperBound, int limit) { 093 094 SearchRequest request = new SearchRequest(getESIndexName()).types(ElasticSearchConstants.ENTRY_TYPE) 095 .searchType(SearchType.DFS_QUERY_THEN_FETCH); 096 097 QueryBuilder queryBuilder = QueryBuilders.matchAllQuery(); 098 QueryBuilder filterBuilder = buildFilterClauses(session, activeRoots, collectionSyncRootMemberIds, lowerBound, 099 upperBound); 100 SearchSourceBuilder source = new SearchSourceBuilder().query( 101 QueryBuilders.boolQuery().must(queryBuilder).filter(filterBuilder)); 102 source.sort("repositoryId", SortOrder.ASC).sort("eventDate", SortOrder.DESC); 103 source.size(limit); 104 request.source(source); 105 List<LogEntry> entries = new ArrayList<>(); 106 logSearchRequest(request); 107 SearchResponse searchResponse = getClient().search(request); 108 logSearchResponse(searchResponse); 109 ObjectMapper mapper = new ObjectMapper(); 110 for (SearchHit hit : searchResponse.getHits()) { 111 try { 112 entries.add(mapper.readValue(hit.getSourceAsString(), LogEntryImpl.class)); 113 } catch (IOException e) { 114 log.error("Error while reading Audit Entry from ES", e); 115 } 116 } 117 return entries; 118 } 119 120 protected QueryBuilder buildFilterClauses(CoreSession session, SynchronizationRoots activeRoots, 121 Set<String> collectionSyncRootMemberIds, long lowerBound, long upperBound) { 122 BoolQueryBuilder filterBuilder = QueryBuilders.boolQuery(); 123 124 // from LogEntry log where log.repositoryId = :repositoryId 125 QueryBuilder repositoryClauseFilter = QueryBuilders.termQuery("repositoryId", session.getRepositoryName()); 126 filterBuilder.must(repositoryClauseFilter); 127 128 if (activeRoots.getPaths().isEmpty()) { 129 // AND (log.category = 'NuxeoDrive' and log.eventId != 'rootUnregistered') 130 filterBuilder.must(getDriveLogsQueryClause()); 131 } else { 132 133 BoolQueryBuilder orFilterBuilderIfActiveRoots = QueryBuilders.boolQuery(); 134 135 // LIST_DOC_EVENTS_IDS_QUERY 136 137 // (log.category = 'eventDocumentCategory' and (log.eventId = 138 // 'documentCreated' or log.eventId = 'documentModified' or 139 // log.eventId = 'documentMoved' or log.eventId = 140 // 'documentCreatedByCopy' or log.eventId = 'documentRestored' or 141 // log.eventId = 'addedToCollection’ or log.eventId = 'documentProxyPublished’ or log.eventId = 142 // 'documentLocked' or log.eventId = 'documentUnlocked') or log.category = 143 // 'eventLifeCycleCategory' and log.eventId = 144 // 'lifecycle_transition_event' and log.docLifeCycle != 'deleted' ) 145 String[] eventIds = { "documentCreated", "documentModified", "documentMoved", "documentCreatedByCopy", 146 "documentRestored", "addedToCollection", "documentProxyPublished", "documentLocked", 147 "documentUnlocked", "documentUntrashed" }; 148 BoolQueryBuilder orEventsFilter = QueryBuilders.boolQuery(); 149 orEventsFilter.should(getEventsClause("eventDocumentCategory", eventIds, true)); 150 orEventsFilter.should( 151 getEventsClause("eventLifeCycleCategory", new String[] { "lifecycle_transition_event" }, true)); 152 orEventsFilter.should(getEventsClause("eventLifeCycleCategory", new String[] { "deleted" }, false)); 153 154 // ROOT_PATHS log.docPath like :rootPath1 155 if (collectionSyncRootMemberIds != null && !collectionSyncRootMemberIds.isEmpty()) { 156 BoolQueryBuilder rootsOrCollectionsFilter = QueryBuilders.boolQuery(); 157 rootsOrCollectionsFilter.should(getCurrentRootsClause(activeRoots.getPaths())); 158 rootsOrCollectionsFilter.should(getCollectionSyncRootClause(collectionSyncRootMemberIds)); 159 160 // ( LIST_DOC_EVENTS_IDS_QUERY and ( ROOT_PATHS or 161 // COLECTIONS_PATHS) 162 // or (log.category = 'NuxeoDrive' and log.eventId != 163 // 'rootUnregistered') ) 164 orFilterBuilderIfActiveRoots.should( 165 QueryBuilders.boolQuery().must(orEventsFilter).must(rootsOrCollectionsFilter)); 166 } else { 167 orFilterBuilderIfActiveRoots.should(QueryBuilders.boolQuery().must(orEventsFilter).must( 168 getCurrentRootsClause(activeRoots.getPaths()))); 169 } 170 171 orFilterBuilderIfActiveRoots.should(getDriveLogsQueryClause()); 172 173 filterBuilder.must(orFilterBuilderIfActiveRoots); 174 } 175 176 filterBuilder.must(getLogIdBoundsClause(lowerBound, upperBound)); 177 return filterBuilder; 178 179 } 180 181 protected RangeQueryBuilder getLogIdBoundsClause(long lowerBound, long upperBound) { 182 RangeQueryBuilder rangeFilter = QueryBuilders.rangeQuery("id"); 183 rangeFilter.gt(lowerBound); 184 rangeFilter.lte(upperBound); 185 return rangeFilter; 186 } 187 188 protected TermsQueryBuilder getCollectionSyncRootClause(Set<String> collectionSyncRootMemberIds) { 189 return QueryBuilders.termsQuery("docUUID", collectionSyncRootMemberIds); 190 } 191 192 protected BoolQueryBuilder getCurrentRootsClause(Set<String> rootPaths) { 193 BoolQueryBuilder orFilterRoots = QueryBuilders.boolQuery(); 194 for (String rootPath : rootPaths) { 195 orFilterRoots.should(QueryBuilders.prefixQuery("docPath", rootPath)); 196 } 197 return orFilterRoots; 198 } 199 200 protected BoolQueryBuilder getDriveLogsQueryClause() { 201 BoolQueryBuilder filterBuilder = QueryBuilders.boolQuery(); 202 filterBuilder.must(QueryBuilders.termQuery("category", "NuxeoDrive")); 203 filterBuilder.mustNot(QueryBuilders.termQuery(EVENT_ID, "rootUnregistered")); 204 return filterBuilder; 205 } 206 207 protected BoolQueryBuilder getEventsClause(String category, String[] eventIds, boolean shouldMatch) { 208 BoolQueryBuilder filterBuilder = QueryBuilders.boolQuery(); 209 filterBuilder.must(QueryBuilders.termQuery("category", category)); 210 if (eventIds != null && eventIds.length > 0) { 211 if (eventIds.length == 1) { 212 if (shouldMatch) { 213 filterBuilder.must(QueryBuilders.termQuery(EVENT_ID, eventIds[0])); 214 } else { 215 filterBuilder.mustNot(QueryBuilders.termQuery(EVENT_ID, eventIds[0])); 216 } 217 } else { 218 if (shouldMatch) { 219 filterBuilder.must(QueryBuilders.termsQuery(EVENT_ID, eventIds)); 220 } else { 221 filterBuilder.mustNot(QueryBuilders.termsQuery(EVENT_ID, eventIds)); 222 } 223 } 224 } 225 return filterBuilder; 226 } 227 228 @Override 229 public long getUpperBound() { 230 RepositoryManager repositoryManager = Framework.getService(RepositoryManager.class); 231 return getUpperBound(new HashSet<>(repositoryManager.getRepositoryNames())); 232 } 233 234 /** 235 * Returns the last available log id in the audit index considering events older than the last clustering 236 * invalidation date if clustering is enabled for at least one of the given repositories. This is to make sure the 237 * {@code DocumentModel} further fetched from the session using the audit entry doc id is fresh. 238 */ 239 @Override 240 public long getUpperBound(Set<String> repositoryNames) { 241 SearchRequest request = new SearchRequest(getESIndexName()).types(ElasticSearchConstants.ENTRY_TYPE) 242 .searchType(SearchType.DFS_QUERY_THEN_FETCH); 243 RangeQueryBuilder filterBuilder = QueryBuilders.rangeQuery("logDate"); 244 long clusteringDelay = getClusteringDelay(repositoryNames); 245 if (clusteringDelay > -1) { 246 long lastClusteringInvalidationDate = System.currentTimeMillis() - 2 * clusteringDelay; 247 filterBuilder = filterBuilder.lt(lastClusteringInvalidationDate); 248 } 249 SearchSourceBuilder source = new SearchSourceBuilder(); 250 source.sort("id", SortOrder.DESC).size(1); 251 // scroll on previous days with a times 2 step up to 32 252 ESClient esClient = getClient(); 253 for (int i = 1; i <= 32; i = i * 2) { 254 ZonedDateTime lowerLogDateTime = ZonedDateTime.now().truncatedTo(ChronoUnit.DAYS).minusDays(i); 255 // set lower bound in query 256 filterBuilder = filterBuilder.gt(lowerLogDateTime.toInstant().toEpochMilli()); 257 source.query(QueryBuilders.boolQuery().filter(filterBuilder)); 258 request.source(source); 259 // run request 260 logSearchRequest(request); 261 SearchResponse searchResponse = esClient.search(request); 262 logSearchResponse(searchResponse); 263 264 // if results return the first hit id 265 ObjectMapper mapper = new ObjectMapper(); 266 SearchHits hits = searchResponse.getHits(); 267 for (SearchHit hit : hits) { 268 try { 269 return mapper.readValue(hit.getSourceAsString(), LogEntryImpl.class).getId(); 270 } catch (IOException e) { 271 log.error("Error while reading Audit Entry from ES", e); 272 } 273 } 274 } 275 if (clusteringDelay > -1) { 276 // Check for existing entries without the clustering invalidation date filter to not return -1 in this 277 // case and make sure the lower bound of the next call to NuxeoDriveManager#getChangeSummary will be >= 0 278 source.query(QueryBuilders.matchAllQuery()).size(0); 279 request.source(source); 280 logSearchRequest(request); 281 SearchResponse searchResponse = esClient.search(request); 282 logSearchResponse(searchResponse); 283 if (searchResponse.getHits().getTotalHits() > 0) { 284 log.debug("Found no audit log entries matching the criterias but some exist, returning 0"); 285 return 0; 286 } 287 } 288 log.debug("Found no audit log entries, returning -1"); 289 return -1; 290 } 291 292 @Override 293 protected List<LogEntry> queryAuditEntries(CoreSession session, SynchronizationRoots activeRoots, 294 Set<String> collectionSyncRootMemberIds, long lowerBound, long upperBound, int limit) { 295 List<LogEntry> entries = queryESAuditEntries(session, activeRoots, collectionSyncRootMemberIds, lowerBound, 296 upperBound, limit); 297 // Post filter the output to remove (un)registration that are unrelated 298 // to the current user. 299 // TODO move this to the ES query 300 List<LogEntry> postFilteredEntries = new ArrayList<>(); 301 String principalName = session.getPrincipal().getName(); 302 for (LogEntry entry : entries) { 303 ExtendedInfo impactedUserInfo = entry.getExtendedInfos().get("impactedUserName"); 304 if (impactedUserInfo != null && !principalName.equals(impactedUserInfo.getValue(String.class))) { 305 // ignore event that only impact other users 306 continue; 307 } 308 log.debug("Change detected: {}", entry); 309 postFilteredEntries.add(entry); 310 } 311 return postFilteredEntries; 312 } 313 314 protected ESClient getClient() { 315 return Framework.getService(ElasticSearchAdmin.class).getClient(); 316 } 317 318 protected String getESIndexName() { 319 ElasticSearchAdmin esa = Framework.getService(ElasticSearchAdmin.class); 320 return esa.getIndexNameForType(ElasticSearchConstants.ENTRY_TYPE); 321 } 322 323 protected void logSearchRequest(SearchRequest request) { 324 log.debug("Elasticsearch search request: curl -XGET 'http://localhost:9200/{}/{}/_search?pretty' -d '{}'", 325 this::getESIndexName, () -> ElasticSearchConstants.ENTRY_TYPE, () -> request); 326 } 327 328 protected void logSearchResponse(SearchResponse response) { 329 log.debug("Elasticsearch search response: {}", response); 330 } 331}