import static io.atomix.storage.journal.SegmentEntry.HEADER_BYTES;
-import com.esotericsoftware.kryo.KryoException;
-import io.atomix.storage.journal.StorageException.TooLarge;
import io.atomix.storage.journal.index.JournalIndex;
import java.io.IOException;
-import java.nio.BufferOverflowException;
import java.nio.ByteBuffer;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
-import java.util.zip.CRC32;
/**
* Segment writer.
* @author <a href="http://github.com/kuujo">Jordan Halterman</a>
*/
final class DiskJournalSegmentWriter<E> extends JournalSegmentWriter<E> {
- private static final ByteBuffer ZERO_ENTRY_HEADER = ByteBuffer.wrap(new byte[HEADER_BYTES]);
-
- private final JournalSegmentReader<E> reader;
- private final ByteBuffer buffer;
-
- DiskJournalSegmentWriter(final FileChannel channel, final JournalSegment<E> segment, final int maxEntrySize,
- final JournalIndex index, final JournalSerdes namespace) {
- super(channel, segment, maxEntrySize, index, namespace);
-
- buffer = DiskFileReader.allocateBuffer(maxSegmentSize, maxEntrySize);
- reader = new JournalSegmentReader<>(segment,
- new DiskFileReader(segment.file().file().toPath(), channel, buffer), maxEntrySize, namespace);
- reset(0);
- }
-
- DiskJournalSegmentWriter(final JournalSegmentWriter<E> previous) {
- super(previous);
-
- buffer = DiskFileReader.allocateBuffer(maxSegmentSize, maxEntrySize);
- reader = new JournalSegmentReader<>(segment,
- new DiskFileReader(segment.file().file().toPath(), channel, buffer), maxEntrySize, namespace);
- }
-
- @Override
- MappedByteBuffer buffer() {
- return null;
- }
-
- @Override
- MappedJournalSegmentWriter<E> toMapped() {
- return new MappedJournalSegmentWriter<>(this);
- }
-
- @Override
- DiskJournalSegmentWriter<E> toFileChannel() {
- return this;
- }
-
- @Override
- JournalSegmentReader<E> reader() {
- return reader;
- }
-
- @Override
- @SuppressWarnings("unchecked")
- <T extends E> Indexed<T> append(final T entry) {
- // Store the entry index.
- final long index = getNextIndex();
-
- // Serialize the entry.
- try {
- namespace.serialize(entry, buffer.clear().position(HEADER_BYTES));
- } catch (KryoException e) {
- throw new StorageException.TooLarge("Entry size exceeds maximum allowed bytes (" + maxEntrySize + ")");
- }
- buffer.flip();
-
- final int length = buffer.limit() - HEADER_BYTES;
- // Ensure there's enough space left in the buffer to store the entry.
- if (maxSegmentSize - currentPosition < length + HEADER_BYTES) {
- throw new BufferOverflowException();
- }
-
- // If the entry length exceeds the maximum entry size then throw an exception.
- if (length > maxEntrySize) {
- throw new TooLarge("Entry size " + length + " exceeds maximum allowed bytes (" + maxEntrySize + ")");
- }
-
- // Compute the checksum for the entry.
- final var crc32 = new CRC32();
- crc32.update(buffer.slice(HEADER_BYTES, length));
-
- // Create a single byte[] in memory for the entire entry and write it as a batch to the underlying buffer.
- buffer.putInt(0, length).putInt(Integer.BYTES, (int) crc32.getValue());
- try {
- channel.write(buffer, currentPosition);
- } catch (IOException e) {
- throw new StorageException(e);
- }
-
- // Update the last entry with the correct index/term/length.
- final var indexedEntry = new Indexed<E>(index, entry, length);
- lastEntry = indexedEntry;
- this.index.index(index, currentPosition);
-
- currentPosition = currentPosition + HEADER_BYTES + length;
- return (Indexed<T>) indexedEntry;
- }
-
- @Override
- void writeEmptyHeader(final int position) {
- try {
- channel.write(ZERO_ENTRY_HEADER.asReadOnlyBuffer(), position);
- } catch (IOException e) {
- throw new StorageException(e);
+ private static final ByteBuffer ZERO_ENTRY_HEADER = ByteBuffer.wrap(new byte[HEADER_BYTES]);
+
+ private final JournalSegmentReader<E> reader;
+ private final ByteBuffer buffer;
+
+ DiskJournalSegmentWriter(final FileChannel channel, final JournalSegment<E> segment, final int maxEntrySize,
+ final JournalIndex index, final JournalSerdes namespace) {
+ super(channel, segment, maxEntrySize, index, namespace);
+
+ buffer = DiskFileReader.allocateBuffer(maxSegmentSize, maxEntrySize);
+ reader = new JournalSegmentReader<>(segment,
+ new DiskFileReader(segment.file().file().toPath(), channel, buffer), maxEntrySize, namespace);
+ reset(0);
+ }
+
+ DiskJournalSegmentWriter(final JournalSegmentWriter<E> previous) {
+ super(previous);
+
+ buffer = DiskFileReader.allocateBuffer(maxSegmentSize, maxEntrySize);
+ reader = new JournalSegmentReader<>(segment,
+ new DiskFileReader(segment.file().file().toPath(), channel, buffer), maxEntrySize, namespace);
+ }
+
+ @Override
+ MappedByteBuffer buffer() {
+ return null;
+ }
+
+ @Override
+ MappedJournalSegmentWriter<E> toMapped() {
+ return new MappedJournalSegmentWriter<>(this);
+ }
+
+ @Override
+ DiskJournalSegmentWriter<E> toFileChannel() {
+ return this;
+ }
+
+ @Override
+ JournalSegmentReader<E> reader() {
+ return reader;
}
- }
-
- @Override
- void flush() {
- try {
- if (channel.isOpen()) {
- channel.force(true);
- }
- } catch (IOException e) {
- throw new StorageException(e);
+
+ @Override
+ ByteBuffer startWrite(final int position, final int size) {
+ return buffer.clear().slice(0, size);
+ }
+
+ @Override
+ void commitWrite(final int position, final ByteBuffer entry) {
+ try {
+ channel.write(entry, position);
+ } catch (IOException e) {
+ throw new StorageException(e);
+ }
}
- }
- @Override
- void close() {
- flush();
- }
+ @Override
+ void writeEmptyHeader(final int position) {
+ try {
+ channel.write(ZERO_ENTRY_HEADER.asReadOnlyBuffer(), position);
+ } catch (IOException e) {
+ throw new StorageException(e);
+ }
+ }
+
+ @Override
+ void flush() {
+ try {
+ if (channel.isOpen()) {
+ channel.force(true);
+ }
+ } catch (IOException e) {
+ throw new StorageException(e);
+ }
+ }
+
+ @Override
+ void close() {
+ flush();
+ }
}