From 07f772e01ad41bfa6aacf037c2e19b2be87f3bc6 Mon Sep 17 00:00:00 2001 From: Robert Varga Date: Sat, 9 Mar 2024 18:28:02 +0100 Subject: [PATCH] Track channel position in explicit field Rather than manipulating the channel, use an explicit field to keep track of our current position. JIRA: CONTROLLER-2095 Change-Id: I50adbef833251035f7c54ec624f9a03d59b5c7c9 Signed-off-by: Robert Varga --- .../FileChannelJournalSegmentWriter.java | 90 +++++++++---------- 1 file changed, 44 insertions(+), 46 deletions(-) diff --git a/atomix-storage/src/main/java/io/atomix/storage/journal/FileChannelJournalSegmentWriter.java b/atomix-storage/src/main/java/io/atomix/storage/journal/FileChannelJournalSegmentWriter.java index ed9ff922f7..f6dda9c6e0 100644 --- a/atomix-storage/src/main/java/io/atomix/storage/journal/FileChannelJournalSegmentWriter.java +++ b/atomix-storage/src/main/java/io/atomix/storage/journal/FileChannelJournalSegmentWriter.java @@ -51,6 +51,7 @@ class FileChannelJournalSegmentWriter implements JournalWriter { private final ByteBuffer memory; private final long firstIndex; private Indexed lastEntry; + private long currentPosition; FileChannelJournalSegmentWriter( FileChannel channel, @@ -74,15 +75,12 @@ class FileChannelJournalSegmentWriter implements JournalWriter { long nextIndex = firstIndex; // Clear the buffer indexes. - try { - channel.position(JournalSegmentDescriptor.BYTES); - - // Record the current buffer position. - long position = channel.position(); + currentPosition = JournalSegmentDescriptor.BYTES; + try { // Clear memory buffer and read fist chunk memory.clear(); - channel.read(memory, position); + channel.read(memory, JournalSegmentDescriptor.BYTES); memory.flip(); // Read the entry length. @@ -110,18 +108,17 @@ class FileChannelJournalSegmentWriter implements JournalWriter { entryBytes.rewind(); final E entry = namespace.deserialize(entryBytes); lastEntry = new Indexed<>(nextIndex, entry, length); - this.index.index(nextIndex, (int) position); + this.index.index(nextIndex, (int) currentPosition); nextIndex++; // Update the current position for indexing. - position = position + Integer.BYTES + Integer.BYTES + length; - channel.position(position); + currentPosition = currentPosition + Integer.BYTES + Integer.BYTES + length; memory.position(memory.position() + length); // Read more bytes from the segment if necessary. if (memory.remaining() < maxEntrySize) { memory.clear(); - channel.read(memory, position); + channel.read(memory, currentPosition); memory.flip(); } @@ -175,48 +172,49 @@ class FileChannelJournalSegmentWriter implements JournalWriter { // Store the entry index. final long index = getNextIndex(); + // Serialize the entry. + memory.clear(); + memory.position(Integer.BYTES + Integer.BYTES); try { - // Serialize the entry. - memory.clear(); - memory.position(Integer.BYTES + Integer.BYTES); - try { - namespace.serialize(entry, memory); - } catch (KryoException e) { - throw new StorageException.TooLarge("Entry size exceeds maximum allowed bytes (" + maxEntrySize + ")"); - } - memory.flip(); + namespace.serialize(entry, memory); + } catch (KryoException e) { + throw new StorageException.TooLarge("Entry size exceeds maximum allowed bytes (" + maxEntrySize + ")"); + } + memory.flip(); - final int length = memory.limit() - (Integer.BYTES + Integer.BYTES); + final int length = memory.limit() - (Integer.BYTES + Integer.BYTES); - // Ensure there's enough space left in the buffer to store the entry. - long position = channel.position(); - if (segment.descriptor().maxSegmentSize() - position < length + Integer.BYTES + Integer.BYTES) { - throw new BufferOverflowException(); - } + // Ensure there's enough space left in the buffer to store the entry. + if (segment.descriptor().maxSegmentSize() - currentPosition < length + Integer.BYTES + Integer.BYTES) { + throw new BufferOverflowException(); + } - // If the entry length exceeds the maximum entry size then throw an exception. - if (length > maxEntrySize) { - throw new StorageException.TooLarge("Entry size " + length + " exceeds maximum allowed bytes (" + maxEntrySize + ")"); - } + // If the entry length exceeds the maximum entry size then throw an exception. + if (length > maxEntrySize) { + throw new StorageException.TooLarge("Entry size " + length + " exceeds maximum allowed bytes (" + maxEntrySize + ")"); + } + + // Compute the checksum for the entry. + final CRC32 crc32 = new CRC32(); + crc32.update(memory.array(), Integer.BYTES + Integer.BYTES, memory.limit() - (Integer.BYTES + Integer.BYTES)); + final long checksum = crc32.getValue(); - // Compute the checksum for the entry. - final CRC32 crc32 = new CRC32(); - crc32.update(memory.array(), Integer.BYTES + Integer.BYTES, memory.limit() - (Integer.BYTES + Integer.BYTES)); - final long checksum = crc32.getValue(); - - // Create a single byte[] in memory for the entire entry and write it as a batch to the underlying buffer. - memory.putInt(0, length); - memory.putInt(Integer.BYTES, (int) checksum); - channel.write(memory); - - // Update the last entry with the correct index/term/length. - Indexed indexedEntry = new Indexed<>(index, entry, length); - this.lastEntry = indexedEntry; - this.index.index(index, (int) position); - return (Indexed) indexedEntry; + // Create a single byte[] in memory for the entire entry and write it as a batch to the underlying buffer. + memory.putInt(0, length); + memory.putInt(Integer.BYTES, (int) checksum); + try { + channel.write(memory, currentPosition); } catch (IOException e) { throw new StorageException(e); } + + // Update the last entry with the correct index/term/length. + Indexed indexedEntry = new Indexed<>(index, entry, length); + this.lastEntry = indexedEntry; + this.index.index(index, (int) currentPosition); + + currentPosition = currentPosition + Integer.BYTES + Integer.BYTES + length; + return (Indexed) indexedEntry; } @Override @@ -240,14 +238,14 @@ class FileChannelJournalSegmentWriter implements JournalWriter { try { if (index < segment.index()) { // Reset the writer to the first entry. - channel.position(JournalSegmentDescriptor.BYTES); + currentPosition = JournalSegmentDescriptor.BYTES; } else { // Reset the writer to the given index. reset(index); } // Zero the entry header at current channel position. - channel.write(ZERO_ENTRY_HEADER.asReadOnlyBuffer(), channel.position()); + channel.write(ZERO_ENTRY_HEADER.asReadOnlyBuffer(), currentPosition); } catch (IOException e) { throw new StorageException(e); } -- 2.36.6