/*
- * Copyright 2017-present Open Networking Foundation
+ * Copyright 2017-2022 Open Networking Foundation and others. All rights reserved.
+ * Copyright (c) 2024 PANTHEON.tech, s.r.o.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* @author <a href="http://github.com/kuujo">Jordan Halterman</a>
*/
final class FileChannelJournalSegmentWriter<E> extends JournalSegmentWriter<E> {
- private static final ByteBuffer ZERO_ENTRY_HEADER = ByteBuffer.wrap(new byte[Integer.BYTES + Integer.BYTES]);
+ private static final ByteBuffer ZERO_ENTRY_HEADER = ByteBuffer.wrap(new byte[ENTRY_HEADER_BYTES]);
private final ByteBuffer memory;
private Indexed<E> lastEntry;
}
private static ByteBuffer allocMemory(int maxEntrySize) {
- final var buf = ByteBuffer.allocate((maxEntrySize + Integer.BYTES + Integer.BYTES) * 2);
+ final var buf = ByteBuffer.allocate((maxEntrySize + ENTRY_HEADER_BYTES) * 2);
buf.limit(0);
return buf;
}
nextIndex++;
// Update the current position for indexing.
- currentPosition = currentPosition + Integer.BYTES + Integer.BYTES + length;
+ currentPosition = currentPosition + ENTRY_HEADER_BYTES + length;
memory.position(memory.position() + length);
// Read more bytes from the segment if necessary.
}
}
- @Override
- long getLastIndex() {
- return lastEntry != null ? lastEntry.index() : firstIndex - 1;
- }
-
@Override
Indexed<E> getLastEntry() {
return lastEntry;
}
- @Override
- long getNextIndex() {
- if (lastEntry != null) {
- return lastEntry.index() + 1;
- } else {
- return firstIndex;
- }
- }
-
@Override
@SuppressWarnings("unchecked")
<T extends E> Indexed<T> append(T entry) {
// Serialize the entry.
try {
- namespace.serialize(entry, memory.clear().position(Integer.BYTES + Integer.BYTES));
+ namespace.serialize(entry, memory.clear().position(ENTRY_HEADER_BYTES));
} catch (KryoException e) {
throw new StorageException.TooLarge("Entry size exceeds maximum allowed bytes (" + maxEntrySize + ")");
}
memory.flip();
- final int length = memory.limit() - (Integer.BYTES + Integer.BYTES);
+ final int length = memory.limit() - ENTRY_HEADER_BYTES;
// Ensure there's enough space left in the buffer to store the entry.
- if (maxSegmentSize - currentPosition < length + Integer.BYTES + Integer.BYTES) {
+ if (maxSegmentSize - currentPosition < length + ENTRY_HEADER_BYTES) {
throw new BufferOverflowException();
}
// Compute the checksum for the entry.
final CRC32 crc32 = new CRC32();
- crc32.update(memory.array(), Integer.BYTES + Integer.BYTES, memory.limit() - (Integer.BYTES + Integer.BYTES));
+ crc32.update(memory.array(), ENTRY_HEADER_BYTES, memory.limit() - ENTRY_HEADER_BYTES);
final long checksum = crc32.getValue();
// Create a single byte[] in memory for the entire entry and write it as a batch to the underlying buffer.
this.lastEntry = indexedEntry;
this.index.index(index, (int) currentPosition);
- currentPosition = currentPosition + Integer.BYTES + Integer.BYTES + length;
+ currentPosition = currentPosition + ENTRY_HEADER_BYTES + length;
return (Indexed<T>) indexedEntry;
}