Class ChronicleLogAppender<M extends Externalizable>
- java.lang.Object
-
- org.nuxeo.lib.stream.log.chronicle.ChronicleLogAppender<M>
-
- All Implemented Interfaces:
AutoCloseable, CloseableLogAppender<M>, LogAppender<M>
public class ChronicleLogAppender<M extends Externalizable> extends Object implements CloseableLogAppender<M>
Chronicle Queue implementation of LogAppender.
- Since:
- 9.3
-
-
Field Summary
Fields (Modifier and Type, Field):
protected File basePath
static String BLOCK_SIZE_KEY
protected int blockSize
protected boolean closed
protected Codec<M> codec
static int CQ_BLOCK_SIZE
protected static int MAX_PARTITIONS
protected static String METADATA_FILE
static String MSG_KEY
protected Name name
protected int nbPartitions
protected static String PARTITION_PREFIX
protected List<net.openhft.chronicle.queue.ChronicleQueue> partitions
static String PARTITIONS_KEY
protected static int POLL_INTERVAL_MS
protected ChronicleRetentionDuration retention
static String RETENTION_KEY
protected ConcurrentLinkedQueue<ChronicleLogTailer<M>> tailers
-
Constructor Summary
Constructors (Modifier, Constructor, Description):
protected ChronicleLogAppender(ChronicleLogConfig config, Name name, int size, Codec<M> codec)
    Create a new Log
protected ChronicleLogAppender(ChronicleLogConfig config, Name name, Codec<M> codec, boolean withRetention)
    Open an existing Log
-
Method Summary
Methods (Modifier and Type, Method, Description):
protected LogTailer<M> addTailer(ChronicleLogTailer<M> tailer)
LogOffset append(int partition, M message)
    Append a message into a partition, returns LogOffset position of the message.
void close()
boolean closed()
    Returns true if the appender has been closed by the manager.
long countMessages(int partition, long lowerOffset, long upperOffset)
static <M extends Externalizable> ChronicleLogAppender<M> create(ChronicleLogConfig config, Name name, int size, Codec<M> codec)
    Create a new log
LogTailer<M> createTailer(LogPartition partition, Name group, Codec<M> codec)
static int discoverPartitions(Path basePath)
long endOffset(int partition)
protected static boolean exists(File basePath)
long firstOffset(int partition)
String getBasePath()
Codec<M> getCodec()
    Returns the codec used to write record.
protected Path getMetadataPath()
ChronicleRetentionDuration getRetention()
protected void initPartitions(boolean create)
protected static boolean isPartitionDirectory(Path path)
protected boolean isProcessed(ChronicleLogOffsetTracker tracker, long offset)
Name name()
    Returns the Log's name.
static <M extends Externalizable> ChronicleLogAppender<M> open(ChronicleLogConfig config, Name name, Codec<M> codec)
    Open an existing log.
static <M extends Externalizable> ChronicleLogAppender<M> openWithoutRetention(ChronicleLogConfig config, Name name, Codec<M> codec)
static int partitions(Path basePath)
protected static Properties readMetadata(Path file)
protected void saveMetadata()
int size()
    Returns the number of partitions in the Log.
String toString()
boolean waitFor(LogOffset offset, Name group, Duration timeout)
    Wait for consumer to process a message up to the offset.
-
Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, wait, wait, wait
-
Methods inherited from interface org.nuxeo.lib.stream.log.LogAppender
append
-
-
-
-
Field Detail
-
PARTITION_PREFIX
protected static final String PARTITION_PREFIX
- See Also:
- Constant Field Values
-
METADATA_FILE
protected static final String METADATA_FILE
- See Also:
- Constant Field Values
-
POLL_INTERVAL_MS
protected static final int POLL_INTERVAL_MS
- See Also:
- Constant Field Values
-
MAX_PARTITIONS
protected static final int MAX_PARTITIONS
- See Also:
- Constant Field Values
-
MSG_KEY
public static final String MSG_KEY
- See Also:
- Constant Field Values
-
CQ_BLOCK_SIZE
public static final int CQ_BLOCK_SIZE
- See Also:
- Constant Field Values
-
RETENTION_KEY
public static final String RETENTION_KEY
- See Also:
- Constant Field Values
-
PARTITIONS_KEY
public static final String PARTITIONS_KEY
- See Also:
- Constant Field Values
-
BLOCK_SIZE_KEY
public static final String BLOCK_SIZE_KEY
- See Also:
- Constant Field Values
-
partitions
protected final List<net.openhft.chronicle.queue.ChronicleQueue> partitions
-
nbPartitions
protected final int nbPartitions
-
basePath
protected final File basePath
-
blockSize
protected final int blockSize
-
name
protected final Name name
-
tailers
protected final ConcurrentLinkedQueue<ChronicleLogTailer<M>> tailers
-
retention
protected final ChronicleRetentionDuration retention
-
codec
protected final Codec<M> codec
-
closed
protected volatile boolean closed
-
-
Constructor Detail
-
ChronicleLogAppender
protected ChronicleLogAppender(ChronicleLogConfig config, Name name, Codec<M> codec, boolean withRetention)
Open an existing Log.
-
ChronicleLogAppender
protected ChronicleLogAppender(ChronicleLogConfig config, Name name, int size, Codec<M> codec)
Create a new Log.
-
-
Method Detail
-
initPartitions
protected void initPartitions(boolean create)
-
saveMetadata
protected void saveMetadata()
-
getMetadataPath
protected Path getMetadataPath()
-
readMetadata
protected static Properties readMetadata(Path file)
-
exists
protected static boolean exists(File basePath)
-
create
public static <M extends Externalizable> ChronicleLogAppender<M> create(ChronicleLogConfig config, Name name, int size, Codec<M> codec)
Create a new log.
-
open
public static <M extends Externalizable> ChronicleLogAppender<M> open(ChronicleLogConfig config, Name name, Codec<M> codec)
Open an existing log.
-
openWithoutRetention
public static <M extends Externalizable> ChronicleLogAppender<M> openWithoutRetention(ChronicleLogConfig config, Name name, Codec<M> codec)
-
getBasePath
public String getBasePath()
-
name
public Name name()
Description copied from interface: LogAppender
Returns the Log's name.
- Specified by:
name in interface LogAppender<M extends Externalizable>
-
size
public int size()
Description copied from interface: LogAppender
Returns the number of partitions in the Log.
- Specified by:
size in interface LogAppender<M extends Externalizable>
-
append
public LogOffset append(int partition, M message)
Description copied from interface: LogAppender
Append a message into a partition, returns LogOffset position of the message. This method is thread-safe; a queue can be shared by multiple producers.
- Specified by:
append in interface LogAppender<M extends Externalizable>
- Parameters:
partition - index lower than LogAppender.size()
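Because the partition argument must be lower than LogAppender.size(), a caller needs some way to turn a message key into a valid index. The following is a minimal sketch of one common approach (hash modulo partition count). It is not taken from the Nuxeo sources: the class PartitionChooser and the method partitionFor are hypothetical names, and the size value stands in for whatever size() returns.

```java
// Hypothetical helper, not part of org.nuxeo.lib.stream.
final class PartitionChooser {
    private PartitionChooser() {
    }

    /**
     * Maps a key to a partition index in the range [0, size).
     * Math.floorMod keeps the result non-negative even when hashCode() is negative.
     */
    static int partitionFor(String key, int size) {
        if (size <= 0) {
            throw new IllegalArgumentException("size must be positive");
        }
        return Math.floorMod(key.hashCode(), size);
    }

    public static void main(String[] args) {
        int size = 4; // assumption: stands in for the value returned by LogAppender.size()
        for (String key : new String[] {"user-1", "user-2", "order-42"}) {
            System.out.println(key + " -> partition " + partitionFor(key, size));
        }
    }
}
```

With such a helper, the index passed as the first argument of append(int partition, M message) always satisfies the documented constraint, and messages sharing a key land in the same partition.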
-
endOffset
public long endOffset(int partition)
-
firstOffset
public long firstOffset(int partition)
-
countMessages
public long countMessages(int partition, long lowerOffset, long upperOffset)
-
addTailer
protected LogTailer<M> addTailer(ChronicleLogTailer<M> tailer)
-
waitFor
public boolean waitFor(LogOffset offset, Name group, Duration timeout) throws InterruptedException
Description copied from interface: LogAppender
Wait for consumer to process a message up to the offset. The message is considered processed once a consumer of the group commits an offset greater than or equal to it. Returns true if the message has been consumed, false in case of timeout.
- Specified by:
waitFor in interface LogAppender<M extends Externalizable>
- Throws:
InterruptedException
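The contract above (true once the committed offset reaches the target, false on timeout) can be illustrated with a simple polling loop. This is only a sketch of the documented semantics, not the actual ChronicleLogAppender code: the class WaitForSketch, the LongSupplier standing in for a group's committed offset, and the 10 ms poll interval are all assumptions made for the example.

```java
import java.time.Duration;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.LongSupplier;

// Hypothetical stand-in that mimics the documented waitFor contract.
final class WaitForSketch {
    // Assumed value; the real POLL_INTERVAL_MS constant is not given on this page.
    static final long POLL_INTERVAL_MS = 10;

    private WaitForSketch() {
    }

    /**
     * Polls the committed offset until it is greater than or equal to the target offset.
     * Returns true if that happens before the timeout elapses, false otherwise.
     */
    static boolean waitFor(long offset, LongSupplier committed, Duration timeout) throws InterruptedException {
        long deadline = System.nanoTime() + timeout.toNanos();
        while (true) {
            if (committed.getAsLong() >= offset) {
                return true;
            }
            if (System.nanoTime() - deadline >= 0) {
                return false;
            }
            Thread.sleep(POLL_INTERVAL_MS);
        }
    }

    public static void main(String[] args) throws InterruptedException {
        AtomicLong committed = new AtomicLong(0);
        // Simulated consumer that commits offset 10 after a short delay.
        Thread consumer = new Thread(() -> {
            try {
                Thread.sleep(50);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
            committed.set(10);
        });
        consumer.start();
        boolean done = waitFor(10, committed::get, Duration.ofSeconds(2));
        System.out.println("processed before timeout: " + done);
        consumer.join();
    }
}
```

A producer would typically keep the LogOffset returned by append and pass it to waitFor together with the consumer group name when it needs to block until the message has been handled.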
-
closed
public boolean closed()
Description copied from interface: LogAppender
Returns true if the appender has been closed by the manager.
- Specified by:
closed in interface LogAppender<M extends Externalizable>
-
getCodec
public Codec<M> getCodec()
Description copied from interface: LogAppender
Returns the codec used to write record. A null codec is the default legacy encoding.
- Specified by:
getCodec in interface LogAppender<M extends Externalizable>
-
isProcessed
protected boolean isProcessed(ChronicleLogOffsetTracker tracker, long offset)
-
close
public void close()
- Specified by:
close in interface AutoCloseable
- Specified by:
close in interface CloseableLogAppender<M extends Externalizable>
-
partitions
public static int partitions(Path basePath)
-
discoverPartitions
public static int discoverPartitions(Path basePath)
-
isPartitionDirectory
protected static boolean isPartitionDirectory(Path path)
-
getRetention
public ChronicleRetentionDuration getRetention()
-
-