Separate byte-level atomic-storage access
[controller.git] / atomix-storage / src / main / java / io / atomix / storage / journal / SegmentedJournalWriter.java
index 71120891a1514847ed44789c10e1f5b95692d0de..7c331ccb2463d5bf595a23ae319712cd4a042d38 100644 (file)
@@ -1,5 +1,6 @@
 /*
  * Copyright 2017-2022 Open Networking Foundation and others.  All rights reserved.
+ * Copyright (c) 2024 PANTHEON.tech, s.r.o. and others.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  */
 package io.atomix.storage.journal;
 
-import static com.google.common.base.Verify.verifyNotNull;
+import static java.util.Objects.requireNonNull;
 
 /**
- * Raft log writer.
+ * A {@link JournalWriter} backed by a {@link ByteBufWriter}.
  */
 final class SegmentedJournalWriter<E> implements JournalWriter<E> {
-  private final SegmentedJournal<E> journal;
-  private JournalSegment currentSegment;
-  private JournalSegmentWriter currentWriter;
+    private final ByteBufMapper<E> mapper;
+    private final ByteBufWriter writer;
 
-  SegmentedJournalWriter(SegmentedJournal<E> journal) {
-    this.journal = journal;
-    this.currentSegment = journal.getLastSegment();
-    this.currentWriter = currentSegment.acquireWriter();
-  }
-
-  @Override
-  public long getLastIndex() {
-    return currentWriter.getLastIndex();
-  }
-
-  @Override
-  public long getNextIndex() {
-    return currentWriter.getNextIndex();
-  }
-
-  @Override
-  public void reset(long index) {
-    if (index > currentSegment.firstIndex()) {
-      currentSegment.releaseWriter();
-      currentSegment = journal.resetSegments(index);
-      currentWriter = currentSegment.acquireWriter();
-    } else {
-      truncate(index - 1);
+    SegmentedJournalWriter(final ByteBufWriter writer, final ByteBufMapper<E> mapper) {
+        this.writer = requireNonNull(writer);
+        this.mapper = requireNonNull(mapper);
     }
-    journal.resetHead(index);
-  }
 
-  @Override
-  public void commit(long index) {
-    if (index > journal.getCommitIndex()) {
-      journal.setCommitIndex(index);
-      if (journal.isFlushOnCommit()) {
-        flush();
-      }
+    @Override
+    public long getLastIndex() {
+        return writer.lastIndex();
     }
-  }
 
-  @Override
-  public <T extends E> Indexed<T> append(T entry) {
-    final var bytes = journal.serializer().serialize(entry);
-    var index = currentWriter.append(bytes);
-    if (index != null) {
-      return new Indexed<>(index, entry, bytes.readableBytes());
+    @Override
+    public long getNextIndex() {
+        return writer.nextIndex();
     }
 
-    //  Slow path: we do not have enough capacity
-    currentWriter.flush();
-    currentSegment.releaseWriter();
-    currentSegment = journal.getNextSegment();
-    currentWriter = currentSegment.acquireWriter();
-    final var newIndex = verifyNotNull(currentWriter.append(bytes));
-    return new Indexed<>(newIndex, entry, bytes.readableBytes());
-  }
-
-  @Override
-  public void truncate(long index) {
-    if (index < journal.getCommitIndex()) {
-      throw new IndexOutOfBoundsException("Cannot truncate committed index: " + index);
+    @Override
+    public void reset(final long index) {
+        writer.reset(index);
     }
 
-    // Delete all segments with first indexes greater than the given index.
-    while (index < currentSegment.firstIndex() && currentSegment != journal.getFirstSegment()) {
-      currentSegment.releaseWriter();
-      journal.removeSegment(currentSegment);
-      currentSegment = journal.getLastSegment();
-      currentWriter = currentSegment.acquireWriter();
+    @Override
+    public void commit(final long index) {
+        writer.commit(index);
     }
 
-    // Truncate the current index.
-    currentWriter.truncate(index);
+    @Override
+    public <T extends E> Indexed<T> append(final T entry) {
+        final var buf = mapper.objectToBytes(entry);
+        return new Indexed<>(writer.append(buf), entry, buf.readableBytes());
+    }
 
-    // Reset segment readers.
-    journal.resetTail(index + 1);
-  }
+    @Override
+    public void truncate(final long index) {
+        writer.truncate(index);
+    }
 
-  @Override
-  public void flush() {
-    currentWriter.flush();
-  }
+    @Override
+    public void flush() {
+        writer.flush();
+    }
 }