001/* 002 * (C) Copyright 2015-2018 Nuxeo (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * Antoine Taillefer <[email protected]> 018 */ 019package org.nuxeo.ecm.automation.server.jaxrs.batch; 020 021import java.io.File; 022import java.io.FileOutputStream; 023import java.io.IOException; 024import java.io.InputStream; 025import java.io.OutputStream; 026import java.io.Serializable; 027import java.util.ArrayList; 028import java.util.Collection; 029import java.util.Collections; 030import java.util.HashMap; 031import java.util.List; 032import java.util.Map; 033 034import org.apache.commons.collections.CollectionUtils; 035import org.apache.commons.io.IOUtils; 036import org.apache.commons.lang3.StringUtils; 037import org.apache.commons.lang3.math.NumberUtils; 038import org.apache.commons.logging.Log; 039import org.apache.commons.logging.LogFactory; 040import org.nuxeo.ecm.core.api.Blob; 041import org.nuxeo.ecm.core.api.Blobs; 042import org.nuxeo.ecm.core.api.NuxeoException; 043import org.nuxeo.ecm.core.transientstore.api.TransientStore; 044import org.nuxeo.runtime.api.Framework; 045 046/** 047 * Represents a batch file backed by the {@link TransientStore}. 048 * <p> 049 * The file can be chunked or not. If it is chunked it references its chunks as {@link TransientStore} entry keys. 050 * 051 * @since 7.4 052 * @see Batch 053 */ 054public class BatchFileEntry { 055 056 protected static final Log log = LogFactory.getLog(BatchFileEntry.class); 057 058 protected TransientStore transientStore; 059 060 protected String key; 061 062 protected Map<String, Serializable> params; 063 064 protected Blob blob; 065 066 protected Blob chunkedBlob; 067 068 /** 069 * Returns a file entry that holds the given blob, not chunked. 070 */ 071 public BatchFileEntry(TransientStore transientStore, String key, Blob blob) { 072 this(transientStore, key, false); 073 this.blob = blob; 074 } 075 076 /** 077 * Returns a file entry that references the file chunks. 078 */ 079 public BatchFileEntry(TransientStore transientStore, String key, int chunkCount, String fileName, String mimeType, 080 long fileSize) { 081 this(transientStore, key, true); 082 params.put("chunkCount", String.valueOf(chunkCount)); 083 if (!StringUtils.isEmpty(fileName)) { 084 params.put("fileName", fileName); 085 } 086 if (!StringUtils.isEmpty(mimeType)) { 087 params.put("mimeType", mimeType); 088 } 089 params.put("fileSize", String.valueOf(fileSize)); 090 } 091 092 /** 093 * Returns a file entry that holds the given parameters. 094 */ 095 public BatchFileEntry(TransientStore transientStore, String key, Map<String, Serializable> params) { 096 this.transientStore = transientStore; 097 this.key = key; 098 this.params = params; 099 } 100 101 protected BatchFileEntry(TransientStore transientStore, String key, boolean chunked) { 102 this(transientStore, key, new HashMap<>()); 103 params.put(Batch.CHUNKED_PARAM_NAME, String.valueOf(chunked)); 104 } 105 106 public String getKey() { 107 return key; 108 } 109 110 public Map<String, Serializable> getParams() { 111 return params; 112 } 113 114 public boolean isChunked() { 115 return Boolean.parseBoolean((String) params.get(Batch.CHUNKED_PARAM_NAME)); 116 } 117 118 public String getFileName() { 119 if (isChunked()) { 120 return (String) params.get("fileName"); 121 } else { 122 Blob blob = getBlob(); 123 if (blob == null) { 124 return null; 125 } else { 126 return blob.getFilename(); 127 } 128 } 129 } 130 131 public String getMimeType() { 132 if (isChunked()) { 133 return (String) params.get("mimeType"); 134 } else { 135 Blob blob = getBlob(); 136 if (blob == null) { 137 return null; 138 } else { 139 return blob.getMimeType(); 140 } 141 } 142 } 143 144 public long getFileSize() { 145 if (isChunked()) { 146 return Long.parseLong((String) params.get("fileSize")); 147 } else { 148 Blob blob = getBlob(); 149 if (blob == null) { 150 return -1; 151 } else { 152 return blob.getLength(); 153 } 154 } 155 } 156 157 public int getChunkCount() { 158 if (!isChunked()) { 159 throw new NuxeoException( 160 String.format("Cannot get chunk count of file entry %s as it is not chunked", key)); 161 } 162 return Integer.parseInt((String) params.get("chunkCount")); 163 } 164 165 public Map<Integer, String> getChunks() { 166 if (!isChunked()) { 167 throw new NuxeoException(String.format("Cannot get chunks of file entry %s as it is not chunked", key)); 168 } 169 Map<Integer, String> chunks = new HashMap<>(); 170 for (String param : params.keySet()) { 171 if (NumberUtils.isDigits(param)) { 172 chunks.put(Integer.valueOf(param), (String) params.get(param)); 173 } 174 } 175 return chunks; 176 } 177 178 public List<Integer> getOrderedChunkIndexes() { 179 if (!isChunked()) { 180 throw new NuxeoException( 181 String.format("Cannot get chunk indexes of file entry %s as it is not chunked", key)); 182 } 183 List<Integer> sortedChunkIndexes = new ArrayList<>(getChunks().keySet()); 184 Collections.sort(sortedChunkIndexes); 185 return sortedChunkIndexes; 186 } 187 188 public Collection<String> getChunkEntryKeys() { 189 if (!isChunked()) { 190 throw new NuxeoException( 191 String.format("Cannot get chunk entry keys of file entry %s as it is not chunked", key)); 192 } 193 return getChunks().values(); 194 } 195 196 public boolean isChunksCompleted() { 197 return getChunks().size() == getChunkCount(); 198 } 199 200 public Blob getBlob() { 201 if (isChunked()) { 202 // First check if blob chunks have already been read and concatenated 203 if (chunkedBlob != null) { 204 return chunkedBlob; 205 } 206 File tmpChunkedFile = null; 207 try { 208 Map<Integer, String> chunks = getChunks(); 209 int uploadedChunkCount = chunks.size(); 210 int chunkCount = getChunkCount(); 211 if (uploadedChunkCount != chunkCount) { 212 log.warn(String.format( 213 "Cannot get blob for file entry %s as there are only %d uploaded chunks out of %d.", key, 214 uploadedChunkCount, chunkCount)); 215 return null; 216 } 217 chunkedBlob = Blobs.createBlobWithExtension(null); 218 // Temporary file made from concatenated chunks 219 tmpChunkedFile = chunkedBlob.getFile(); 220 BatchManager bm = Framework.getService(BatchManager.class); 221 // Sort chunk indexes and concatenate them to build the entire blob 222 List<Integer> sortedChunkIndexes = getOrderedChunkIndexes(); 223 for (int index : sortedChunkIndexes) { 224 Blob chunk = getChunk(transientStore, chunks.get(index)); 225 if (chunk != null) { 226 transferTo(chunk, tmpChunkedFile); 227 } 228 } 229 // Store tmpChunkedFile as a parameter for later deletion 230 transientStore.putParameter(key, "tmpChunkedFilePath", tmpChunkedFile.getAbsolutePath()); 231 chunkedBlob.setMimeType(getMimeType()); 232 chunkedBlob.setFilename(getFileName()); 233 return chunkedBlob; 234 } catch (IOException ioe) { 235 if (tmpChunkedFile != null && tmpChunkedFile.exists()) { 236 tmpChunkedFile.delete(); 237 } 238 chunkedBlob = null; 239 throw new NuxeoException(ioe); 240 } 241 } else { 242 return blob; 243 } 244 } 245 246 protected Blob getChunk(TransientStore ts, String key) { 247 List<Blob> blobs = ts.getBlobs(key); 248 if (CollectionUtils.isEmpty(blobs)) { 249 return null; 250 } 251 return blobs.get(0); 252 } 253 254 /** 255 * Appends the given blob to the given file. 256 */ 257 protected void transferTo(Blob blob, File file) throws IOException { 258 try (OutputStream out = new FileOutputStream(file, true)) { 259 try (InputStream in = blob.getStream()) { 260 IOUtils.copy(in, out); 261 } 262 } 263 } 264 265 public String addChunk(int index, Blob blob) { 266 if (!isChunked()) { 267 throw new NuxeoException("Cannot add a chunk to a non chunked file entry."); 268 } 269 int chunkCount = getChunkCount(); 270 if (index < 0) { 271 throw new NuxeoException(String.format("Cannot add chunk with negative index %d.", index)); 272 } 273 if (index >= chunkCount) { 274 throw new NuxeoException(String.format( 275 "Cannot add chunk with index %d to file entry %s as chunk count is %d.", index, key, chunkCount)); 276 } 277 if (getChunks().containsKey(index)) { 278 throw new NuxeoException( 279 String.format("Cannot add chunk with index %d to file entry %s as it already exists.", index, key)); 280 } 281 282 String chunkEntryKey = key + "_" + index; 283 BatchManager bm = Framework.getService(BatchManager.class); 284 transientStore.putBlobs(chunkEntryKey, Collections.singletonList(blob)); 285 transientStore.putParameter(key, String.valueOf(index), chunkEntryKey); 286 287 return chunkEntryKey; 288 } 289 290 public void beforeRemove() { 291 BatchManager bm = Framework.getService(BatchManager.class); 292 String tmpChunkedFilePath = (String) transientStore.getParameter(key, "tmpChunkedFilePath"); 293 if (tmpChunkedFilePath != null) { 294 File tmpChunkedFile = new File(tmpChunkedFilePath); 295 if (tmpChunkedFile.exists()) { 296 log.debug(String.format("Deleting temporary chunked file %s", tmpChunkedFilePath)); 297 tmpChunkedFile.delete(); 298 } 299 } 300 } 301}