Class ChronicleLogAppender<M extends Externalizable>
- java.lang.Object
-
- org.nuxeo.lib.stream.log.chronicle.ChronicleLogAppender<M>
-
- All Implemented Interfaces:
AutoCloseable, CloseableLogAppender<M>, LogAppender<M>
public class ChronicleLogAppender<M extends Externalizable> extends Object implements CloseableLogAppender<M>
Chronicle Queue implementation of LogAppender.
- Since:
- 9.3
-
-
Field Summary
Fields (Modifier and Type, Field):
protected File basePath
static String BLOCK_SIZE_KEY
protected int blockSize
protected boolean closed
protected Codec<M> codec
static int CQ_BLOCK_SIZE
protected static int MAX_PARTITIONS
protected static String METADATA_FILE
static String MSG_KEY
protected Name name
protected int nbPartitions
protected static String PARTITION_PREFIX
protected List<net.openhft.chronicle.queue.ChronicleQueue> partitions
static String PARTITIONS_KEY
protected static int POLL_INTERVAL_MS
protected ChronicleRetentionDuration retention
static String RETENTION_KEY
protected ConcurrentLinkedQueue<ChronicleLogTailer<M>> tailers
-
Constructor Summary
Constructors (Modifier, Constructor, Description):
protected ChronicleLogAppender(ChronicleLogConfig config, Name name, int size, Codec<M> codec)
    Create a new Log
protected ChronicleLogAppender(ChronicleLogConfig config, Name name, Codec<M> codec, boolean withRetention)
    Open an existing Log
-
Method Summary
Methods (Modifier and Type, Method, Description):
protected LogTailer<M> addTailer(ChronicleLogTailer<M> tailer)
LogOffset append(int partition, M message)
    Appends a message to a partition and returns the LogOffset position of the message.
void close()
boolean closed()
    Returns true if the appender has been closed by the manager.
long countMessages(int partition, long lowerOffset, long upperOffset)
static <M extends Externalizable> ChronicleLogAppender<M> create(ChronicleLogConfig config, Name name, int size, Codec<M> codec)
    Create a new log
LogTailer<M> createTailer(LogPartition partition, Name group, Codec<M> codec)
static int discoverPartitions(Path basePath)
long endOffset(int partition)
protected static boolean exists(File basePath)
long firstOffset(int partition)
String getBasePath()
Codec<M> getCodec()
    Returns the codec used to write records.
protected Path getMetadataPath()
ChronicleRetentionDuration getRetention()
protected void initPartitions(boolean create)
protected static boolean isPartitionDirectory(Path path)
protected boolean isProcessed(ChronicleLogOffsetTracker tracker, long offset)
Name name()
    Returns the Log's name.
static <M extends Externalizable> ChronicleLogAppender<M> open(ChronicleLogConfig config, Name name, Codec<M> codec)
    Open an existing log.
static <M extends Externalizable> ChronicleLogAppender<M> openWithoutRetention(ChronicleLogConfig config, Name name, Codec<M> codec)
static int partitions(Path basePath)
protected static Properties readMetadata(Path file)
protected void saveMetadata()
int size()
    Returns the number of partitions in the Log.
String toString()
boolean waitFor(LogOffset offset, Name group, Duration timeout)
    Wait for consumer to process a message up to the offset.
-
Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, wait, wait, wait
-
Methods inherited from interface org.nuxeo.lib.stream.log.LogAppender
append
-
Field Detail
-
PARTITION_PREFIX
protected static final String PARTITION_PREFIX
- See Also:
- Constant Field Values
-
METADATA_FILE
protected static final String METADATA_FILE
- See Also:
- Constant Field Values
-
POLL_INTERVAL_MS
protected static final int POLL_INTERVAL_MS
- See Also:
- Constant Field Values
-
MAX_PARTITIONS
protected static final int MAX_PARTITIONS
- See Also:
- Constant Field Values
-
MSG_KEY
public static final String MSG_KEY
- See Also:
- Constant Field Values
-
CQ_BLOCK_SIZE
public static final int CQ_BLOCK_SIZE
- See Also:
- Constant Field Values
-
RETENTION_KEY
public static final String RETENTION_KEY
- See Also:
- Constant Field Values
-
PARTITIONS_KEY
public static final String PARTITIONS_KEY
- See Also:
- Constant Field Values
-
BLOCK_SIZE_KEY
public static final String BLOCK_SIZE_KEY
- See Also:
- Constant Field Values
-
partitions
protected final List<net.openhft.chronicle.queue.ChronicleQueue> partitions
-
nbPartitions
protected final int nbPartitions
-
basePath
protected final File basePath
-
blockSize
protected final int blockSize
-
name
protected final Name name
-
tailers
protected final ConcurrentLinkedQueue<ChronicleLogTailer<M>> tailers
-
retention
protected final ChronicleRetentionDuration retention
-
codec
protected final Codec<M> codec
-
closed
protected volatile boolean closed
-
-
Constructor Detail
-
ChronicleLogAppender
protected ChronicleLogAppender(ChronicleLogConfig config, Name name, Codec<M> codec, boolean withRetention)
Open an existing Log
-
ChronicleLogAppender
protected ChronicleLogAppender(ChronicleLogConfig config, Name name, int size, Codec<M> codec)
Create a new Log
-
-
Method Detail
-
initPartitions
protected void initPartitions(boolean create)
-
saveMetadata
protected void saveMetadata()
-
getMetadataPath
protected Path getMetadataPath()
-
readMetadata
protected static Properties readMetadata(Path file)
-
exists
protected static boolean exists(File basePath)
-
create
public static <M extends Externalizable> ChronicleLogAppender<M> create(ChronicleLogConfig config, Name name, int size, Codec<M> codec)
Create a new log
-
open
public static <M extends Externalizable> ChronicleLogAppender<M> open(ChronicleLogConfig config, Name name, Codec<M> codec)
Open an existing log.
-
openWithoutRetention
public static <M extends Externalizable> ChronicleLogAppender<M> openWithoutRetention(ChronicleLogConfig config, Name name, Codec<M> codec)
-
getBasePath
public String getBasePath()
-
name
public Name name()
Description copied from interface: LogAppender
Returns the Log's name.
- Specified by:
name in interface LogAppender<M extends Externalizable>
-
size
public int size()
Description copied from interface: LogAppender
Returns the number of partitions in the Log.
- Specified by:
size in interface LogAppender<M extends Externalizable>
-
append
public LogOffset append(int partition, M message)
Description copied from interface: LogAppender
Appends a message to a partition and returns the LogOffset position of the message. This method is thread safe; a queue can be shared by multiple producers.
- Specified by:
append in interface LogAppender<M extends Externalizable>
- Parameters:
partition - index lower than LogAppender.size()
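To make the append contract concrete, here is a minimal JDK-only sketch of a partitioned log in which several producers can append to the same partition and each gets back the position of its message. It is an in-memory model, not the Chronicle Queue implementation: the class name PartitionedLogSketch is hypothetical, and it returns a plain long counter where the real method returns a LogOffset.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical in-memory model of a partitioned log (assumption, not Nuxeo code).
final class PartitionedLogSketch<M> {
    private final List<List<M>> partitions = new ArrayList<>();

    PartitionedLogSketch(int size) {
        if (size <= 0) {
            throw new IllegalArgumentException("size must be positive");
        }
        for (int i = 0; i < size; i++) {
            partitions.add(new ArrayList<>());
        }
    }

    // Number of partitions, mirroring size() on this page.
    int size() {
        return partitions.size();
    }

    // Appends to one partition and returns the message position in that partition.
    // The partition index must be lower than size().
    long append(int partition, M message) {
        if (partition < 0 || partition >= size()) {
            throw new IndexOutOfBoundsException("partition " + partition + " not in [0, " + size() + ")");
        }
        List<M> target = partitions.get(partition);
        // Lock per partition so that concurrent producers get distinct positions.
        synchronized (target) {
            target.add(message);
            return target.size() - 1L;
        }
    }
}
```

In this sketch positions start at 0 and are independent per partition; the offsets returned by the real appender are not guaranteed to follow that numbering.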
-
endOffset
public long endOffset(int partition)
-
firstOffset
public long firstOffset(int partition)
-
countMessages
public long countMessages(int partition, long lowerOffset, long upperOffset)
-
addTailer
protected LogTailer<M> addTailer(ChronicleLogTailer<M> tailer)
-
waitFor
public boolean waitFor(LogOffset offset, Name group, Duration timeout) throws InterruptedException
Description copied from interface: LogAppender
Wait for consumer to process a message up to the offset. The message is processed if a consumer of the group commits a greater or equal offset. Returns true if the message has been consumed, false in case of timeout.
- Specified by:
waitFor in interface LogAppender<M extends Externalizable>
- Throws:
InterruptedException
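The waiting rule described above (processed once the group has committed a greater or equal offset, false on timeout) can be sketched as a polling loop. This is an illustration under assumptions, not the class's actual code: WaitForSketch is a hypothetical name, the LongSupplier stands in for "the offset last committed by the consumer group", and the 100 ms poll interval is a made-up value, since this page lists POLL_INTERVAL_MS without giving its value.

```java
import java.time.Duration;
import java.util.function.LongSupplier;

// Hypothetical polling sketch of the waitFor contract (assumption, not Nuxeo code).
final class WaitForSketch {
    // Assumed poll interval; the real constant's value is not shown on this page.
    static final long POLL_INTERVAL_MS = 100;

    // Returns true once committedOffset reports a value >= offset, false on timeout.
    static boolean waitFor(long offset, LongSupplier committedOffset, Duration timeout)
            throws InterruptedException {
        long deadline = System.nanoTime() + timeout.toNanos();
        while (committedOffset.getAsLong() < offset) {
            long remainingNanos = deadline - System.nanoTime();
            if (remainingNanos <= 0) {
                return false; // timed out before the group committed far enough
            }
            // Sleep for the poll interval, but never past the deadline.
            long remainingMs = Math.max(1L, remainingNanos / 1_000_000L);
            Thread.sleep(Math.min(POLL_INTERVAL_MS, remainingMs));
        }
        return true;
    }
}
```

The committed offset is checked before sleeping, so a message that is already processed returns true immediately, even with a zero timeout.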
-
closed
public boolean closed()
Description copied from interface: LogAppender
Returns true if the appender has been closed by the manager.
- Specified by:
closed in interface LogAppender<M extends Externalizable>
-
getCodec
public Codec<M> getCodec()
Description copied from interface: LogAppender
Returns the codec used to write records. A null codec means the default legacy encoding.
- Specified by:
getCodec in interface LogAppender<M extends Externalizable>
-
isProcessed
protected boolean isProcessed(ChronicleLogOffsetTracker tracker, long offset)
-
close
public void close()
- Specified by:
close in interface AutoCloseable
- Specified by:
close in interface CloseableLogAppender<M extends Externalizable>
-
partitions
public static int partitions(Path basePath)
-
discoverPartitions
public static int discoverPartitions(Path basePath)
-
isPartitionDirectory
protected static boolean isPartitionDirectory(Path path)
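This page gives no description for partitions, discoverPartitions, or isPartitionDirectory. One plausible reading, given the PARTITION_PREFIX field, is that partitions are counted by scanning the base path for directories whose names start with that prefix. The sketch below illustrates that reading only: the class name PartitionDiscoverySketch and the prefix value "P-" are assumptions, and the real methods may behave differently (for example by reading the metadata file first).

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.stream.Stream;

// Hypothetical directory-scan sketch (assumption, not Nuxeo code).
final class PartitionDiscoverySketch {
    // Assumed prefix; the real constant's value is not shown on this page.
    static final String PARTITION_PREFIX = "P-";

    // A partition directory is a directory whose name starts with the prefix.
    static boolean isPartitionDirectory(Path path) {
        Path fileName = path.getFileName();
        return fileName != null
                && Files.isDirectory(path)
                && fileName.toString().startsWith(PARTITION_PREFIX);
    }

    // Counts the partition directories directly under basePath.
    static int discoverPartitions(Path basePath) {
        try (Stream<Path> children = Files.list(basePath)) {
            return (int) children.filter(PartitionDiscoverySketch::isPartitionDirectory).count();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

The stream returned by Files.list holds an open directory handle, which is why it is wrapped in try-with-resources here.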
-
getRetention
public ChronicleRetentionDuration getRetention()
-
-