001/* 002 * (C) Copyright 2015-2018 Nuxeo (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * Thierry Delprat <[email protected]> 018 * Antoine Taillefer <[email protected]> 019 * 020 */ 021package org.nuxeo.ecm.automation.server.jaxrs.batch; 022 023import static org.apache.commons.lang3.StringUtils.isEmpty; 024 025import java.io.IOException; 026import java.io.InputStream; 027import java.util.Collections; 028import java.util.HashMap; 029import java.util.List; 030import java.util.Map; 031import java.util.Objects; 032import java.util.Set; 033import java.util.concurrent.atomic.AtomicInteger; 034 035import org.apache.logging.log4j.LogManager; 036import org.apache.logging.log4j.Logger; 037import org.nuxeo.ecm.automation.AutomationService; 038import org.nuxeo.ecm.automation.OperationContext; 039import org.nuxeo.ecm.automation.OperationException; 040import org.nuxeo.ecm.automation.core.util.BlobList; 041import org.nuxeo.ecm.automation.core.util.ComplexTypeJSONDecoder; 042import org.nuxeo.ecm.automation.server.AutomationServer; 043import org.nuxeo.ecm.automation.server.RestBinding; 044import org.nuxeo.ecm.automation.server.jaxrs.batch.handler.BatchHandlerDescriptor; 045import org.nuxeo.ecm.core.api.Blob; 046import org.nuxeo.ecm.core.api.Blobs; 047import org.nuxeo.ecm.core.api.CoreSession; 048import org.nuxeo.ecm.core.api.NuxeoException; 049import org.nuxeo.ecm.core.api.NuxeoPrincipal; 050import org.nuxeo.ecm.core.transientstore.api.TransientStore; 051import org.nuxeo.ecm.core.transientstore.api.TransientStoreService; 052import org.nuxeo.ecm.webengine.model.exceptions.WebSecurityException; 053import org.nuxeo.runtime.api.Framework; 054import org.nuxeo.runtime.model.ComponentContext; 055import org.nuxeo.runtime.model.DefaultComponent; 056 057/** 058 * Runtime Component implementing the {@link BatchManager} service with the {@link TransientStore}. 059 * 060 * @since 5.4.2 061 */ 062public class BatchManagerComponent extends DefaultComponent implements BatchManager { 063 064 private static final Logger log = LogManager.getLogger(BatchManagerComponent.class); 065 066 public static final String CLIENT_BATCH_ID_FLAG = "allowClientGeneratedBatchId"; 067 068 /** 069 * The default batch handler name. 070 * 071 * @since 10.1 072 */ 073 public static final String DEFAULT_BATCH_HANDLER = "default"; 074 075 /** @since 10.1 */ 076 public static final String XP_BATCH_HANDLER = "handlers"; 077 078 protected Map<String, BatchHandler> handlers = new HashMap<>(); 079 080 protected final AtomicInteger uploadInProgress = new AtomicInteger(0); 081 082 static { 083 ComplexTypeJSONDecoder.registerBlobDecoder(new JSONBatchBlobDecoder()); 084 } 085 086 @Override 087 public void start(ComponentContext context) { 088 super.start(context); 089 List<BatchHandlerDescriptor> descriptors = getDescriptors(XP_BATCH_HANDLER); 090 descriptors.forEach(d -> { 091 try { 092 BatchHandler handler = d.klass.getDeclaredConstructor().newInstance(); 093 handler.initialize(d.name, d.properties); 094 handlers.put(d.name, handler); 095 } catch (ReflectiveOperationException e) { 096 log.error("Unable to instantiate batch handler", e); 097 } 098 }); 099 } 100 101 @Override 102 public void stop(ComponentContext context) throws InterruptedException { 103 super.stop(context); 104 handlers.clear(); 105 } 106 107 @Override 108 @Deprecated 109 public TransientStore getTransientStore() { 110 return getHandler(DEFAULT_BATCH_HANDLER).getTransientStore(); 111 } 112 113 @Override 114 public Set<String> getSupportedHandlers() { 115 return Collections.unmodifiableSet(handlers.keySet()); 116 } 117 118 @Override 119 public BatchHandler getHandler(String handlerName) { 120 return handlers.get(handlerName); 121 } 122 123 @Override 124 public String initBatch() { 125 Batch batch = initBatchInternal(null); 126 return batch.getKey(); 127 } 128 129 @Override 130 @Deprecated 131 public String initBatch(String batchId, String contextName) { 132 Batch batch = initBatchInternal(batchId); 133 return batch.getKey(); 134 } 135 136 protected Batch initBatchInternal(String batchId) { 137 BatchHandler batchHandler = handlers.get(DEFAULT_BATCH_HANDLER); 138 return batchHandler.newBatch(batchId); 139 } 140 141 @Override 142 public Batch initBatch(String handlerName) { 143 if (isEmpty(handlerName)) { 144 handlerName = DEFAULT_BATCH_HANDLER; 145 } 146 BatchHandler batchHandler = handlers.get(handlerName); 147 if (batchHandler == null) { 148 throw new IllegalArgumentException("Batch handler does not exist: " + handlerName); 149 } 150 return batchHandler.newBatch(null); 151 } 152 153 @Override 154 public Batch getBatch(String batchId) { 155 return handlers.values() 156 .stream() 157 .map(batchHandler -> batchHandler.getBatch(batchId)) 158 .filter(Objects::nonNull) 159 .findFirst() 160 .orElse(null); 161 } 162 163 @Override 164 public void addStream(String batchId, String index, InputStream is, String name, String mime) throws IOException { 165 Blob blob = Blobs.createBlob(is); 166 addBlob(batchId, index, blob, name, mime); 167 } 168 169 @Override 170 public void addBlob(String batchId, String index, Blob blob, String name, String mime) throws IOException { 171 uploadInProgress.incrementAndGet(); 172 try { 173 Batch batch = getBatch(batchId); 174 if (batch == null) { 175 batch = initBatchInternal(batchId); 176 } 177 batch.addFile(index, blob, name, mime); 178 log.debug("Added file {} [{}] to batch {}", index, name, batch.getKey()); 179 } finally { 180 uploadInProgress.decrementAndGet(); 181 } 182 } 183 184 @Override 185 public void addStream(String batchId, String index, InputStream is, int chunkCount, int chunkIndex, String name, 186 String mime, long fileSize) throws IOException { 187 Blob blob = Blobs.createBlob(is); 188 addBlob(batchId, index, blob, chunkCount, chunkIndex, name, mime, fileSize); 189 } 190 191 @Override 192 public void addBlob(String batchId, String index, Blob blob, int chunkCount, int chunkIndex, String name, 193 String mime, long fileSize) throws IOException { 194 uploadInProgress.incrementAndGet(); 195 try { 196 Batch batch = getBatch(batchId); 197 if (batch == null) { 198 batch = initBatchInternal(batchId); 199 } 200 batch.addChunk(index, blob, chunkCount, chunkIndex, name, mime, fileSize); 201 log.debug("Added chunk {} to file {} [{}] in batch {}", chunkIndex, index, name, batch.getKey()); 202 } finally { 203 uploadInProgress.decrementAndGet(); 204 } 205 } 206 207 @Override 208 public boolean hasBatch(String batchId) { 209 return handlers.values().stream().anyMatch(batchHandler -> batchHandler.getBatch(batchId) != null); 210 } 211 212 @Override 213 public List<Blob> getBlobs(String batchId) { 214 return getBlobs(batchId, 0); 215 } 216 217 @Override 218 public List<Blob> getBlobs(String batchId, int timeoutS) { 219 if (uploadInProgress.get() > 0 && timeoutS > 0) { 220 for (int i = 0; i < timeoutS * 5; i++) { 221 try { 222 Thread.sleep(200); 223 } catch (InterruptedException e) { 224 Thread.currentThread().interrupt(); 225 } 226 if (uploadInProgress.get() == 0) { 227 break; 228 } 229 } 230 } 231 Batch batch = getBatch(batchId); 232 if (batch == null) { 233 log.error("Unable to find batch with id {}", batchId); 234 return Collections.emptyList(); 235 } 236 return batch.getBlobs(); 237 } 238 239 @Override 240 public Blob getBlob(String batchId, String fileIndex) { 241 return getBlob(batchId, fileIndex, 0); 242 } 243 244 @Override 245 public Blob getBlob(String batchId, String fileIndex, int timeoutS) { 246 Blob blob = getBatchBlob(batchId, fileIndex); 247 if (blob == null && timeoutS > 0 && uploadInProgress.get() > 0) { 248 for (int i = 0; i < timeoutS * 5; i++) { 249 try { 250 Thread.sleep(200); 251 } catch (InterruptedException e) { 252 Thread.currentThread().interrupt(); 253 } 254 blob = getBatchBlob(batchId, fileIndex); 255 if (blob != null) { 256 break; 257 } 258 } 259 } 260 if (!hasBatch(batchId)) { 261 log.error("Unable to find batch with id {}", batchId); 262 return null; 263 } 264 return blob; 265 } 266 267 protected Blob getBatchBlob(String batchId, String fileIndex) { 268 Blob blob = null; 269 Batch batch = getBatch(batchId); 270 if (batch != null) { 271 blob = batch.getBlob(fileIndex); 272 } 273 return blob; 274 } 275 276 @Override 277 public List<BatchFileEntry> getFileEntries(String batchId) { 278 Batch batch = getBatch(batchId); 279 if (batch == null) { 280 return null; 281 } 282 return batch.getFileEntries(); 283 } 284 285 @Override 286 public BatchFileEntry getFileEntry(String batchId, String fileIndex) { 287 Batch batch = getBatch(batchId); 288 if (batch == null) { 289 return null; 290 } 291 return batch.getFileEntry(fileIndex); 292 } 293 294 @Override 295 public void clean(String batchId) { 296 Batch batch = getBatch(batchId); 297 if (batch != null) { 298 batch.clean(); 299 } 300 } 301 302 @Override 303 public Object execute(String batchId, String chainOrOperationId, CoreSession session, 304 Map<String, Object> contextParams, Map<String, Object> operationParams) { 305 List<Blob> blobs = getBlobs(batchId, getUploadWaitTimeout()); 306 if (blobs == null) { 307 String message = String.format("Unable to find batch associated with id '%s'", batchId); 308 log.error(message); 309 throw new NuxeoException(message); 310 } 311 return execute(new BlobList(blobs), chainOrOperationId, session, contextParams, operationParams); 312 } 313 314 @Override 315 public Object execute(String batchId, String fileIndex, String chainOrOperationId, CoreSession session, 316 Map<String, Object> contextParams, Map<String, Object> operationParams) { 317 Blob blob = getBlob(batchId, fileIndex, getUploadWaitTimeout()); 318 if (blob == null) { 319 String message = String.format( 320 "Unable to find batch associated with id '%s' or file associated with index '%s'", batchId, 321 fileIndex); 322 log.error(message); 323 throw new NuxeoException(message); 324 } 325 return execute(blob, chainOrOperationId, session, contextParams, operationParams); 326 } 327 328 protected Object execute(Object blobInput, String chainOrOperationId, CoreSession session, 329 Map<String, Object> contextParams, Map<String, Object> operationParams) { 330 if (contextParams == null) { 331 contextParams = new HashMap<>(); 332 } 333 if (operationParams == null) { 334 operationParams = new HashMap<>(); 335 } 336 337 try (OperationContext ctx = new OperationContext(session)) { 338 339 AutomationServer server = Framework.getService(AutomationServer.class); 340 RestBinding binding = server.getOperationBinding(chainOrOperationId); 341 342 if (binding != null && binding.isAdministrator) { 343 NuxeoPrincipal principal = ctx.getPrincipal(); 344 if (!principal.isAdministrator()) { 345 String message = "Not allowed. You must be administrator to use this operation"; 346 log.error(message); 347 throw new WebSecurityException(message); 348 } 349 } 350 351 ctx.setInput(blobInput); 352 ctx.putAll(contextParams); 353 354 AutomationService as = Framework.getService(AutomationService.class); 355 // Drag and Drop action category is accessible from the chain sub context as chain parameters 356 return as.run(ctx, chainOrOperationId, operationParams); 357 } catch (OperationException e) { 358 log.error("Error while executing automation batch ", e); 359 throw new NuxeoException(e); 360 } 361 } 362 363 protected int getUploadWaitTimeout() { 364 String t = Framework.getProperty("org.nuxeo.batch.upload.wait.timeout", "5"); 365 try { 366 return Integer.parseInt(t); 367 } catch (NumberFormatException e) { 368 log.error("Wrong number format for upload wait timeout property", e); 369 return 5; 370 } 371 } 372 373 @Override 374 public Object executeAndClean(String batchId, String chainOrOperationId, CoreSession session, 375 Map<String, Object> contextParams, Map<String, Object> operationParams) { 376 try { 377 return execute(batchId, chainOrOperationId, session, contextParams, operationParams); 378 } finally { 379 clean(batchId); 380 } 381 } 382 383 @Override 384 public boolean removeFileEntry(String batchId, String filedIdx) { 385 Batch batch = getBatch(batchId); 386 return batch != null && batch.removeFileEntry(filedIdx); 387 } 388}