001/* 002 * (C) Copyright 2014 Nuxeo SA (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * Nuxeo 018 */ 019package org.nuxeo.elasticsearch.web.admin; 020 021import com.codahale.metrics.MetricRegistry; 022import com.codahale.metrics.SharedMetricRegistries; 023import com.codahale.metrics.Timer; 024import org.apache.commons.logging.Log; 025import org.apache.commons.logging.LogFactory; 026import org.elasticsearch.action.search.SearchRequest; 027import org.elasticsearch.action.search.SearchResponse; 028import org.elasticsearch.index.query.QueryBuilders; 029import org.elasticsearch.search.builder.SearchSourceBuilder; 030import org.jboss.seam.annotations.In; 031import org.jboss.seam.annotations.Name; 032import org.jboss.seam.annotations.Scope; 033import org.nuxeo.ecm.core.api.CloseableCoreSession; 034import org.nuxeo.ecm.core.api.CoreInstance; 035import org.nuxeo.ecm.core.api.CoreSession; 036import org.nuxeo.ecm.core.api.DocumentModel; 037import org.nuxeo.ecm.core.api.DocumentRef; 038import org.nuxeo.ecm.core.api.IdRef; 039import org.nuxeo.ecm.platform.query.api.PageProvider; 040import org.nuxeo.ecm.platform.query.api.PageProviderDefinition; 041import org.nuxeo.ecm.platform.query.api.PageProviderService; 042import org.nuxeo.elasticsearch.api.ElasticSearchAdmin; 043import org.nuxeo.elasticsearch.api.ElasticSearchIndexing; 044import org.nuxeo.elasticsearch.commands.IndexingCommand; 045import org.nuxeo.elasticsearch.commands.IndexingCommand.Type; 046import org.nuxeo.runtime.api.Framework; 047import org.nuxeo.runtime.metrics.MetricsService; 048 049import java.io.Serializable; 050import java.util.ArrayList; 051import java.util.Arrays; 052import java.util.Collections; 053import java.util.List; 054 055import static org.jboss.seam.ScopeType.CONVERSATION; 056 057/** 058 * @author <a href="mailto:[email protected]">Tiry</a> 059 */ 060@Name("esAdmin") 061@Scope(CONVERSATION) 062public class ElasticSearchManager implements Serializable { 063 064 private static final long serialVersionUID = 1L; 065 066 private static final Log log = LogFactory.getLog(ElasticSearchManager.class); 067 068 private static final String DEFAULT_NXQL_QUERY = "SELECT * FROM Document"; 069 070 private static final String JSON_DELETE_CMD = "{\"id\":\"IndexingCommand-reindex\",\"type\":\"DELETE\",\"docId\":\"%s\",\"repo\":\"%s\",\"recurse\":true,\"sync\":true}"; 071 072 private static final String ES_CLUSTER_INFO_PROPERTY = "elasticsearch.adminCenter.displayClusterInfo"; 073 074 @In(create = true) 075 protected ElasticSearchAdmin esa; 076 077 @In(create = true) 078 protected ElasticSearchIndexing esi; 079 080 @In(create = true, required = false) 081 protected transient CoreSession documentManager; 082 083 protected List<PageProviderStatus> ppStatuses = null; 084 085 protected Timer indexTimer; 086 087 protected Timer bulkIndexTimer; 088 089 private String rootId; 090 091 private String nxql = DEFAULT_NXQL_QUERY; 092 093 private List<String> repositoryNames; 094 095 private String repositoryName; 096 097 private Boolean dropIndex = false; 098 099 public String getNodesInfo() { 100 return esa.getClient().getNodesInfo(); 101 } 102 103 public String getNodesStats() { 104 return esa.getClient().getNodesStats(); 105 } 106 107 public String getNodesHealth() { 108 String[] indices = getIndexNames(); 109 return esa.getClient().getHealthStatus(indices).toString(); 110 } 111 112 public void startReindexAll() { 113 String repositoryName = getRepositoryName(); 114 log.warn("Re-indexing the entire repository: " + repositoryName); 115 esi.reindexRepository(repositoryName); 116 } 117 118 public void startReindexNxql() { 119 String repositoryName = getRepositoryName(); 120 log.warn(String.format("Re-indexing from a NXQL query: %s on repository: %s", getNxql(), repositoryName)); 121 esi.runReindexingWorker(repositoryName, getNxql()); 122 } 123 124 public void startReindexFrom() { 125 String repositoryName = getRepositoryName(); 126 try (CloseableCoreSession session = CoreInstance.openCoreSessionSystem(repositoryName)) { 127 log.warn(String.format("Try to remove %s and its children from %s repository index", rootId, 128 repositoryName)); 129 String jsonCmd = String.format(JSON_DELETE_CMD, rootId, repositoryName); 130 IndexingCommand rmCmd = IndexingCommand.fromJSON(jsonCmd); 131 esi.indexNonRecursive(rmCmd); 132 133 DocumentRef ref = new IdRef(rootId); 134 if (session.exists(ref)) { 135 DocumentModel doc = session.getDocument(ref); 136 log.warn(String.format("Re-indexing document: %s and its children on repository: %s", doc, 137 repositoryName)); 138 IndexingCommand cmd = new IndexingCommand(doc, Type.INSERT, false, true); 139 esi.runIndexingWorker(Arrays.asList(cmd)); 140 } 141 } 142 } 143 144 public void flush() { 145 esa.flush(); 146 } 147 148 public void optimize() { 149 esa.optimize(); 150 } 151 152 protected void introspectPageProviders() { 153 154 ppStatuses = new ArrayList<>(); 155 156 PageProviderService pps = Framework.getService(PageProviderService.class); 157 for (String ppName : pps.getPageProviderDefinitionNames()) { 158 PageProviderDefinition def = pps.getPageProviderDefinition(ppName); 159 // Create an instance so class replacer is taken in account 160 PageProvider<?> pp = pps.getPageProvider(ppName, def, null, null, 0L, 0L, null); 161 String klass = pp.getClass().getCanonicalName(); 162 ppStatuses.add(new PageProviderStatus(ppName, klass)); 163 } 164 Collections.sort(ppStatuses); 165 } 166 167 public List<PageProviderStatus> getContentViewStatus() { 168 if (ppStatuses == null) { 169 introspectPageProviders(); 170 } 171 return ppStatuses; 172 } 173 174 public Boolean isIndexingInProgress() { 175 return esa.isIndexingInProgress(); 176 } 177 178 public Boolean displayClusterInfo() { 179 if (esa.isEmbedded()) { 180 return true; 181 } 182 return Boolean.parseBoolean(Framework.getProperty(ES_CLUSTER_INFO_PROPERTY, "false")); 183 } 184 185 public String getPendingWorkerCount() { 186 return Long.valueOf(esa.getPendingWorkerCount()).toString(); 187 } 188 189 public String getRunningWorkerCount() { 190 return Long.valueOf(esa.getRunningWorkerCount()).toString(); 191 } 192 193 public String getTotalCommandProcessed() { 194 return Integer.valueOf(esa.getTotalCommandProcessed()).toString(); 195 } 196 197 public String getNumberOfDocuments() { 198 String[] indices = getIndexNames(); 199 SearchResponse ret = esa.getClient().search(new SearchRequest(indices) 200 .source(new SearchSourceBuilder().query(QueryBuilders.matchAllQuery()).size(0))); 201 return Long.valueOf(ret.getHits().totalHits).toString(); 202 } 203 204 private String[] getIndexNames() { 205 List<String> repositoryNames = getRepositoryNames(); 206 String indices[] = new String[repositoryNames.size()]; 207 int i = 0; 208 for (String repo : repositoryNames) { 209 indices[i++] = esa.getIndexNameForRepository(repo); 210 } 211 return indices; 212 } 213 214 public String getIndexingRates() { 215 if (indexTimer == null) { 216 MetricRegistry registry = SharedMetricRegistries.getOrCreate(MetricsService.class.getName()); 217 indexTimer = registry.timer(MetricRegistry.name("nuxeo", "elasticsearch", "service", "index")); 218 219 } 220 return String.format("%.2f, %.2f, %.2f", indexTimer.getOneMinuteRate(), indexTimer.getFiveMinuteRate(), 221 indexTimer.getFifteenMinuteRate()); 222 } 223 224 public String getBulkIndexingRates() { 225 if (bulkIndexTimer == null) { 226 MetricRegistry registry = SharedMetricRegistries.getOrCreate(MetricsService.class.getName()); 227 bulkIndexTimer = registry.timer(MetricRegistry.name("nuxeo", "elasticsearch", "service", "bulkIndex")); 228 229 } 230 return String.format("%.2f, %.2f, %.2f", bulkIndexTimer.getOneMinuteRate(), bulkIndexTimer.getFiveMinuteRate(), 231 bulkIndexTimer.getFifteenMinuteRate()); 232 } 233 234 public String getRootId() { 235 return rootId; 236 } 237 238 public List<String> getRepositoryNames() { 239 if (repositoryNames == null) { 240 repositoryNames = esa.getRepositoryNames(); 241 } 242 return repositoryNames; 243 } 244 245 public void setRootId(String rootId) { 246 this.rootId = rootId; 247 } 248 249 public String getNxql() { 250 return nxql; 251 } 252 253 public void setNxql(String nxql) { 254 this.nxql = nxql; 255 } 256 257 public String getRepositoryName() { 258 if (repositoryName == null) { 259 List<String> repositoryNames = getRepositoryNames(); 260 if (!repositoryNames.isEmpty()) { 261 repositoryName = repositoryNames.get(0); 262 } 263 } 264 return repositoryName; 265 } 266 267 public void setRepositoryName(String repositoryName) { 268 this.repositoryName = repositoryName; 269 } 270 271 public Boolean getDropIndex() { 272 return dropIndex; 273 } 274 275 public void setDropIndex(Boolean dropIndex) { 276 this.dropIndex = dropIndex; 277 } 278}