001/* 002 * (C) Copyright 2017 Nuxeo (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * Estelle Giuly <[email protected]> 018 */ 019package org.nuxeo.audit.storage.stream; 020 021import static java.nio.charset.StandardCharsets.UTF_8; 022import static org.nuxeo.ecm.platform.audit.listener.StreamAuditEventListener.STREAM_NAME; 023 024import java.util.ArrayList; 025import java.util.Collections; 026import java.util.List; 027import java.util.Map; 028 029import org.apache.commons.logging.Log; 030import org.apache.commons.logging.LogFactory; 031import org.nuxeo.audit.storage.impl.DirectoryAuditStorage; 032import org.nuxeo.ecm.platform.audit.service.NXAuditEventsService; 033import org.nuxeo.lib.stream.computation.AbstractComputation; 034import org.nuxeo.lib.stream.computation.ComputationContext; 035import org.nuxeo.lib.stream.computation.Record; 036import org.nuxeo.lib.stream.computation.Topology; 037import org.nuxeo.runtime.api.Framework; 038import org.nuxeo.runtime.stream.StreamProcessorTopology; 039 040/** 041 * Computation that consumes a stream of Json log entries and write them to the Directory Audit Storage. 042 * 043 * @since 9.10 044 */ 045public class StreamAuditStorageWriter implements StreamProcessorTopology { 046 private static final Log log = LogFactory.getLog(StreamAuditStorageWriter.class); 047 048 public static final String COMPUTATION_NAME = "AuditStorageLogWriter"; 049 050 public static final String BATCH_SIZE_OPT = "batchSize"; 051 052 public static final String BATCH_THRESHOLD_MS_OPT = "batchThresholdMs"; 053 054 public static final int DEFAULT_BATCH_SIZE = 10; 055 056 public static final int DEFAULT_BATCH_THRESHOLD_MS = 200; 057 058 @Override 059 public Topology getTopology(Map<String, String> options) { 060 int batchSize = getOptionAsInteger(options, BATCH_SIZE_OPT, DEFAULT_BATCH_SIZE); 061 int batchThresholdMs = getOptionAsInteger(options, BATCH_THRESHOLD_MS_OPT, DEFAULT_BATCH_THRESHOLD_MS); 062 return Topology.builder() 063 .addComputation(() -> new AuditStorageLogWriterComputation(COMPUTATION_NAME, batchSize, 064 batchThresholdMs), Collections.singletonList("i1:" + STREAM_NAME)) 065 .build(); 066 } 067 068 public class AuditStorageLogWriterComputation extends AbstractComputation { 069 protected final int batchSize; 070 071 protected final int batchThresholdMs; 072 073 protected final List<String> jsonEntries; 074 075 public AuditStorageLogWriterComputation(String name, int batchSize, int batchThresholdMs) { 076 super(name, 1, 0); 077 this.batchSize = batchSize; 078 this.batchThresholdMs = batchThresholdMs; 079 jsonEntries = new ArrayList<>(batchSize); 080 } 081 082 @Override 083 public void init(ComputationContext context) { 084 log.debug(String.format("Starting computation: %s reading on: %s, batch size: %d, threshold: %dms", 085 COMPUTATION_NAME, STREAM_NAME, batchSize, batchThresholdMs)); 086 context.setTimer("batch", System.currentTimeMillis() + batchThresholdMs); 087 } 088 089 @Override 090 public void processTimer(ComputationContext context, String key, long timestamp) { 091 writeJsonEntriesToAudit(context); 092 context.setTimer("batch", System.currentTimeMillis() + batchThresholdMs); 093 } 094 095 @Override 096 public void processRecord(ComputationContext context, String inputStreamName, Record record) { 097 jsonEntries.add(new String(record.data, UTF_8)); 098 if (jsonEntries.size() >= batchSize) { 099 writeJsonEntriesToAudit(context); 100 } 101 } 102 103 @Override 104 public void destroy() { 105 log.debug(String.format("Destroy computation: %s, pending entries: %d", COMPUTATION_NAME, 106 jsonEntries.size())); 107 } 108 109 /** 110 * Store JSON entries in the Directory Audit Storage 111 */ 112 protected void writeJsonEntriesToAudit(ComputationContext context) { 113 if (jsonEntries.isEmpty()) { 114 return; 115 } 116 if (log.isDebugEnabled()) { 117 log.debug(String.format("Writing %d log entries to the directory audit storage %s.", jsonEntries.size(), 118 DirectoryAuditStorage.NAME)); 119 } 120 NXAuditEventsService audit = (NXAuditEventsService) Framework.getRuntime() 121 .getComponent(NXAuditEventsService.NAME); 122 DirectoryAuditStorage storage = (DirectoryAuditStorage) audit.getAuditStorage(DirectoryAuditStorage.NAME); 123 storage.append(jsonEntries); 124 jsonEntries.clear(); 125 context.askForCheckpoint(); 126 } 127 } 128 129 protected int getOptionAsInteger(Map<String, String> options, String option, int defaultValue) { 130 String value = options.get(option); 131 return value == null ? defaultValue : Integer.parseInt(value); 132 } 133 134}