<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</dependency>
+ <dependency>
+ <groupId>io.netty</groupId>
+ <artifactId>netty-buffer</artifactId>
+ </dependency>
<dependency>
<groupId>org.eclipse.jdt</groupId>
<artifactId>org.eclipse.jdt.annotation</artifactId>
* A {@link JournalReader} traversing only committed entries.
*/
final class CommitsSegmentJournalReader<E> extends SegmentedJournalReader<E> {
- CommitsSegmentJournalReader(SegmentedJournal<E> journal, JournalSegment<E> segment) {
+ CommitsSegmentJournalReader(SegmentedJournal<E> journal, JournalSegment segment) {
super(journal, segment);
}
*
* @author <a href="http://github.com/kuujo">Jordan Halterman</a>
*/
-final class DiskJournalSegmentWriter<E> extends JournalSegmentWriter<E> {
+final class DiskJournalSegmentWriter extends JournalSegmentWriter {
private static final ByteBuffer ZERO_ENTRY_HEADER = ByteBuffer.wrap(new byte[HEADER_BYTES]);
- private final JournalSegmentReader<E> reader;
+ private final JournalSegmentReader reader;
private final ByteBuffer buffer;
- DiskJournalSegmentWriter(final FileChannel channel, final JournalSegment<E> segment, final int maxEntrySize,
- final JournalIndex index, final JournalSerdes namespace) {
- super(channel, segment, maxEntrySize, index, namespace);
+ DiskJournalSegmentWriter(final FileChannel channel, final JournalSegment segment, final int maxEntrySize,
+ final JournalIndex index) {
+ super(channel, segment, maxEntrySize, index);
buffer = DiskFileReader.allocateBuffer(maxSegmentSize, maxEntrySize);
- reader = new JournalSegmentReader<>(segment,
- new DiskFileReader(segment.file().file().toPath(), channel, buffer), maxEntrySize, namespace);
+ reader = new JournalSegmentReader(segment,
+ new DiskFileReader(segment.file().file().toPath(), channel, buffer), maxEntrySize);
reset(0);
}
- DiskJournalSegmentWriter(final JournalSegmentWriter<E> previous) {
+ DiskJournalSegmentWriter(final JournalSegmentWriter previous) {
super(previous);
buffer = DiskFileReader.allocateBuffer(maxSegmentSize, maxEntrySize);
- reader = new JournalSegmentReader<>(segment,
- new DiskFileReader(segment.file().file().toPath(), channel, buffer), maxEntrySize, namespace);
+ reader = new JournalSegmentReader(segment,
+ new DiskFileReader(segment.file().file().toPath(), channel, buffer), maxEntrySize);
}
@Override
}
@Override
- MappedJournalSegmentWriter<E> toMapped() {
- return new MappedJournalSegmentWriter<>(this);
+ MappedJournalSegmentWriter toMapped() {
+ return new MappedJournalSegmentWriter(this);
}
@Override
- DiskJournalSegmentWriter<E> toFileChannel() {
+ DiskJournalSegmentWriter toFileChannel() {
return this;
}
@Override
- JournalSegmentReader<E> reader() {
+ JournalSegmentReader reader() {
return reader;
}
*
* @author <a href="http://github.com/kuujo">Jordan Halterman</a>
*/
-final class JournalSegment<E> implements AutoCloseable {
+final class JournalSegment implements AutoCloseable {
private final JournalSegmentFile file;
private final JournalSegmentDescriptor descriptor;
private final StorageLevel storageLevel;
private final int maxEntrySize;
private final JournalIndex journalIndex;
- private final JournalSerdes namespace;
- private final Set<JournalSegmentReader<E>> readers = ConcurrentHashMap.newKeySet();
+ private final Set<JournalSegmentReader> readers = ConcurrentHashMap.newKeySet();
private final AtomicInteger references = new AtomicInteger();
private final FileChannel channel;
- private JournalSegmentWriter<E> writer;
+ private JournalSegmentWriter writer;
private boolean open = true;
JournalSegment(
JournalSegmentFile file,
JournalSegmentDescriptor descriptor,
StorageLevel storageLevel,
int maxEntrySize,
- double indexDensity,
- JournalSerdes namespace) {
+ double indexDensity) {
this.file = file;
this.descriptor = descriptor;
this.storageLevel = storageLevel;
this.maxEntrySize = maxEntrySize;
- this.namespace = namespace;
journalIndex = new SparseJournalIndex(indexDensity);
try {
channel = FileChannel.open(file.file().toPath(),
throw new StorageException(e);
}
writer = switch (storageLevel) {
- case DISK -> new DiskJournalSegmentWriter<>(channel, this, maxEntrySize, journalIndex, namespace);
- case MAPPED -> new MappedJournalSegmentWriter<>(channel, this, maxEntrySize, journalIndex, namespace)
- .toFileChannel();
+ case DISK -> new DiskJournalSegmentWriter(channel, this, maxEntrySize, journalIndex);
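+ // MAPPED writers convert to a file-channel writer once the initial buffer scan completes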
+ case MAPPED -> new MappedJournalSegmentWriter(channel, this, maxEntrySize, journalIndex).toFileChannel();
};
}
*
* @return The segment writer.
*/
- JournalSegmentWriter<E> acquireWriter() {
+ JournalSegmentWriter acquireWriter() {
checkOpen();
acquire();
*
* @return A new segment reader.
*/
- JournalSegmentReader<E> createReader() {
+ JournalSegmentReader createReader() {
checkOpen();
acquire();
final var path = file.file().toPath();
final var fileReader = buffer != null ? new MappedFileReader(path, buffer)
: new DiskFileReader(path, channel, descriptor.maxSegmentSize(), maxEntrySize);
- final var reader = new JournalSegmentReader<>(this, fileReader, maxEntrySize, namespace);
+ final var reader = new JournalSegmentReader(this, fileReader, maxEntrySize);
reader.setPosition(JournalSegmentDescriptor.BYTES);
readers.add(reader);
return reader;
*
* @param reader the closed segment reader
*/
- void closeReader(JournalSegmentReader<E> reader) {
+ void closeReader(JournalSegmentReader reader) {
if (readers.remove(reader)) {
release();
}
import static com.google.common.base.Verify.verify;
import static java.util.Objects.requireNonNull;
-import com.esotericsoftware.kryo.KryoException;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
import java.util.zip.CRC32;
import org.eclipse.jdt.annotation.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-final class JournalSegmentReader<E> {
+final class JournalSegmentReader {
private static final Logger LOG = LoggerFactory.getLogger(JournalSegmentReader.class);
- private final JournalSegment<E> segment;
- private final JournalSerdes namespace;
+ private final JournalSegment segment;
private final FileReader fileReader;
private final int maxSegmentSize;
private final int maxEntrySize;
private int position;
- JournalSegmentReader(final JournalSegment<E> segment, final FileReader fileReader,
- final int maxEntrySize, final JournalSerdes namespace) {
+ JournalSegmentReader(final JournalSegment segment, final FileReader fileReader, final int maxEntrySize) {
this.segment = requireNonNull(segment);
this.fileReader = requireNonNull(fileReader);
maxSegmentSize = segment.descriptor().maxSegmentSize();
this.maxEntrySize = maxEntrySize;
- this.namespace = requireNonNull(namespace);
}
/**
}
/**
- * Reads the next entry, assigning it specified index.
+ * Reads the next binary data block.
*
* @param index entry index
- * @return The entry, or {@code null}
+ * @return The binary data, or {@code null} if it is not available or its checksum does not match
*/
- @Nullable Indexed<E> readEntry(final long index) {
+ @Nullable ByteBuf readBytes(final long index) {
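+ // Entry format: a 4-byte body length and a 4-byte CRC32 of the body, followed by the body itself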
// Check if there is enough in the buffer remaining
final int remaining = maxSegmentSize - position - SegmentEntry.HEADER_BYTES;
if (remaining < 0) {
final int checksum = buffer.getInt(Integer.BYTES);
// Slice off the entry's bytes
- final var entryBytes = buffer.slice(SegmentEntry.HEADER_BYTES, length);
+ final var entryBuffer = buffer.slice(SegmentEntry.HEADER_BYTES, length);
// Compute the checksum for the entry bytes.
final var crc32 = new CRC32();
- crc32.update(entryBytes);
+ crc32.update(entryBuffer);
// If the stored checksum does not equal the computed checksum, do not proceed further
final var computed = (int) crc32.getValue();
return null;
}
- // Attempt to deserialize
- final E entry;
- try {
- entry = namespace.deserialize(entryBytes.rewind());
- } catch (KryoException e) {
- // TODO: promote this to a hard error, as it should never happen
- LOG.debug("Failed to deserialize entry", e);
- invalidateCache();
- return null;
- }
+ // Update the position to the start of the next entry
+ position += SegmentEntry.HEADER_BYTES + length;
- // We are all set. Update the position.
- position = position + SegmentEntry.HEADER_BYTES + length;
- return new Indexed<>(index, entry, length);
+ // Copy the entry body out of the shared read buffer into an independent ByteBuf
+ entryBuffer.rewind();
+ return Unpooled.buffer(length).writeBytes(entryBuffer);
}
/**
import static io.atomix.storage.journal.SegmentEntry.HEADER_BYTES;
import static java.util.Objects.requireNonNull;
-import com.esotericsoftware.kryo.KryoException;
import io.atomix.storage.journal.index.JournalIndex;
+import io.netty.buffer.ByteBuf;
import java.nio.ByteBuffer;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-abstract sealed class JournalSegmentWriter<E> permits DiskJournalSegmentWriter, MappedJournalSegmentWriter {
+abstract sealed class JournalSegmentWriter permits DiskJournalSegmentWriter, MappedJournalSegmentWriter {
private static final Logger LOG = LoggerFactory.getLogger(JournalSegmentWriter.class);
final @NonNull FileChannel channel;
- final @NonNull JournalSegment<E> segment;
+ final @NonNull JournalSegment segment;
private final @NonNull JournalIndex index;
- final @NonNull JournalSerdes namespace;
final int maxSegmentSize;
final int maxEntrySize;
- private Indexed<E> lastEntry;
private int currentPosition;
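+ // index and bytes of the most recently appended entry, or null if nothing has been written yet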
+ private Long lastIndex;
+ private ByteBuf lastWritten;
- JournalSegmentWriter(final FileChannel channel, final JournalSegment<E> segment, final int maxEntrySize,
- final JournalIndex index, final JournalSerdes namespace) {
+ JournalSegmentWriter(final FileChannel channel, final JournalSegment segment, final int maxEntrySize,
+ final JournalIndex index) {
this.channel = requireNonNull(channel);
this.segment = requireNonNull(segment);
this.index = requireNonNull(index);
- this.namespace = requireNonNull(namespace);
maxSegmentSize = segment.descriptor().maxSegmentSize();
this.maxEntrySize = maxEntrySize;
}
- JournalSegmentWriter(final JournalSegmentWriter<E> previous) {
+ JournalSegmentWriter(final JournalSegmentWriter previous) {
channel = previous.channel;
segment = previous.segment;
index = previous.index;
- namespace = previous.namespace;
maxSegmentSize = previous.maxSegmentSize;
maxEntrySize = previous.maxEntrySize;
- lastEntry = previous.lastEntry;
+ lastWritten = previous.lastWritten;
+ lastIndex = previous.lastIndex;
currentPosition = previous.currentPosition;
}
* @return The last written index.
*/
final long getLastIndex() {
- return lastEntry != null ? lastEntry.index() : segment.firstIndex() - 1;
+ return lastIndex != null ? lastIndex : segment.firstIndex() - 1;
}
/**
- * Returns the last entry written.
+ * Returns the last data written.
*
- * @return The last entry written.
+ * @return The last data written.
*/
- final Indexed<E> getLastEntry() {
- return lastEntry;
+ final ByteBuf getLastWritten() {
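+ // hand out a slice so callers cannot disturb this buffer's read/write indices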
+ return lastWritten == null ? null : lastWritten.slice();
}
/**
* @return The next index to be written.
*/
final long getNextIndex() {
- return lastEntry != null ? lastEntry.index() + 1 : segment.firstIndex();
+ return lastIndex != null ? lastIndex + 1 : segment.firstIndex();
}
/**
- * Tries to append an entry to the journal.
+ * Tries to append binary data to the journal.
*
- * @param entry The entry to append.
- * @return The appended indexed entry, or {@code null} if there is not enough space available
+ * @param buf binary data to append
+ * @return The index of the appended data, or {@code null} if the segment does not have enough space
*/
- final <T extends E> @Nullable Indexed<T> append(final T entry) {
+ final Long append(final ByteBuf buf) {
+ final var length = buf.readableBytes();
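+ // reject entries that can never fit, regardless of the space remaining in this segment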
+ if (length > maxEntrySize) {
+ throw new StorageException.TooLarge("Serialized entry size exceeds maximum allowed bytes ("
+ + maxEntrySize + ")");
+ }
+
// Store the entry index.
final long index = getNextIndex();
final int position = currentPosition;
- // Serialize the entry.
- final int bodyPosition = position + HEADER_BYTES;
- final int avail = maxSegmentSize - bodyPosition;
- if (avail < 0) {
+ // check space available
+ final int nextPosition = position + HEADER_BYTES + length;
+ if (nextPosition >= maxSegmentSize) {
LOG.trace("Not enough space for {} at {}", index, position);
return null;
}
- final var writeLimit = Math.min(avail, maxEntrySize);
- final var diskEntry = startWrite(position, writeLimit + HEADER_BYTES).position(HEADER_BYTES);
- try {
- namespace.serialize(entry, diskEntry);
- } catch (KryoException e) {
- if (writeLimit != maxEntrySize) {
- // We have not provided enough capacity, signal to roll to next segment
- LOG.trace("Tail serialization with {} bytes available failed", writeLimit, e);
- return null;
- }
-
- // Just reset the buffer. There's no need to zero the bytes since we haven't written the length or checksum.
- throw new StorageException.TooLarge("Entry size exceeds maximum allowed bytes (" + maxEntrySize + ")", e);
- }
-
- final int length = diskEntry.position() - HEADER_BYTES;
+ // allocate buffer and write data
+ final var writeBuffer = startWrite(position, length + HEADER_BYTES).position(HEADER_BYTES);
+ writeBuffer.put(buf.nioBuffer());
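+ // nioBuffer() exposes the readable bytes as a ByteBuffer, sharing the underlying memory where possible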
// Compute the checksum for the entry.
final var crc32 = new CRC32();
- crc32.update(diskEntry.flip().position(HEADER_BYTES));
+ crc32.update(writeBuffer.flip().position(HEADER_BYTES));
// Create a single byte[] in memory for the entire entry and write it as a batch to the underlying buffer.
- diskEntry.putInt(0, length).putInt(Integer.BYTES, (int) crc32.getValue());
- commitWrite(position, diskEntry.rewind());
+ writeBuffer.putInt(0, length).putInt(Integer.BYTES, (int) crc32.getValue());
+ commitWrite(position, writeBuffer.rewind());
// Update the last entry with the correct index/term/length.
- final var indexedEntry = new Indexed<E>(index, entry, length);
- lastEntry = indexedEntry;
+ currentPosition = nextPosition;
+ lastWritten = buf;
+ lastIndex = index;
this.index.index(index, position);
- currentPosition = bodyPosition + length;
-
- @SuppressWarnings("unchecked")
- final var ugly = (Indexed<T>) indexedEntry;
- return ugly;
+ return index;
}
abstract ByteBuffer startWrite(int position, int size);
}
}
- abstract JournalSegmentReader<E> reader();
+ abstract JournalSegmentReader reader();
- private void resetWithBuffer(final JournalSegmentReader<E> reader, final long index) {
+ private void resetWithBuffer(final JournalSegmentReader reader, final long index) {
long nextIndex = segment.firstIndex();
// Clear the buffer indexes and acquire ownership of the buffer
reader.setPosition(JournalSegmentDescriptor.BYTES);
while (index == 0 || nextIndex <= index) {
- final var entry = reader.readEntry(nextIndex);
- if (entry == null) {
+ final var buf = reader.readBytes(nextIndex);
+ if (buf == null) {
break;
}
- lastEntry = entry;
+ lastWritten = buf;
+ lastIndex = nextIndex;
this.index.index(nextIndex, currentPosition);
nextIndex++;
// Update the current position for indexing.
- currentPosition = currentPosition + HEADER_BYTES + entry.size();
+ currentPosition += HEADER_BYTES + buf.readableBytes();
}
}
return;
}
- // Reset the last entry.
- lastEntry = null;
+ // Reset the last written entry
+ lastIndex = null;
+ lastWritten = null;
// Truncate the index.
this.index.truncate(index);
*/
abstract @Nullable MappedByteBuffer buffer();
- abstract @NonNull MappedJournalSegmentWriter<E> toMapped();
+ abstract @NonNull MappedJournalSegmentWriter toMapped();
- abstract @NonNull DiskJournalSegmentWriter<E> toFileChannel();
+ abstract @NonNull DiskJournalSegmentWriter toFileChannel();
}
/**
* Support for serialization of {@link Journal} entries.
+ *
+ * @deprecated due to the dependency on the outdated Kryo library; use {@link JournalSerializer} instead.
*/
+@Deprecated(forRemoval = true, since="9.0.3")
public interface JournalSerdes {
/**
* Serializes given object to byte array.
--- /dev/null
+/*
+ * Copyright (c) 2024 PANTHEON.tech s.r.o. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package io.atomix.storage.journal;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufUtil;
+import io.netty.buffer.Unpooled;
+
+/**
+ * Support for serialization of {@link Journal} entries.
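+ *
+ * <p>A minimal implementation sketch (illustrative only; assumes a UTF-8 {@code String}
+ * payload and {@code java.nio.charset.StandardCharsets}):
+ * <pre>{@code
+ * final class Utf8Serializer implements JournalSerializer<String> {
+ *     public ByteBuf serialize(final String obj) {
+ *         return Unpooled.wrappedBuffer(obj.getBytes(StandardCharsets.UTF_8));
+ *     }
+ *
+ *     public String deserialize(final ByteBuf buf) {
+ *         return buf.toString(StandardCharsets.UTF_8);
+ *     }
+ * }
+ * }</pre>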
+ */
+public interface JournalSerializer<T> {
+
+ /**
+ * Serializes the given object into a {@link ByteBuf}.
+ *
+ * @param obj the object to serialize
+ * @return serialized bytes as a {@link ByteBuf}
+ */
+ ByteBuf serialize(T obj);
+
+ /**
+ * Deserializes the given {@link ByteBuf} into an object.
+ *
+ * @param buf serialized bytes as a {@link ByteBuf}
+ * @return the deserialized object
+ */
+ T deserialize(final ByteBuf buf);
+
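+ /**
+ * Adapts a legacy {@link JournalSerdes} to this interface, bridging between byte arrays
+ * and {@link ByteBuf}s.
+ *
+ * @param serdes the serdes to delegate to
+ * @return a {@link JournalSerializer} backed by {@code serdes}
+ */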
+ static <E> JournalSerializer<E> wrap(final JournalSerdes serdes) {
+ return new JournalSerializer<>() {
+ @Override
+ public ByteBuf serialize(final E obj) {
+ return Unpooled.wrappedBuffer(serdes.serialize(obj));
+ }
+
+ @Override
+ public E deserialize(final ByteBuf buf) {
+ return serdes.deserialize(ByteBufUtil.getBytes(buf));
+ }
+ };
+ }
+}
*
* @author <a href="http://github.com/kuujo">Jordan Halterman</a>
*/
-final class MappedJournalSegmentWriter<E> extends JournalSegmentWriter<E> {
+final class MappedJournalSegmentWriter extends JournalSegmentWriter {
private final @NonNull MappedByteBuffer mappedBuffer;
- private final JournalSegmentReader<E> reader;
+ private final JournalSegmentReader reader;
private final ByteBuffer buffer;
- MappedJournalSegmentWriter(final FileChannel channel, final JournalSegment<E> segment, final int maxEntrySize,
- final JournalIndex index, final JournalSerdes namespace) {
- super(channel, segment, maxEntrySize, index, namespace);
+ MappedJournalSegmentWriter(final FileChannel channel, final JournalSegment segment, final int maxEntrySize,
+ final JournalIndex index) {
+ super(channel, segment, maxEntrySize, index);
mappedBuffer = mapBuffer(channel, maxSegmentSize);
buffer = mappedBuffer.slice();
- reader = new JournalSegmentReader<>(segment, new MappedFileReader(segment.file().file().toPath(), mappedBuffer),
- maxEntrySize, namespace);
+ reader = new JournalSegmentReader(segment, new MappedFileReader(segment.file().file().toPath(), mappedBuffer),
+ maxEntrySize);
reset(0);
}
- MappedJournalSegmentWriter(final JournalSegmentWriter<E> previous) {
+ MappedJournalSegmentWriter(final JournalSegmentWriter previous) {
super(previous);
mappedBuffer = mapBuffer(channel, maxSegmentSize);
buffer = mappedBuffer.slice();
- reader = new JournalSegmentReader<>(segment, new MappedFileReader(segment.file().file().toPath(), mappedBuffer),
- maxEntrySize, namespace);
+ reader = new JournalSegmentReader(segment, new MappedFileReader(segment.file().file().toPath(), mappedBuffer),
+ maxEntrySize);
}
private static @NonNull MappedByteBuffer mapBuffer(final FileChannel channel, final int maxSegmentSize) {
}
@Override
- MappedJournalSegmentWriter<E> toMapped() {
+ MappedJournalSegmentWriter toMapped() {
return this;
}
@Override
- DiskJournalSegmentWriter<E> toFileChannel() {
+ DiskJournalSegmentWriter toFileChannel() {
close();
- return new DiskJournalSegmentWriter<>(this);
+ return new DiskJournalSegmentWriter(this);
}
@Override
- JournalSegmentReader<E> reader() {
+ JournalSegmentReader reader() {
return reader;
}
*/
package io.atomix.storage.journal;
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkState;
+import static java.util.Objects.requireNonNull;
+
import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Preconditions.checkState;
-import static java.util.Objects.requireNonNull;
-
/**
* Segmented journal.
*/
private final String name;
private final StorageLevel storageLevel;
private final File directory;
- private final JournalSerdes namespace;
+ private final JournalSerializer<E> serializer;
private final int maxSegmentSize;
private final int maxEntrySize;
private final int maxEntriesPerSegment;
private final SegmentedJournalWriter<E> writer;
private volatile long commitIndex;
- private final ConcurrentNavigableMap<Long, JournalSegment<E>> segments = new ConcurrentSkipListMap<>();
- private final Collection<SegmentedJournalReader<E>> readers = ConcurrentHashMap.newKeySet();
- private JournalSegment<E> currentSegment;
+ private final ConcurrentNavigableMap<Long, JournalSegment> segments = new ConcurrentSkipListMap<>();
+ private final Collection<SegmentedJournalReader> readers = ConcurrentHashMap.newKeySet();
+ private JournalSegment currentSegment;
private volatile boolean open = true;
this.name = requireNonNull(name, "name cannot be null");
this.storageLevel = requireNonNull(storageLevel, "storageLevel cannot be null");
this.directory = requireNonNull(directory, "directory cannot be null");
- this.namespace = requireNonNull(namespace, "namespace cannot be null");
+ this.serializer = JournalSerializer.wrap(requireNonNull(namespace, "namespace cannot be null"));
this.maxSegmentSize = maxSegmentSize;
this.maxEntrySize = maxEntrySize;
this.maxEntriesPerSegment = maxEntriesPerSegment;
*
* @return the collection of journal segments
*/
- public Collection<JournalSegment<E>> segments() {
+ public Collection<JournalSegment> segments() {
return segments.values();
}
* @param index the starting index
* @return the journal segments starting with indexes greater than or equal to the given index
*/
- public Collection<JournalSegment<E>> segments(long index) {
+ public Collection<JournalSegment> segments(long index) {
return segments.tailMap(index).values();
}
+ /**
+ * Returns serializer instance.
+ *
+ * @return serializer instance
+ */
+ JournalSerializer<E> serializer() {
+ return serializer;
+ }
+
/**
* Returns the total size of the journal.
*
*/
private synchronized void open() {
// Load existing log segments from disk.
- for (JournalSegment<E> segment : loadSegments()) {
+ for (JournalSegment segment : loadSegments()) {
segments.put(segment.descriptor().index(), segment);
}
* Resets the current segment, creating a new segment if necessary.
*/
private synchronized void resetCurrentSegment() {
- JournalSegment<E> lastSegment = getLastSegment();
+ JournalSegment lastSegment = getLastSegment();
if (lastSegment != null) {
currentSegment = lastSegment;
} else {
* @param index the starting index of the journal
* @return the first segment
*/
- JournalSegment<E> resetSegments(long index) {
+ JournalSegment resetSegments(long index) {
assertOpen();
// If the index already equals the first segment index, skip the reset.
- JournalSegment<E> firstSegment = getFirstSegment();
+ JournalSegment firstSegment = getFirstSegment();
if (index == firstSegment.firstIndex()) {
return firstSegment;
}
- for (JournalSegment<E> segment : segments.values()) {
+ for (JournalSegment segment : segments.values()) {
segment.close();
segment.delete();
}
*
* @throws IllegalStateException if the segment manager is not open
*/
- JournalSegment<E> getFirstSegment() {
+ JournalSegment getFirstSegment() {
assertOpen();
- Map.Entry<Long, JournalSegment<E>> segment = segments.firstEntry();
+ Map.Entry<Long, JournalSegment> segment = segments.firstEntry();
return segment != null ? segment.getValue() : null;
}
*
* @throws IllegalStateException if the segment manager is not open
*/
- JournalSegment<E> getLastSegment() {
+ JournalSegment getLastSegment() {
assertOpen();
- Map.Entry<Long, JournalSegment<E>> segment = segments.lastEntry();
+ Map.Entry<Long, JournalSegment> segment = segments.lastEntry();
return segment != null ? segment.getValue() : null;
}
* @return The next segment.
* @throws IllegalStateException if the segment manager is not open
*/
- synchronized JournalSegment<E> getNextSegment() {
+ synchronized JournalSegment getNextSegment() {
assertOpen();
assertDiskSpace();
- JournalSegment<E> lastSegment = getLastSegment();
+ JournalSegment lastSegment = getLastSegment();
JournalSegmentDescriptor descriptor = JournalSegmentDescriptor.builder()
.withId(lastSegment != null ? lastSegment.descriptor().id() + 1 : 1)
.withIndex(currentSegment.lastIndex() + 1)
* @param index The segment index with which to look up the next segment.
* @return The next segment for the given index.
*/
- JournalSegment<E> getNextSegment(long index) {
- Map.Entry<Long, JournalSegment<E>> nextSegment = segments.higherEntry(index);
+ JournalSegment getNextSegment(long index) {
+ Map.Entry<Long, JournalSegment> nextSegment = segments.higherEntry(index);
return nextSegment != null ? nextSegment.getValue() : null;
}
* @param index The index for which to return the segment.
* @throws IllegalStateException if the segment manager is not open
*/
- synchronized JournalSegment<E> getSegment(long index) {
+ synchronized JournalSegment getSegment(long index) {
assertOpen();
// Check if the current segment contains the given index first in order to prevent an unnecessary map lookup.
if (currentSegment != null && index > currentSegment.firstIndex()) {
}
// If the index is in another segment, get the entry with the next lowest first index.
- Map.Entry<Long, JournalSegment<E>> segment = segments.floorEntry(index);
+ Map.Entry<Long, JournalSegment> segment = segments.floorEntry(index);
if (segment != null) {
return segment.getValue();
}
*
* @param segment The segment to remove.
*/
- synchronized void removeSegment(JournalSegment<E> segment) {
+ synchronized void removeSegment(JournalSegment segment) {
segments.remove(segment.firstIndex());
segment.close();
segment.delete();
/**
* Creates a new segment.
*/
- JournalSegment<E> createSegment(JournalSegmentDescriptor descriptor) {
+ JournalSegment createSegment(JournalSegmentDescriptor descriptor) {
File segmentFile = JournalSegmentFile.createSegmentFile(name, directory, descriptor.id());
RandomAccessFile raf;
} catch (IOException e) {
}
}
- JournalSegment<E> segment = newSegment(new JournalSegmentFile(segmentFile), descriptor);
+ JournalSegment segment = newSegment(new JournalSegmentFile(segmentFile), descriptor);
LOG.debug("Created segment: {}", segment);
return segment;
}
* @param descriptor The segment descriptor.
* @return The segment instance.
*/
- protected JournalSegment<E> newSegment(JournalSegmentFile segmentFile, JournalSegmentDescriptor descriptor) {
- return new JournalSegment<>(segmentFile, descriptor, storageLevel, maxEntrySize, indexDensity, namespace);
+ protected JournalSegment newSegment(JournalSegmentFile segmentFile, JournalSegmentDescriptor descriptor) {
+ return new JournalSegment(segmentFile, descriptor, storageLevel, maxEntrySize, indexDensity);
}
/**
* Loads a segment.
*/
- private JournalSegment<E> loadSegment(long segmentId) {
+ private JournalSegment loadSegment(long segmentId) {
File segmentFile = JournalSegmentFile.createSegmentFile(name, directory, segmentId);
ByteBuffer buffer = ByteBuffer.allocate(JournalSegmentDescriptor.BYTES);
try (FileChannel channel = openChannel(segmentFile)) {
channel.read(buffer);
buffer.flip();
JournalSegmentDescriptor descriptor = new JournalSegmentDescriptor(buffer);
- JournalSegment<E> segment = newSegment(new JournalSegmentFile(segmentFile), descriptor);
+ JournalSegment segment = newSegment(new JournalSegmentFile(segmentFile), descriptor);
LOG.debug("Loaded disk segment: {} ({})", descriptor.id(), segmentFile.getName());
return segment;
} catch (IOException e) {
*
* @return A collection of segments for the log.
*/
- protected Collection<JournalSegment<E>> loadSegments() {
+ protected Collection<JournalSegment> loadSegments() {
// Ensure log directories are created.
directory.mkdirs();
- TreeMap<Long, JournalSegment<E>> segments = new TreeMap<>();
+ TreeMap<Long, JournalSegment> segments = new TreeMap<>();
// Iterate through all files in the log directory.
for (File file : directory.listFiles(File::isFile)) {
JournalSegmentDescriptor descriptor = new JournalSegmentDescriptor(buffer);
// Load the segment.
- JournalSegment<E> segment = loadSegment(descriptor.id());
+ JournalSegment segment = loadSegment(descriptor.id());
// Add the segment to the segments list.
LOG.debug("Found segment: {} ({})", segment.descriptor().id(), segmentFile.file().getName());
}
// Verify that all the segments in the log align with one another.
- JournalSegment<E> previousSegment = null;
+ JournalSegment previousSegment = null;
boolean corrupted = false;
- Iterator<Map.Entry<Long, JournalSegment<E>>> iterator = segments.entrySet().iterator();
+ Iterator<Map.Entry<Long, JournalSegment>> iterator = segments.entrySet().iterator();
while (iterator.hasNext()) {
- JournalSegment<E> segment = iterator.next().getValue();
+ JournalSegment segment = iterator.next().getValue();
if (previousSegment != null && previousSegment.lastIndex() != segment.firstIndex() - 1) {
LOG.warn("Journal is inconsistent. {} is not aligned with prior segment {}", segment.file().file(), previousSegment.file().file());
corrupted = true;
* @return indicates whether a segment can be removed from the journal
*/
public boolean isCompactable(long index) {
- Map.Entry<Long, JournalSegment<E>> segmentEntry = segments.floorEntry(index);
+ Map.Entry<Long, JournalSegment> segmentEntry = segments.floorEntry(index);
return segmentEntry != null && segments.headMap(segmentEntry.getValue().firstIndex()).size() > 0;
}
* @return the starting index of the last segment in the log
*/
public long getCompactableIndex(long index) {
- Map.Entry<Long, JournalSegment<E>> segmentEntry = segments.floorEntry(index);
+ Map.Entry<Long, JournalSegment> segmentEntry = segments.floorEntry(index);
return segmentEntry != null ? segmentEntry.getValue().firstIndex() : 0;
}
final var compactSegments = segments.headMap(segmentEntry.getValue().firstIndex());
if (!compactSegments.isEmpty()) {
LOG.debug("{} - Compacting {} segment(s)", name, compactSegments.size());
- for (JournalSegment<E> segment : compactSegments.values()) {
+ for (JournalSegment segment : compactSegments.values()) {
LOG.trace("Deleting segment: {}", segment);
segment.close();
segment.delete();
sealed class SegmentedJournalReader<E> implements JournalReader<E> permits CommitsSegmentJournalReader {
final SegmentedJournal<E> journal;
- private JournalSegment<E> currentSegment;
- private JournalSegmentReader<E> currentReader;
+ private JournalSegment currentSegment;
+ private JournalSegmentReader currentReader;
private Indexed<E> currentEntry;
private long nextIndex;
- SegmentedJournalReader(final SegmentedJournal<E> journal, final JournalSegment<E> segment) {
+ SegmentedJournalReader(final SegmentedJournal<E> journal, final JournalSegment segment) {
this.journal = requireNonNull(journal);
currentSegment = requireNonNull(segment);
currentReader = segment.createReader();
*/
private void rewind(final long index) {
if (currentSegment.firstIndex() >= index) {
- JournalSegment<E> segment = journal.getSegment(index - 1);
+ JournalSegment segment = journal.getSegment(index - 1);
if (segment != null) {
currentReader.close();
@Override
public Indexed<E> tryNext() {
- var next = currentReader.readEntry(nextIndex);
- if (next == null) {
+ var buf = currentReader.readBytes(nextIndex);
+ if (buf == null) {
final var nextSegment = journal.getNextSegment(currentSegment.firstIndex());
if (nextSegment == null || nextSegment.firstIndex() != nextIndex) {
return null;
currentSegment = nextSegment;
currentReader = currentSegment.createReader();
- next = currentReader.readEntry(nextIndex);
- if (next == null) {
+ buf = currentReader.readBytes(nextIndex);
+ if (buf == null) {
return null;
}
}
- nextIndex = nextIndex + 1;
- currentEntry = next;
- return next;
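+ // segment readers now deal in raw bytes; deserialization happens here at the journal level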
+ final var entry = journal.serializer().deserialize(buf);
+ currentEntry = new Indexed<>(nextIndex++, entry, buf.readableBytes());
+ return currentEntry;
}
@Override
*/
final class SegmentedJournalWriter<E> implements JournalWriter<E> {
private final SegmentedJournal<E> journal;
- private JournalSegment<E> currentSegment;
- private JournalSegmentWriter<E> currentWriter;
+ private JournalSegment currentSegment;
+ private JournalSegmentWriter currentWriter;
SegmentedJournalWriter(SegmentedJournal<E> journal) {
this.journal = journal;
@Override
public Indexed<E> getLastEntry() {
- return currentWriter.getLastEntry();
+ final var lastWritten = currentWriter.getLastWritten();
+ if (lastWritten == null) {
+ return null;
+ }
+ final E deserialized = journal.serializer().deserialize(lastWritten);
+ return new Indexed<>(currentWriter.getLastIndex(), deserialized, lastWritten.readableBytes());
}
@Override
@Override
public <T extends E> Indexed<T> append(T entry) {
- var indexed = currentWriter.append(entry);
- if (indexed != null) {
- return indexed;
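+ // serialize once up front: the same bytes are reused if we have to roll over to a new segment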
+ final var bytes = journal.serializer().serialize(entry);
+ var index = currentWriter.append(bytes);
+ if (index != null) {
+ return new Indexed<>(index, entry, bytes.readableBytes());
}
// Slow path: we do not have enough capacity
currentSegment.releaseWriter();
currentSegment = journal.getNextSegment();
currentWriter = currentSegment.acquireWriter();
- return verifyNotNull(currentWriter.append(entry));
+ final var newIndex = verifyNotNull(currentWriter.append(bytes));
+ return new Indexed<>(newIndex, entry, bytes.readableBytes());
}
@Override
<type>xml</type>
<classifier>features</classifier>
</dependency>
+ <dependency>
+ <groupId>org.opendaylight.odlparent</groupId>
+ <artifactId>odl-netty-4</artifactId>
+ <type>xml</type>
+ <classifier>features</classifier>
+ </dependency>
<dependency>
<groupId>org.opendaylight.odlparent</groupId>
<artifactId>odl-servlet-api</artifactId>
<feature name="odl-mdsal-clustering-commons" version="${project.version}">
<feature version="[13,14)">odl-apache-commons-lang3</feature>
<feature version="[13,14)">odl-dropwizard-metrics</feature>
+ <feature version="[13,14)">odl-netty-4</feature>
<feature version="[13,14)">odl-servlet-api</feature>
<feature version="[13,14)">odl-yangtools-data</feature>
<feature version="[13,14)">odl-yangtools-codec</feature>