We have SegmentEntry.HEADER_BYTES instead of ENTRY_HEADER_BYTES.
JIRA: CONTROLLER-2109
Change-Id: I4bc66155dccd3e177a61bad28be50e0bc62827ab
Signed-off-by: Robert Varga <robert.varga@pantheon.tech>
JournalSerdes namespace) {
super(segment, maxEntrySize, index, namespace);
this.channel = channel;
- this.memory = ByteBuffer.allocate((maxEntrySize + JournalSegmentWriter.ENTRY_HEADER_BYTES) * 2);
+ this.memory = ByteBuffer.allocate((maxEntrySize + SegmentEntry.HEADER_BYTES) * 2);
reset();
}
*/
package io.atomix.storage.journal;
+import static io.atomix.storage.journal.SegmentEntry.HEADER_BYTES;
+
import com.esotericsoftware.kryo.KryoException;
import com.google.common.annotations.VisibleForTesting;
import io.atomix.storage.journal.index.JournalIndex;
*/
final class DiskJournalSegmentWriter<E> extends JournalSegmentWriter<E> {
private static final Logger LOG = LoggerFactory.getLogger(DiskJournalSegmentWriter.class);
- private static final ByteBuffer ZERO_ENTRY_HEADER = ByteBuffer.wrap(new byte[ENTRY_HEADER_BYTES]);
+ private static final ByteBuffer ZERO_ENTRY_HEADER = ByteBuffer.wrap(new byte[HEADER_BYTES]);
private final ByteBuffer memory;
private Indexed<E> lastEntry;
}
private static ByteBuffer allocMemory(int maxEntrySize) {
- final var buf = ByteBuffer.allocate((maxEntrySize + ENTRY_HEADER_BYTES) * 2);
+ final var buf = ByteBuffer.allocate((maxEntrySize + HEADER_BYTES) * 2);
buf.limit(0);
return buf;
}
nextIndex++;
// Update the current position for indexing.
- currentPosition = currentPosition + ENTRY_HEADER_BYTES + length;
+ currentPosition = currentPosition + HEADER_BYTES + length;
memory.position(memory.position() + length);
}
} catch (IOException e) {
final int maxEntrySize) throws IOException {
int remaining = memory.remaining();
boolean compacted;
- if (remaining < ENTRY_HEADER_BYTES) {
+ if (remaining < HEADER_BYTES) {
// We do not have the header available. Move the pointer and read.
channel.read(memory.compact());
remaining = memory.flip().remaining();
- if (remaining < ENTRY_HEADER_BYTES) {
+ if (remaining < HEADER_BYTES) {
// could happen with mis-padded segment
return null;
}
// Serialize the entry.
try {
- namespace.serialize(entry, memory.clear().position(ENTRY_HEADER_BYTES));
+ namespace.serialize(entry, memory.clear().position(HEADER_BYTES));
} catch (KryoException e) {
throw new StorageException.TooLarge("Entry size exceeds maximum allowed bytes (" + maxEntrySize + ")");
}
memory.flip();
- final int length = memory.limit() - ENTRY_HEADER_BYTES;
+ final int length = memory.limit() - HEADER_BYTES;
// Ensure there's enough space left in the buffer to store the entry.
- if (maxSegmentSize - currentPosition < length + ENTRY_HEADER_BYTES) {
+ if (maxSegmentSize - currentPosition < length + HEADER_BYTES) {
throw new BufferOverflowException();
}
// Compute the checksum for the entry.
final CRC32 crc32 = new CRC32();
- crc32.update(memory.array(), ENTRY_HEADER_BYTES, memory.limit() - ENTRY_HEADER_BYTES);
+ crc32.update(memory.array(), HEADER_BYTES, memory.limit() - HEADER_BYTES);
final long checksum = crc32.getValue();
// Create a single byte[] in memory for the entire entry and write it as a batch to the underlying buffer.
this.lastEntry = indexedEntry;
this.index.index(index, (int) currentPosition);
- currentPosition = currentPosition + ENTRY_HEADER_BYTES + length;
+ currentPosition = currentPosition + HEADER_BYTES + length;
return (Indexed<T>) indexedEntry;
}
import org.eclipse.jdt.annotation.Nullable;
abstract sealed class JournalSegmentWriter<E> permits DiskJournalSegmentWriter, MappedJournalSegmentWriter {
- /**
- * The size of the header, comprising of:
- * <ul>
- * <li>32-bit signed entry length</li>
- * <li>32-bit unsigned CRC32 checksum</li>
- * </li>
- */
- static final int ENTRY_HEADER_BYTES = Integer.BYTES + Integer.BYTES;
-
final @NonNull FileChannel channel;
final @NonNull JournalIndex index;
final @NonNull JournalSerdes namespace;
this.channel = requireNonNull(channel);
this.index = requireNonNull(index);
this.namespace = requireNonNull(namespace);
- this.maxSegmentSize = segment.descriptor().maxSegmentSize();
+ maxSegmentSize = segment.descriptor().maxSegmentSize();
this.maxEntrySize = maxEntrySize;
- this.firstIndex = segment.index();
+ firstIndex = segment.index();
}
- JournalSegmentWriter(JournalSegmentWriter<E> previous) {
- this.channel = previous.channel;
- this.index = previous.index;
- this.namespace = previous.namespace;
- this.maxSegmentSize = previous.maxSegmentSize;
- this.maxEntrySize = previous.maxEntrySize;
- this.firstIndex = previous.firstIndex;
+ JournalSegmentWriter(final JournalSegmentWriter<E> previous) {
+ channel = previous.channel;
+ index = previous.index;
+ namespace = previous.namespace;
+ maxSegmentSize = previous.maxSegmentSize;
+ maxEntrySize = previous.maxEntrySize;
+ firstIndex = previous.firstIndex;
}
/**
*/
package io.atomix.storage.journal;
+import static io.atomix.storage.journal.SegmentEntry.HEADER_BYTES;
+
import com.esotericsoftware.kryo.KryoException;
import io.atomix.storage.journal.index.JournalIndex;
-
import java.io.IOException;
import java.nio.BufferOverflowException;
import java.nio.BufferUnderflowException;
// Serialize the entry.
int position = buffer.position();
- if (position + ENTRY_HEADER_BYTES > buffer.limit()) {
+ if (position + HEADER_BYTES > buffer.limit()) {
throw new BufferOverflowException();
}
- buffer.position(position + ENTRY_HEADER_BYTES);
+ buffer.position(position + HEADER_BYTES);
try {
namespace.serialize(entry, buffer);
throw new BufferOverflowException();
}
- final int length = buffer.position() - (position + ENTRY_HEADER_BYTES);
+ final int length = buffer.position() - (position + HEADER_BYTES);
// If the entry length exceeds the maximum entry size then throw an exception.
if (length > maxEntrySize) {
// Compute the checksum for the entry.
final CRC32 crc32 = new CRC32();
- buffer.position(position + ENTRY_HEADER_BYTES);
+ buffer.position(position + HEADER_BYTES);
ByteBuffer slice = buffer.slice();
slice.limit(length);
crc32.update(slice);
final long checksum = crc32.getValue();
// Create a single byte[] in memory for the entire entry and write it as a batch to the underlying buffer.
- buffer.position(position).putInt(length).putInt((int) checksum).position(position + ENTRY_HEADER_BYTES + length);
+ buffer.position(position).putInt(length).putInt((int) checksum).position(position + HEADER_BYTES + length);
// Update the last entry with the correct index/term/length.
Indexed<E> indexedEntry = new Indexed<>(index, entry, length);
/**
* An {@link Indexed} entry read from {@link JournalSegment}.
+ *
+ * @param checksum The CRC32 checksum of data
+ * @param bytes Entry bytes
*/
record SegmentEntry(int checksum, ByteBuffer bytes) {
/**