/*
- * Copyright 2017-present Open Networking Foundation
+ * Copyright 2017-2022 Open Networking Foundation and others. All rights reserved.
+ * Copyright (c) 2024 PANTHEON.tech, s.r.o.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
*/
package io.atomix.storage.journal;
+import static io.atomix.storage.journal.SegmentEntry.HEADER_BYTES;
+
import com.esotericsoftware.kryo.KryoException;
import io.atomix.storage.journal.index.JournalIndex;
-
import java.io.IOException;
import java.nio.BufferOverflowException;
import java.nio.BufferUnderflowException;
MappedJournalSegmentWriter(JournalSegmentWriter<E> previous, int position) {
super(previous);
mappedBuffer = mapBuffer(channel, maxSegmentSize);
- buffer = mappedBuffer.slice();
+ buffer = mappedBuffer.slice().position(position);
lastEntry = previous.getLastEntry();
- buffer.position(position);
}
private static @NonNull MappedByteBuffer mapBuffer(FileChannel channel, int maxSegmentSize) {
}
@Override
- FileChannelJournalSegmentWriter<E> toFileChannel() {
+ DiskJournalSegmentWriter<E> toFileChannel() {
final int position = buffer.position();
close();
- return new FileChannelJournalSegmentWriter<>(this, position);
+ return new DiskJournalSegmentWriter<>(this, position);
}
@Override
position = buffer.position() + length;
buffer.position(position);
- buffer.mark();
- length = buffer.getInt();
+ length = buffer.mark().getInt();
}
// Reset the buffer to the previous mark.
// Serialize the entry.
int position = buffer.position();
- if (position + Integer.BYTES + Integer.BYTES > buffer.limit()) {
+ if (position + HEADER_BYTES > buffer.limit()) {
throw new BufferOverflowException();
}
- buffer.position(position + Integer.BYTES + Integer.BYTES);
+ buffer.position(position + HEADER_BYTES);
try {
namespace.serialize(entry, buffer);
throw new BufferOverflowException();
}
- final int length = buffer.position() - (position + Integer.BYTES + Integer.BYTES);
+ final int length = buffer.position() - (position + HEADER_BYTES);
// If the entry length exceeds the maximum entry size then throw an exception.
if (length > maxEntrySize) {
// Compute the checksum for the entry.
final CRC32 crc32 = new CRC32();
- buffer.position(position + Integer.BYTES + Integer.BYTES);
+ buffer.position(position + HEADER_BYTES);
ByteBuffer slice = buffer.slice();
slice.limit(length);
crc32.update(slice);
final long checksum = crc32.getValue();
// Create a single byte[] in memory for the entire entry and write it as a batch to the underlying buffer.
- buffer.position(position);
- buffer.putInt(length);
- buffer.putInt((int) checksum);
- buffer.position(position + Integer.BYTES + Integer.BYTES + length);
+ buffer.position(position).putInt(length).putInt((int) checksum).position(position + HEADER_BYTES + length);
// Update the last entry with the correct index/term/length.
Indexed<E> indexedEntry = new Indexed<>(index, entry, length);
// Zero the entry header at current buffer position.
int position = buffer.position();
// Note: we issue a single putLong() instead of two putInt()s.
- buffer.putLong(0);
- buffer.position(position);
+ buffer.putLong(0).position(position);
}
@Override