X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=atomix-storage%2Fsrc%2Fmain%2Fjava%2Fio%2Fatomix%2Fstorage%2Fjournal%2FMappedJournalSegmentWriter.java;h=99180c5840321165696796a45234f5dcfcbb237e;hb=479ecb0109aaa1c8682dd083600b777cbd68dc07;hp=f78e31541bffabda7ded4e9b5bab53093c3d1b3c;hpb=129be44b99a5e436beb30008552729b7f9583514;p=controller.git diff --git a/atomix-storage/src/main/java/io/atomix/storage/journal/MappedJournalSegmentWriter.java b/atomix-storage/src/main/java/io/atomix/storage/journal/MappedJournalSegmentWriter.java index f78e31541b..99180c5840 100644 --- a/atomix-storage/src/main/java/io/atomix/storage/journal/MappedJournalSegmentWriter.java +++ b/atomix-storage/src/main/java/io/atomix/storage/journal/MappedJournalSegmentWriter.java @@ -1,5 +1,6 @@ /* - * Copyright 2017-present Open Networking Foundation + * Copyright 2017-2022 Open Networking Foundation and others. All rights reserved. + * Copyright (c) 2024 PANTHEON.tech, s.r.o. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -15,9 +16,10 @@ */ package io.atomix.storage.journal; +import static io.atomix.storage.journal.SegmentEntry.HEADER_BYTES; + import com.esotericsoftware.kryo.KryoException; import io.atomix.storage.journal.index.JournalIndex; - import java.io.IOException; import java.nio.BufferOverflowException; import java.nio.BufferUnderflowException; @@ -63,9 +65,8 @@ final class MappedJournalSegmentWriter extends JournalSegmentWriter { MappedJournalSegmentWriter(JournalSegmentWriter previous, int position) { super(previous); mappedBuffer = mapBuffer(channel, maxSegmentSize); - buffer = mappedBuffer.slice(); + buffer = mappedBuffer.slice().position(position); lastEntry = previous.getLastEntry(); - buffer.position(position); } private static @NonNull MappedByteBuffer mapBuffer(FileChannel channel, int maxSegmentSize) { @@ -87,10 +88,10 @@ final class MappedJournalSegmentWriter extends JournalSegmentWriter { } @Override - FileChannelJournalSegmentWriter toFileChannel() { + DiskJournalSegmentWriter toFileChannel() { final int position = buffer.position(); close(); - return new FileChannelJournalSegmentWriter<>(this, position); + return new DiskJournalSegmentWriter<>(this, position); } @Override @@ -138,8 +139,7 @@ final class MappedJournalSegmentWriter extends JournalSegmentWriter { position = buffer.position() + length; buffer.position(position); - buffer.mark(); - length = buffer.getInt(); + length = buffer.mark().getInt(); } // Reset the buffer to the previous mark. @@ -162,11 +162,11 @@ final class MappedJournalSegmentWriter extends JournalSegmentWriter { // Serialize the entry. int position = buffer.position(); - if (position + ENTRY_HEADER_BYTES > buffer.limit()) { + if (position + HEADER_BYTES > buffer.limit()) { throw new BufferOverflowException(); } - buffer.position(position + ENTRY_HEADER_BYTES); + buffer.position(position + HEADER_BYTES); try { namespace.serialize(entry, buffer); @@ -174,7 +174,7 @@ final class MappedJournalSegmentWriter extends JournalSegmentWriter { throw new BufferOverflowException(); } - final int length = buffer.position() - (position + ENTRY_HEADER_BYTES); + final int length = buffer.position() - (position + HEADER_BYTES); // If the entry length exceeds the maximum entry size then throw an exception. if (length > maxEntrySize) { @@ -185,17 +185,14 @@ final class MappedJournalSegmentWriter extends JournalSegmentWriter { // Compute the checksum for the entry. final CRC32 crc32 = new CRC32(); - buffer.position(position + ENTRY_HEADER_BYTES); + buffer.position(position + HEADER_BYTES); ByteBuffer slice = buffer.slice(); slice.limit(length); crc32.update(slice); final long checksum = crc32.getValue(); // Create a single byte[] in memory for the entire entry and write it as a batch to the underlying buffer. - buffer.position(position); - buffer.putInt(length); - buffer.putInt((int) checksum); - buffer.position(position + ENTRY_HEADER_BYTES + length); + buffer.position(position).putInt(length).putInt((int) checksum).position(position + HEADER_BYTES + length); // Update the last entry with the correct index/term/length. Indexed indexedEntry = new Indexed<>(index, entry, length); @@ -228,8 +225,7 @@ final class MappedJournalSegmentWriter extends JournalSegmentWriter { // Zero the entry header at current buffer position. int position = buffer.position(); // Note: we issue a single putLong() instead of two putInt()s. - buffer.putLong(0); - buffer.position(position); + buffer.putLong(0).position(position); } @Override