001/* 002 * (C) Copyright 2018 Nuxeo (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * Nelson Silva <[email protected]> 018 */ 019package org.nuxeo.ecm.automation.server.jaxrs.adapters; 020 021import java.io.Serializable; 022import java.net.URI; 023import javax.mail.MessagingException; 024import javax.servlet.http.HttpServletRequest; 025import javax.servlet.http.HttpServletResponse; 026import javax.ws.rs.DELETE; 027import javax.ws.rs.GET; 028import javax.ws.rs.POST; 029import javax.ws.rs.Path; 030import javax.ws.rs.PathParam; 031import javax.ws.rs.Produces; 032 033import javax.ws.rs.core.Context; 034import javax.ws.rs.core.MediaType; 035import javax.ws.rs.core.Response; 036 037import org.apache.commons.collections.CollectionUtils; 038import org.apache.commons.lang3.StringUtils; 039import org.apache.logging.log4j.LogManager; 040import org.apache.logging.log4j.Logger; 041import org.nuxeo.ecm.automation.AutomationService; 042import org.nuxeo.ecm.automation.OperationCallback; 043import org.nuxeo.ecm.automation.OperationContext; 044import org.nuxeo.ecm.automation.OperationException; 045import org.nuxeo.ecm.automation.OperationType; 046import org.nuxeo.ecm.automation.core.impl.InvokableMethod; 047import org.nuxeo.ecm.automation.core.util.BlobList; 048import org.nuxeo.ecm.automation.jaxrs.io.operations.ExecutionRequest; 049import org.nuxeo.ecm.automation.server.AutomationServer; 050import org.nuxeo.ecm.automation.server.jaxrs.OperationResource; 051import org.nuxeo.ecm.automation.server.jaxrs.ResponseHelper; 052import org.nuxeo.ecm.core.api.AsyncService; 053import org.nuxeo.ecm.core.api.AsyncStatus; 054import org.nuxeo.ecm.core.api.Blob; 055import org.nuxeo.ecm.core.api.CloseableCoreSession; 056import org.nuxeo.ecm.core.api.CoreInstance; 057import org.nuxeo.ecm.core.api.CoreSession; 058import org.nuxeo.ecm.core.api.DocumentModel; 059import org.nuxeo.ecm.core.api.DocumentModelList; 060import org.nuxeo.ecm.core.api.NuxeoException; 061import org.nuxeo.ecm.core.api.NuxeoPrincipal; 062import org.nuxeo.ecm.core.transientstore.api.TransientStore; 063import org.nuxeo.ecm.core.transientstore.api.TransientStoreService; 064import org.nuxeo.ecm.platform.web.common.vh.VirtualHostHelper; 065import org.nuxeo.ecm.webengine.model.WebAdapter; 066import org.nuxeo.ecm.webengine.model.exceptions.WebResourceNotFoundException; 067import org.nuxeo.ecm.webengine.model.impl.DefaultAdapter; 068import org.nuxeo.runtime.api.Framework; 069import org.nuxeo.runtime.transaction.TransactionHelper; 070 071import java.io.IOException; 072import java.net.URISyntaxException; 073import java.util.Collections; 074import java.util.HashMap; 075import java.util.List; 076import java.util.Map; 077import java.util.UUID; 078 079/** 080 * Adapter that allows asynchronous execution of operations. 081 * 082 * @since 10.3 083 */ 084@WebAdapter(name = AsyncOperationAdapter.NAME, type = "AsyncOperationAdapter", targetType = "operation") 085@Produces({ MediaType.APPLICATION_JSON }) 086public class AsyncOperationAdapter extends DefaultAdapter { 087 088 public static final String NAME = "async"; 089 090 private static final Logger log = LogManager.getLogger(AsyncOperationAdapter.class); 091 092 protected static final String STATUS_STORE_NAME = "automation"; 093 094 protected static final String TRANSIENT_STORE_SERVICE = "service"; 095 096 protected static final String TRANSIENT_STORE_TASK_ID = "taskId"; 097 098 protected static final String TRANSIENT_STORE_ERROR = "error"; 099 100 protected static final String TRANSIENT_STORE_OUTPUT = "output"; 101 102 protected static final String TRANSIENT_STORE_OUTPUT_BLOB = "blob"; 103 104 protected static final String STATUS_PATH= "status"; 105 106 protected static final String RUNNING_STATUS= "RUNNING"; 107 108 protected static final String RESULT_URL_KEY= "url"; 109 110 @Context 111 protected AutomationService service; 112 113 @Context 114 protected HttpServletRequest request; 115 116 @Context 117 protected HttpServletResponse response; 118 119 @Context 120 protected CoreSession session; 121 122 @Context 123 protected AutomationServer srv; 124 125 @POST 126 public Object doPost(ExecutionRequest xreq) { 127 OperationResource op = (OperationResource) getTarget(); 128 String opId = op.getId(); 129 130 if (!srv.accept(opId, op.isChain(), request)) { 131 return ResponseHelper.notFound(); 132 } 133 String executionId = UUID.randomUUID().toString(); 134 135 // session will be set in the task thread 136 OperationContext opCtx = xreq.createContext(request, response, null); 137 138 opCtx.setCallback(new OperationCallback() { 139 140 @Override 141 public void onChainEnter(OperationType chain) { 142 // 143 } 144 145 @Override 146 public void onChainExit() { 147 setCompleted(executionId); 148 } 149 150 @Override 151 public void onOperationEnter(OperationContext context, OperationType type, InvokableMethod method, 152 Map<String, Object> params) { 153 enterMethod(executionId, method); 154 } 155 156 @Override 157 public void onOperationExit(Object output) { 158 setOutput(executionId, (Serializable) output); 159 } 160 161 @Override 162 public OperationException onError(OperationException error) { 163 setError(executionId, error.getMessage()); 164 return error; 165 } 166 167 }); 168 169 String repoName = session.getRepositoryName(); 170 NuxeoPrincipal principal = session.getPrincipal(); 171 172 // TODO NXP-26303: use thread pool 173 new Thread(() -> { 174 TransactionHelper.runInTransaction(() -> { 175 try (CloseableCoreSession session = CoreInstance.openCoreSession(repoName, principal)){ 176 opCtx.setCoreSession(session); 177 service.run(opCtx, opId, xreq.getParams()); 178 } catch (OperationException e) { 179 setError(executionId, e.getMessage()); 180 } 181 }); 182 }, String.format("Nuxeo-AsyncOperation-%s", executionId)).start(); 183 184 try { 185 String statusURL = String.format("%s%s/%s/%s", ctx.getServerURL(), getPath(), executionId, STATUS_PATH); 186 return Response.status(HttpServletResponse.SC_ACCEPTED).location(new URI(statusURL)).build(); 187 } catch (URISyntaxException e) { 188 throw new NuxeoException(e); 189 } 190 } 191 192 @GET 193 @Path("{executionId}/status") 194 public Object status(@PathParam("executionId") String executionId) throws IOException, MessagingException { 195 if (isCompleted(executionId)) { 196 String error = getError(executionId); 197 if (error != null) { 198 throw new NuxeoException(error, HttpServletResponse.SC_INTERNAL_SERVER_ERROR); 199 } 200 String resURL = String.format("%s/%s", getPath(), executionId); 201 return redirect(resURL); 202 } else { 203 Object result = RUNNING_STATUS; 204 if (isAsync(executionId)) { 205 Serializable taskId = getTaskId(executionId); 206 result = getAsyncService(executionId).getStatus(taskId); 207 } 208 return ResponseHelper.getResponse(result, request, HttpServletResponse.SC_OK); 209 } 210 } 211 212 @GET 213 @Path("{executionId}") 214 public Object result(@PathParam("executionId") String executionId) throws IOException, MessagingException { 215 216 if (isCompleted(executionId)) { 217 Object output = getResult(executionId); 218 219 String error = getError(executionId); 220 221 // cleanup after result is accessed 222 cleanup(executionId); 223 224 if (error != null) { 225 throw new NuxeoException(error, HttpServletResponse.SC_INTERNAL_SERVER_ERROR); 226 } 227 228 // if output has a "url" key assume it's a redirect url 229 if (output instanceof Map) { 230 Object url = ((Map<?, ?>) output).get(RESULT_URL_KEY); 231 if (url instanceof String) { 232 String baseUrl = VirtualHostHelper.getBaseURL(ctx.getRequest()); 233 return redirect(baseUrl + url); 234 } 235 } 236 return ResponseHelper.getResponse(output, request, HttpServletResponse.SC_OK); 237 } 238 239 throw new WebResourceNotFoundException("Execution with id=" + executionId + " not found"); 240 } 241 242 @DELETE 243 @Path("{executionId}") 244 public Object abort(@PathParam("executionId") String executionId) throws IOException, MessagingException { 245 if (exists(executionId) && !isCompleted(executionId)) { 246 // TODO NXP-26304: support aborting any execution 247 if (isAsync(executionId)) { 248 Serializable taskId = getTaskId(executionId); 249 return getAsyncService(executionId).abort(taskId); 250 } 251 return ResponseHelper.getResponse(RUNNING_STATUS, request, HttpServletResponse.SC_OK); 252 } 253 throw new WebResourceNotFoundException("Execution with id=" + executionId + " has completed"); 254 } 255 256 protected TransientStore getTransientStore() { 257 return Framework.getService(TransientStoreService.class).getStore(STATUS_STORE_NAME); 258 } 259 260 protected void enterMethod(String executionId, InvokableMethod method) { 261 // reset parameters 262 getTransientStore().remove(executionId); 263 264 // AsyncService.class is default => not async 265 if (!AsyncService.class.equals(method.getAsyncService())) { 266 getTransientStore().putParameter(executionId, TRANSIENT_STORE_SERVICE, method.getAsyncService().getName()); 267 } 268 } 269 270 protected void setError(String executionId, String error) { 271 getTransientStore().putParameter(executionId, TRANSIENT_STORE_ERROR, error); 272 setCompleted(executionId); 273 } 274 275 public String getError(String executionId) { 276 return (String) getTransientStore().getParameter(executionId, TRANSIENT_STORE_ERROR); 277 } 278 279 protected void setOutput(String executionId, Serializable output) { 280 TransientStore ts = getTransientStore(); 281 // store only taskId for async tasks 282 if (isAsync(executionId)) { 283 Serializable taskId = output instanceof AsyncStatus ? ((AsyncStatus) output).getId() : output; 284 ts.putParameter(executionId, TRANSIENT_STORE_TASK_ID, taskId); 285 } else { 286 if (output instanceof DocumentModel) { 287 detach((DocumentModel) output); 288 } else if (output instanceof DocumentModelList) { 289 ((DocumentModelList) output).forEach(this::detach); 290 } 291 if (output instanceof Blob) { 292 ts.putParameter(executionId, TRANSIENT_STORE_OUTPUT_BLOB, true); 293 ts.putBlobs(executionId, Collections.singletonList((Blob) output)); 294 } else if (output instanceof BlobList) { 295 ts.putParameter(executionId, TRANSIENT_STORE_OUTPUT_BLOB, false); 296 ts.putBlobs(executionId, (BlobList) output); 297 } else { 298 ts.putParameter(executionId, TRANSIENT_STORE_OUTPUT, output); 299 } 300 } 301 } 302 303 protected Object getResult(String executionId) { 304 TransientStore ts = getTransientStore(); 305 306 if (isAsync(executionId)) { 307 AsyncService service = getAsyncService(executionId); 308 if (service != null) { 309 Serializable taskId = ts.getParameter(executionId, TRANSIENT_STORE_TASK_ID); 310 return service.getResult(taskId); 311 } 312 } 313 314 Object output; 315 List<Blob> blobs = ts.getBlobs(executionId); 316 if (CollectionUtils.isNotEmpty(blobs)) { 317 boolean isSingle = (boolean) ts.getParameter(executionId, TRANSIENT_STORE_OUTPUT_BLOB); 318 output = isSingle ? blobs.get(0) : new BlobList(blobs); 319 } else { 320 output = ts.getParameter(executionId, TRANSIENT_STORE_OUTPUT); 321 } 322 if (output instanceof DocumentModel) { 323 attach((DocumentModel) output); 324 } else if (output instanceof DocumentModelList) { 325 ((DocumentModelList) output).forEach(this::attach); 326 } 327 return output; 328 } 329 330 protected void attach(DocumentModel doc) { 331 String sid = ctx.getCoreSession().getSessionId(); 332 doc.attach(sid); 333 } 334 335 protected void detach(DocumentModel doc) { 336 doc.detach(false); 337 } 338 339 protected boolean isAsync(String executionId) { 340 return getTransientStore().getParameter(executionId, TRANSIENT_STORE_SERVICE) != null; 341 } 342 343 protected Serializable getTaskId(String executionId) { 344 return getTransientStore().getParameter(executionId, TRANSIENT_STORE_TASK_ID); 345 } 346 347 protected AsyncService getAsyncService(String executionId) { 348 String serviceClass = (String) getTransientStore().getParameter(executionId, TRANSIENT_STORE_SERVICE); 349 try { 350 return (AsyncService) Framework.getService(Class.forName(serviceClass)); 351 } catch (ClassNotFoundException e) { 352 log.error("AsyncService class {} not found", serviceClass); 353 return null; 354 } 355 } 356 357 protected void setCompleted(String executionId) { 358 getTransientStore().setCompleted(executionId, true); 359 } 360 361 protected boolean isCompleted(String executionId) { 362 if (isAsync(executionId)) { 363 Serializable taskId = getTransientStore().getParameter(executionId, TRANSIENT_STORE_TASK_ID); 364 return getAsyncService(executionId).getStatus(taskId).isCompleted(); 365 } 366 return getTransientStore().isCompleted(executionId); 367 } 368 369 protected boolean exists(String executionId) { 370 return getTransientStore().exists(executionId); 371 } 372 373 protected void cleanup(String executionId) { 374 getTransientStore().release(executionId); 375 } 376}