001/* 002 * (C) Copyright 2018 Nuxeo (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * Nelson Silva <[email protected]> 018 */ 019 020package org.nuxeo.ecm.automation.client.adapters; 021 022import org.apache.commons.collections.CollectionUtils; 023import org.apache.http.Header; 024import org.apache.http.HttpHeaders; 025import org.apache.http.HttpStatus; 026import org.apache.http.client.protocol.HttpClientContext; 027import org.apache.http.protocol.HttpContext; 028import org.nuxeo.ecm.automation.client.AutomationClient; 029import org.nuxeo.ecm.automation.client.LoginInfo; 030import org.nuxeo.ecm.automation.client.OperationRequest; 031import org.nuxeo.ecm.automation.client.RemoteException; 032import org.nuxeo.ecm.automation.client.Session; 033import org.nuxeo.ecm.automation.client.jaxrs.spi.Connector; 034import org.nuxeo.ecm.automation.client.jaxrs.spi.DefaultOperationRequest; 035import org.nuxeo.ecm.automation.client.jaxrs.spi.DefaultSession; 036import org.nuxeo.ecm.automation.client.jaxrs.spi.JsonMarshalling; 037import org.nuxeo.ecm.automation.client.jaxrs.spi.Request; 038import org.nuxeo.ecm.automation.client.jaxrs.util.MultipartInput; 039import org.nuxeo.ecm.automation.client.model.Blob; 040import org.nuxeo.ecm.automation.client.model.Blobs; 041import org.nuxeo.ecm.automation.client.model.OperationDocumentation; 042import org.nuxeo.ecm.automation.client.model.OperationInput; 043 044import java.io.IOException; 045import java.io.InputStream; 046import java.time.Duration; 047import java.util.HashMap; 048import java.util.List; 049import java.util.Map; 050import java.util.concurrent.CompletableFuture; 051import java.util.concurrent.ExecutionException; 052import java.util.concurrent.ExecutorService; 053import java.util.concurrent.Executors; 054import java.util.concurrent.Future; 055 056import static org.nuxeo.ecm.automation.client.Constants.CTYPE_REQUEST_NOCHARSET; 057import static org.nuxeo.ecm.automation.client.Constants.HEADER_NX_SCHEMAS; 058import static org.nuxeo.ecm.automation.client.Constants.REQUEST_ACCEPT_HEADER; 059 060/** 061 * Asynchronous session adapter. 062 * @since 10.3 063 */ 064public class AsyncSession implements Session { 065 066 protected static ExecutorService executor = Executors.newSingleThreadExecutor(); 067 068 /** 069 * Request providing a completable call method for convenience. 070 */ 071 public class CompletableRequest extends Request { 072 073 protected CompletableFuture<CompletableRequest> future; 074 075 protected int status; 076 077 protected Header[] headers; 078 079 protected Object result; 080 081 protected boolean redirected; 082 083 public CompletableRequest(int method, String url) { 084 super(method, url, (String) null); 085 } 086 087 public CompletableRequest(int method, String url, String entity) { 088 super(method, url, entity); 089 } 090 091 public CompletableRequest(int method, String url, MultipartInput input) { 092 super(method, url, input); 093 } 094 095 @Override 096 public Object handleResult(int status, Header[] headers, InputStream stream, HttpContext ctx) 097 throws RemoteException, IOException { 098 this.status = status; 099 this.headers = headers; 100 List redirects = (List) ctx.getAttribute(HttpClientContext.REDIRECT_LOCATIONS); 101 this.redirected = CollectionUtils.isNotEmpty(redirects); 102 try { 103 this.result = super.handleResult(status, headers, stream, ctx); 104 future.complete(this); 105 } catch (RemoteException e) { 106 future.completeExceptionally(e); 107 } 108 return result; 109 } 110 111 protected AsyncSession getSession() { 112 return AsyncSession.this; 113 } 114 115 protected String getHeader(String name) { 116 return Request.getHeaderValue(headers, name); 117 } 118 119 public CompletableFuture<? extends CompletableRequest> call() { 120 future = new CompletableFuture<>(); 121 try { 122 getSession().getConnector().execute(this); 123 } catch (IOException e) { 124 future.completeExceptionally(e); 125 } 126 return future; 127 } 128 129 public int getStatus() { 130 return status; 131 } 132 133 public Object getResult() { 134 return result; 135 } 136 137 public boolean isRedirected() { 138 return redirected; 139 } 140 } 141 142 /** 143 * Asynchronous pooling based request 144 */ 145 public class AsyncRequest extends CompletableRequest { 146 147 protected static final String ASYNC_ADAPTER = "/@async"; 148 149 public AsyncRequest(int method, String url, String entity) { 150 super(method, url + ASYNC_ADAPTER, entity); 151 } 152 153 public AsyncRequest(int method, String url, MultipartInput input) { 154 super(method, url + ASYNC_ADAPTER, input); 155 } 156 157 protected AsyncSession getSession() { 158 return AsyncSession.this; 159 } 160 161 public CompletableFuture<Object> execute() { 162 return call().thenCompose((req) -> { 163 if (req.getStatus() == HttpStatus.SC_ACCEPTED) { 164 String location = req.getHeader(HttpHeaders.LOCATION); 165 return poll(location, Duration.ofSeconds(1), Duration.ofSeconds(30)); 166 } 167 return CompletableFuture.completedFuture(req.getResult()); 168 }); 169 } 170 171 protected CompletableFuture<Object> poll(String location, Duration delay, Duration duration) { 172 CompletableFuture<Object> resultFuture = new CompletableFuture<>(); 173 long deadline = System.nanoTime() + duration.toNanos(); 174 CompletableRequest req = new CompletableRequest(Request.GET, location); 175 Future pollFuture = executor.submit(() -> { 176 do { 177 req.call().thenAccept(res -> { 178 if (req.isRedirected()) { 179 resultFuture.complete(res.getResult()); 180 } 181 }).exceptionally(ex -> { 182 resultFuture.completeExceptionally(ex.getCause()); 183 return null; 184 }); 185 try { 186 Thread.sleep(delay.toMillis()); 187 } catch (InterruptedException e) { 188 // interrupted when result is complete 189 return; 190 } 191 } while (deadline > System.nanoTime()); 192 }); 193 resultFuture.whenComplete((result, thrown) -> { 194 pollFuture.cancel(true); 195 }); 196 return resultFuture; 197 } 198 } 199 200 protected final DefaultSession session; 201 202 public AsyncSession(DefaultSession session) { 203 this.session = session; 204 } 205 206 public Session getSession() { 207 return session; 208 } 209 210 @Override 211 public AutomationClient getClient() { 212 return session.getClient(); 213 } 214 215 @Override 216 public LoginInfo getLogin() { 217 return session.getLogin(); 218 } 219 220 @Override 221 public OperationRequest newRequest(String id) { 222 return newRequest(id, new HashMap<>()); 223 } 224 225 @Override 226 public OperationRequest newRequest(String id, Map<String, Object> ctx) { 227 OperationDocumentation op = getOperation(id); 228 if (op == null) { 229 throw new IllegalArgumentException("No such operation: " + id); 230 } 231 return new DefaultOperationRequest(this, op, ctx); 232 } 233 234 @Override 235 public Object execute(OperationRequest request) throws IOException { 236 AsyncRequest req; 237 String content = JsonMarshalling.writeRequest(request); 238 String ctype; 239 Object input = request.getInput(); 240 if (input instanceof OperationInput && ((OperationInput) input).isBinary()) { 241 MultipartInput mpinput = Request.buildMultipartInput(input, content); 242 req = new AsyncRequest(Request.POST, request.getUrl(), mpinput); 243 ctype = mpinput.getContentType(); 244 } else { 245 req = new AsyncRequest(Request.POST, request.getUrl(), content); 246 ctype = CTYPE_REQUEST_NOCHARSET; 247 } 248 // set headers 249 for (Map.Entry<String, String> entry : request.getHeaders().entrySet()) { 250 req.put(entry.getKey(), entry.getValue()); 251 } 252 req.put(HttpHeaders.ACCEPT, REQUEST_ACCEPT_HEADER); 253 req.put(HttpHeaders.CONTENT_TYPE, ctype); 254 if (req.get(HEADER_NX_SCHEMAS) == null && session.getDefaultSchemas() != null) { 255 req.put(HEADER_NX_SCHEMAS, session.getDefaultSchemas()); 256 } 257 try { 258 return req.execute().get(); 259 } catch (ExecutionException e) { 260 if (e.getCause() instanceof RemoteException) { 261 throw (RemoteException) e.getCause(); 262 } 263 throw new IOException(e); 264 } catch (InterruptedException e) { 265 Thread.currentThread().interrupt(); 266 throw new RuntimeException(e); 267 } 268 } 269 270 @Override 271 public Blob getFile(String path) throws IOException { 272 return session.getFile(path); 273 } 274 275 @Override 276 public Blobs getFiles(String path) throws IOException { 277 return session.getFiles(path); 278 } 279 280 @Override 281 public OperationDocumentation getOperation(String id) { 282 return session.getOperation(id); 283 } 284 285 @Override 286 public Map<String, OperationDocumentation> getOperations() { 287 return session.getOperations(); 288 } 289 290 @Override 291 public <T> T getAdapter(Class<T> type) { 292 return session.getAdapter(type); 293 } 294 295 @Override 296 public String getDefaultSchemas() { 297 return session.getDefaultSchemas(); 298 } 299 300 @Override 301 public void setDefaultSchemas(String defaultSchemas) { 302 session.setDefaultSchemas(defaultSchemas); 303 } 304 305 @Override 306 public void close() { 307 session.close(); 308 } 309 310 public Connector getConnector() { 311 return session.getConnector(); 312 } 313}