001/* 002 * (C) Copyright 2018 Nuxeo (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * Kevin Leturc <[email protected]> 018 */ 019package org.nuxeo.ecm.core.bulk; 020 021import static org.apache.commons.lang3.StringUtils.isEmpty; 022import static org.nuxeo.ecm.core.bulk.message.BulkStatus.State.ABORTED; 023import static org.nuxeo.ecm.core.bulk.message.BulkStatus.State.COMPLETED; 024import static org.nuxeo.ecm.core.bulk.message.BulkStatus.State.SCHEDULED; 025import static org.nuxeo.ecm.core.bulk.message.BulkStatus.State.UNKNOWN; 026 027import java.io.Serializable; 028import java.time.Duration; 029import java.time.Instant; 030import java.util.List; 031import java.util.Map; 032import java.util.Set; 033import java.util.stream.Collectors; 034 035import org.apache.logging.log4j.Logger; 036import org.nuxeo.ecm.core.api.repository.RepositoryManager; 037import org.nuxeo.ecm.core.bulk.message.BulkCommand; 038import org.nuxeo.ecm.core.bulk.message.BulkStatus; 039import org.nuxeo.lib.stream.computation.Record; 040import org.nuxeo.lib.stream.log.LogAppender; 041import org.nuxeo.lib.stream.log.LogManager; 042import org.nuxeo.runtime.api.Framework; 043import org.nuxeo.runtime.codec.CodecService; 044import org.nuxeo.runtime.kv.KeyValueService; 045import org.nuxeo.runtime.kv.KeyValueStore; 046import org.nuxeo.runtime.kv.KeyValueStoreProvider; 047import org.nuxeo.runtime.stream.StreamService; 048 049/** 050 * Basic implementation of {@link BulkService}. 051 * 052 * @since 10.2 053 */ 054public class BulkServiceImpl implements BulkService { 055 056 private static final Logger log = org.apache.logging.log4j.LogManager.getLogger(BulkServiceImpl.class); 057 058 public static final String BULK_LOG_MANAGER_NAME = "bulk"; 059 060 public static final String BULK_KV_STORE_NAME = "bulk"; 061 062 public static final String COMMAND_STREAM = "command"; 063 064 public static final String STATUS_STREAM = "status"; 065 066 public static final String DONE_STREAM = "done"; 067 068 public static final String RECORD_CODEC = "avro"; 069 070 public static final String COMMAND_PREFIX = "command:"; 071 072 public static final String STATUS_PREFIX = "status:"; 073 074 public static final String PRODUCE_IMMEDIATE_OPTION = "produceImmediate"; 075 076 // How long we keep the command and its status in the kv store once completed 077 public static final long COMPLETED_TTL_SECONDS = 3_600; 078 079 // How long we keep the command and its status in the kv store once aborted 080 public static final long ABORTED_TTL_SECONDS = 7_200; 081 082 @Override 083 public String submit(BulkCommand command) { 084 log.debug("Run action with command={}", command); 085 // check command 086 BulkAdminService adminService = Framework.getService(BulkAdminService.class); 087 if (!adminService.getActions().contains(command.getAction())) { 088 throw new IllegalArgumentException("Unknown action for command: " + command); 089 } 090 BulkActionValidation actionValidation = adminService.getActionValidation(command.getAction()); 091 092 // Try to validate the action if a validation class is provided 093 if (actionValidation != null) { 094 actionValidation.validate(command); 095 } 096 097 RepositoryManager repoManager = Framework.getService(RepositoryManager.class); 098 if (isEmpty(command.getRepository())) { 099 command.setRepository(repoManager.getDefaultRepositoryName()); 100 } else { 101 if (repoManager.getRepository(command.getRepository()) == null) { 102 throw new IllegalArgumentException("Unknown repository: " + command); 103 } 104 } 105 if (command.getBucketSize() == 0 || command.getBatchSize() == 0) { 106 107 if (command.getBucketSize() == 0) { 108 command.setBucketSize(adminService.getBucketSize(command.getAction())); 109 } 110 if (command.getBatchSize() == 0) { 111 command.setBatchSize(adminService.getBatchSize(command.getAction())); 112 } 113 } 114 115 // store the bulk command and status in the key/value store 116 BulkStatus status = new BulkStatus(command.getId()); 117 status.setState(SCHEDULED); 118 status.setAction(command.getAction()); 119 status.setUsername(command.getUsername()); 120 status.setSubmitTime(Instant.now()); 121 setStatus(status); 122 byte[] commandAsBytes = setCommand(command); 123 124 String shardKey; 125 if (adminService.isSequentialCommands(command.getAction())) { 126 // no concurrency all commands for this action goes to the same partition 127 shardKey = command.getAction(); 128 } else { 129 // use a random value 130 shardKey = command.getId(); 131 } 132 // send command to bulk processor 133 LogManager logManager = Framework.getService(StreamService.class).getLogManager(BULK_LOG_MANAGER_NAME); 134 LogAppender<Record> logAppender = logManager.getAppender(COMMAND_STREAM, 135 Framework.getService(CodecService.class).getCodec(RECORD_CODEC, Record.class)); 136 logAppender.append(shardKey, Record.of(command.getId(), commandAsBytes)); 137 return command.getId(); 138 } 139 140 @Override 141 public BulkStatus getStatus(String commandId) { 142 KeyValueStore keyValueStore = getKvStore(); 143 byte[] statusAsBytes = keyValueStore.get(STATUS_PREFIX + commandId); 144 if (statusAsBytes == null) { 145 log.debug("Request status of unknown command: {}", commandId); 146 return BulkStatus.unknownOf(commandId); 147 } 148 return BulkCodecs.getStatusCodec().decode(statusAsBytes); 149 } 150 151 /** 152 * Stores the status in the kv store returns the encoded status 153 */ 154 public byte[] setStatus(BulkStatus status) { 155 KeyValueStore kvStore = getKvStore(); 156 byte[] statusAsBytes = BulkCodecs.getStatusCodec().encode(status); 157 switch (status.getState()) { 158 case ABORTED: 159 kvStore.put(STATUS_PREFIX + status.getId(), statusAsBytes, ABORTED_TTL_SECONDS); 160 // we remove the command from the kv store, so computation have to handle abort 161 kvStore.put(COMMAND_PREFIX + status.getId(), (String) null); 162 break; 163 case COMPLETED: 164 kvStore.put(STATUS_PREFIX + status.getId(), statusAsBytes, COMPLETED_TTL_SECONDS); 165 kvStore.setTTL(COMMAND_PREFIX + status.getId(), COMPLETED_TTL_SECONDS); 166 break; 167 default: 168 kvStore.put(STATUS_PREFIX + status.getId(), statusAsBytes); 169 } 170 return statusAsBytes; 171 } 172 173 @Override 174 public BulkCommand getCommand(String commandId) { 175 KeyValueStore keyValueStore = getKvStore(); 176 byte[] statusAsBytes = keyValueStore.get(COMMAND_PREFIX + commandId); 177 if (statusAsBytes == null) { 178 return null; 179 } 180 return BulkCodecs.getCommandCodec().decode(statusAsBytes); 181 } 182 183 @Override 184 public BulkStatus abort(String commandId) { 185 BulkStatus status = getStatus(commandId); 186 if (COMPLETED.equals(status.getState())) { 187 log.debug("Cannot abort a completed command: {}", commandId); 188 return status; 189 } 190 status.setState(ABORTED); 191 // set the status in the KV store 192 setStatus(status); 193 // Send a delta to the status computation 194 BulkStatus delta = BulkStatus.deltaOf(commandId); 195 delta.setCompletedTime(Instant.now()); 196 delta.setState(ABORTED); 197 byte[] statusAsBytes = BulkCodecs.getStatusCodec().encode(delta); 198 LogManager logManager = Framework.getService(StreamService.class).getLogManager(BULK_LOG_MANAGER_NAME); 199 LogAppender<Record> logAppender = logManager.getAppender(STATUS_STREAM); 200 logAppender.append(commandId, Record.of(commandId, statusAsBytes)); 201 return status; 202 } 203 204 @Override 205 public Map<String, Serializable> getResult(String commandId) { 206 return getStatus(commandId).getResult(); 207 } 208 209 /** 210 * Stores the command in the kv store, returns the encoded command. 211 */ 212 public byte[] setCommand(BulkCommand command) { 213 KeyValueStore kvStore = getKvStore(); 214 byte[] commandAsBytes = BulkCodecs.getCommandCodec().encode(command); 215 kvStore.put(COMMAND_PREFIX + command.getId(), commandAsBytes); 216 return commandAsBytes; 217 } 218 219 @Override 220 public boolean await(String commandId, Duration duration) throws InterruptedException { 221 long deadline = System.currentTimeMillis() + duration.toMillis(); 222 BulkStatus status; 223 do { 224 status = getStatus(commandId); 225 switch (status.getState()) { 226 case COMPLETED: 227 case ABORTED: 228 return true; 229 case UNKNOWN: 230 log.error("Unknown status for command: {}", commandId); 231 return false; 232 default: 233 // continue 234 } 235 Thread.sleep(100); 236 } while (deadline > System.currentTimeMillis()); 237 log.debug("await timeout on {} after {} ms", () -> getStatus(commandId), duration::toMillis); 238 return false; 239 } 240 241 public KeyValueStore getKvStore() { 242 return Framework.getService(KeyValueService.class).getKeyValueStore(BULK_KV_STORE_NAME); 243 } 244 245 @Override 246 public boolean await(Duration duration) throws InterruptedException { 247 KeyValueStoreProvider kv = (KeyValueStoreProvider) getKvStore(); 248 Set<String> commandIds = kv.keyStream(STATUS_PREFIX) 249 .map(k -> k.replaceFirst(STATUS_PREFIX, "")) 250 .collect(Collectors.toSet()); 251 // nanoTime is always monotonous 252 long deadline = System.nanoTime() + duration.toNanos(); 253 for (String commandId : commandIds) { 254 for (;;) { 255 BulkStatus status = getStatus(commandId); 256 BulkStatus.State state = status.getState(); 257 if (state == COMPLETED || state == ABORTED || state == UNKNOWN) { 258 break; 259 } 260 Thread.sleep(200); 261 if (deadline < System.nanoTime()) { 262 log.debug("await timeout, at least one uncompleted command: {}", status); 263 return false; 264 } 265 } 266 } 267 return true; 268 } 269 270 @Override 271 public List<BulkStatus> getStatuses(String username) { 272 KeyValueStoreProvider kv = (KeyValueStoreProvider) getKvStore(); 273 return kv.keyStream(STATUS_PREFIX) 274 .map(kv::get) 275 .map(BulkCodecs.getStatusCodec()::decode) 276 .filter(status -> username.equals(status.getUsername())) 277 .collect(Collectors.toList()); 278 } 279 280}