001/* 002 * (C) Copyright 2012-2018 Nuxeo (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * Vladimir Pasquier <[email protected]> 018 */ 019 020package org.nuxeo.ecm.platform.routing.api.operation; 021 022import java.io.Serializable; 023import java.util.ArrayList; 024import java.util.List; 025import java.util.Map; 026 027import org.apache.commons.lang3.StringUtils; 028import org.apache.commons.logging.Log; 029import org.apache.commons.logging.LogFactory; 030import org.javasimon.SimonManager; 031import org.javasimon.Split; 032import org.nuxeo.ecm.automation.OperationContext; 033import org.nuxeo.ecm.automation.core.Constants; 034import org.nuxeo.ecm.automation.core.annotations.Context; 035import org.nuxeo.ecm.automation.core.annotations.Operation; 036import org.nuxeo.ecm.automation.core.annotations.OperationMethod; 037import org.nuxeo.ecm.automation.core.annotations.Param; 038import org.nuxeo.ecm.core.api.CloseableCoreSession; 039import org.nuxeo.ecm.core.api.CoreInstance; 040import org.nuxeo.ecm.core.api.CoreSession; 041import org.nuxeo.ecm.core.api.DocumentModel; 042import org.nuxeo.ecm.core.api.IdRef; 043import org.nuxeo.ecm.core.api.IterableQueryResult; 044import org.nuxeo.ecm.core.api.NuxeoException; 045import org.nuxeo.ecm.platform.routing.api.DocumentRoute; 046import org.nuxeo.ecm.platform.routing.api.DocumentRoutingService; 047import org.nuxeo.runtime.api.Framework; 048import org.nuxeo.runtime.transaction.TransactionHelper; 049 050/** 051 * Bulk operation to cancel and restart all the workflow instances of the workflow model with the id 052 * <param>workflowId</param>. If the <param> nodeId</param> parameter is specified, then only the workflows suspened on 053 * that node are restarted. 054 * 055 * @since 5.7 056 */ 057@Operation(id = BulkRestartWorkflow.ID, category = Constants.CAT_WORKFLOW, label = "Bulk Restart Workflow", description = "Bulk operation to restart workflows.", aliases = { "BulkRestartWorkflow" }) 058public class BulkRestartWorkflow { 059 060 public static final String ID = "WorkflowModel.BulkRestartInstances"; 061 062 private static final Log log = LogFactory.getLog(BulkRestartWorkflow.class); 063 064 @Param(name = "workflowId", required = true) 065 protected String workflowId; 066 067 @Param(name = "nodeId", required = false) 068 protected String nodeId; 069 070 @Param(name = "reinitLifecycle", required = false) 071 protected boolean reinitLifecycle; 072 073 @Param(name = "batchSize", required = false) 074 protected Integer batchSize; 075 076 @Context 077 protected OperationContext ctx; 078 079 public static final int DEFAULT_BATCH_SIZE = 1000; 080 081 @OperationMethod 082 public void run() { 083 CloseableCoreSession session = null; 084 boolean transactionStarted = false; 085 Split split = SimonManager.getStopwatch(ID).start(); 086 try { 087 session = CoreInstance.openCoreSession(null); 088 089 // Fetching all routes 090 // If the nodeId parameter is null, fetch all the workflow routes 091 // with 092 // the given workflowId 093 String query = "Select %s from DocumentRoute where (ecm:name like '%s.%%' OR ecm:name like '%s') and ecm:currentLifeCycleState = 'running'"; 094 String key = "ecm:uuid"; 095 if (StringUtils.isEmpty(nodeId)) { 096 if (StringUtils.isEmpty(workflowId)) { 097 log.error("Need to specify either the workflowModelId either the nodeId to query the workflows"); 098 return; 099 } 100 query = String.format(query, key, workflowId, workflowId); 101 } else { 102 query = "Select %s from RouteNode where rnode:nodeId = '%s' and ecm:currentLifeCycleState = 'suspended'"; 103 key = "ecm:parentId"; 104 if (StringUtils.isEmpty(nodeId)) { 105 log.error("Need to specify either the workflowModelId either the nodeId to query the workflows"); 106 return; 107 } 108 query = String.format(query, key, nodeId); 109 } 110 111 IterableQueryResult results = session.queryAndFetch(query, "NXQL"); 112 List<String> routeIds = new ArrayList<>(); 113 for (Map<String, Serializable> result : results) { 114 routeIds.add(result.get(key).toString()); 115 } 116 results.close(); 117 DocumentRoutingService routingService = Framework.getService(DocumentRoutingService.class); 118 // Batching initialization 119 if (batchSize == null) { 120 batchSize = DEFAULT_BATCH_SIZE; 121 } 122 123 if (!TransactionHelper.isTransactionActive()) { 124 TransactionHelper.startTransaction(); 125 transactionStarted = true; 126 } 127 long routesRestartedCount = 0; 128 for (String routeId : routeIds) { 129 try { 130 DocumentModel docRoute = session.getDocument(new IdRef(routeId)); 131 DocumentRoute route = docRoute.getAdapter(DocumentRoute.class); 132 List<String> relatedDocIds = route.getAttachedDocuments(); 133 route.cancel(session); 134 135 log.debug("Canceling workflow " + route.getDocument().getName()); 136 137 if (reinitLifecycle) { 138 reinitLifecycle(relatedDocIds, session); 139 } 140 routingService.createNewInstance(workflowId, relatedDocIds, session, true); 141 for (String string : relatedDocIds) { 142 log.debug("Starting workflow for " + string); 143 } 144 // removing old workflow instance 145 session.removeDocument(route.getDocument().getRef()); 146 147 routesRestartedCount++; 148 if (routesRestartedCount % batchSize == 0) { 149 session.close(); 150 TransactionHelper.commitOrRollbackTransaction(); 151 TransactionHelper.startTransaction(); 152 session = CoreInstance.openCoreSession(null); 153 } 154 } catch (NuxeoException e) { 155 Throwable t = unwrapException(e); 156 log.error(t.getClass().getSimpleName() + ": " + t.getMessage()); 157 log.error("Workflow with the docId '" + routeId + "' cannot be canceled. " + routesRestartedCount 158 + " workflows have been processed."); 159 } 160 } 161 } finally { 162 if (session != null) { 163 session.close(); 164 } 165 TransactionHelper.commitOrRollbackTransaction(); 166 if (!transactionStarted) { 167 TransactionHelper.startTransaction(); 168 } 169 split.stop(); 170 log.info(split.toString()); 171 } 172 } 173 174 public static Throwable unwrapException(Throwable t) { 175 Throwable cause = null; 176 if (t != null) { 177 cause = t.getCause(); 178 } 179 if (cause == null) { 180 return t; 181 } else { 182 return unwrapException(cause); 183 } 184 } 185 186 protected void reinitLifecycle(List<String> docIds, CoreSession session) { 187 for (String docId : docIds) { 188 session.reinitLifeCycleState(new IdRef(docId)); 189 } 190 } 191 192}