Do not use BufferOverflowException for EOF signalling 84/111084/2
authorRobert Varga <robert.varga@pantheon.tech>
Tue, 26 Mar 2024 23:20:26 +0000 (00:20 +0100)
committerRobert Varga <robert.varga@pantheon.tech>
Wed, 27 Mar 2024 00:15:46 +0000 (01:15 +0100)
A neat benefit of well-structured code is that it makes it clear what
the contracts are.

In this patch we eliminate a source of BufferOverflowExceptions and opt
to report the fact next segment is needed via a @Nullable return.

JIRA: CONTROLLER-2100
Change-Id: I1b9535053561709dc2a4b60446cd7cabd19da659
Signed-off-by: Robert Varga <robert.varga@pantheon.tech>
atomix-storage/src/main/java/io/atomix/storage/journal/JournalSegmentWriter.java
atomix-storage/src/main/java/io/atomix/storage/journal/JournalWriter.java
atomix-storage/src/main/java/io/atomix/storage/journal/SegmentedJournalWriter.java
atomix-storage/src/main/java/io/atomix/storage/journal/StorageException.java

index df8ca35068b53cafd3ca96083c4ef71135d9ff63..c7c035be65b06b187ea8f6f508a30be45812155f 100644 (file)
@@ -20,15 +20,18 @@ import static java.util.Objects.requireNonNull;
 
 import com.esotericsoftware.kryo.KryoException;
 import io.atomix.storage.journal.index.JournalIndex;
-import java.nio.BufferOverflowException;
 import java.nio.ByteBuffer;
 import java.nio.MappedByteBuffer;
 import java.nio.channels.FileChannel;
 import java.util.zip.CRC32;
 import org.eclipse.jdt.annotation.NonNull;
 import org.eclipse.jdt.annotation.Nullable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 abstract sealed class JournalSegmentWriter<E> permits DiskJournalSegmentWriter, MappedJournalSegmentWriter {
+    private static final Logger LOG = LoggerFactory.getLogger(JournalSegmentWriter.class);
+
     final @NonNull FileChannel channel;
     final @NonNull JournalSegment<E> segment;
     private final @NonNull JournalIndex index;
@@ -88,12 +91,12 @@ abstract sealed class JournalSegmentWriter<E> permits DiskJournalSegmentWriter,
     }
 
     /**
-     * Appends an entry to the journal.
+     * Tries to append an entry to the journal.
      *
      * @param entry The entry to append.
-     * @return The appended indexed entry.
+     * @return The appended indexed entry, or {@code null} if there is not enough space available
      */
-    final <T extends E> Indexed<T> append(final T entry) {
+    final <T extends E> @Nullable Indexed<T> append(final T entry) {
         // Store the entry index.
         final long index = getNextIndex();
         final int position = currentPosition;
@@ -102,7 +105,8 @@ abstract sealed class JournalSegmentWriter<E> permits DiskJournalSegmentWriter,
         final int bodyPosition = position + HEADER_BYTES;
         final int avail = maxSegmentSize - bodyPosition;
         if (avail < 0) {
-          throw new BufferOverflowException();
+            LOG.trace("Not enough space for {} at {}", index, position);
+            return null;
         }
 
         final var writeLimit = Math.min(avail, maxEntrySize);
@@ -112,11 +116,12 @@ abstract sealed class JournalSegmentWriter<E> permits DiskJournalSegmentWriter,
         } catch (KryoException e) {
             if (writeLimit != maxEntrySize) {
                 // We have not provided enough capacity, signal to roll to next segment
-                throw new BufferOverflowException();
+                LOG.trace("Tail serialization with {} bytes available failed", writeLimit, e);
+                return null;
             }
 
             // Just reset the buffer. There's no need to zero the bytes since we haven't written the length or checksum.
-            throw new StorageException.TooLarge("Entry size exceeds maximum allowed bytes (" + maxEntrySize + ")");
+            throw new StorageException.TooLarge("Entry size exceeds maximum allowed bytes (" + maxEntrySize + ")", e);
         }
 
         final int length = diskEntry.position() - HEADER_BYTES;
index b8acf1e5937f2194c4485e23e2845e3244742aea..1462463e9d68c8049f68f85ea34fdfbb4734144a 100644 (file)
  */
 package io.atomix.storage.journal;
 
+import org.eclipse.jdt.annotation.NonNull;
+
 /**
  * Log writer.
  *
  * @author <a href="http://github.com/kuujo">Jordan Halterman</a>
  */
 public interface JournalWriter<E> {
-  /**
-   * Returns the last written index.
-   *
-   * @return The last written index.
-   */
-  long getLastIndex();
+    /**
+     * Returns the last written index.
+     *
+     * @return The last written index.
+     */
+    long getLastIndex();
 
-  /**
-   * Returns the last entry written.
-   *
-   * @return The last entry written.
-   */
-  Indexed<E> getLastEntry();
+    /**
+     * Returns the last entry written.
+     *
+     * @return The last entry written.
+     */
+    Indexed<E> getLastEntry();
 
-  /**
-   * Returns the next index to be written.
-   *
-   * @return The next index to be written.
-   */
-  long getNextIndex();
+    /**
+     * Returns the next index to be written.
+     *
+     * @return The next index to be written.
+     */
+    long getNextIndex();
 
-  /**
-   * Appends an entry to the journal.
-   *
-   * @param entry The entry to append.
-   * @return The appended indexed entry.
-   */
-  <T extends E> Indexed<T> append(T entry);
+    /**
+     * Appends an entry to the journal.
+     *
+     * @param entry The entry to append.
+     * @return The appended indexed entry.
+     */
+    <T extends E> @NonNull Indexed<T> append(T entry);
 
-  /**
-   * Commits entries up to the given index.
-   *
-   * @param index The index up to which to commit entries.
-   */
-  void commit(long index);
+    /**
+     * Commits entries up to the given index.
+     *
+     * @param index The index up to which to commit entries.
+     */
+    void commit(long index);
 
-  /**
-   * Resets the head of the journal to the given index.
-   *
-   * @param index the index to which to reset the head of the journal
-   */
-  void reset(long index);
+    /**
+     * Resets the head of the journal to the given index.
+     *
+     * @param index the index to which to reset the head of the journal
+     */
+    void reset(long index);
 
-  /**
-   * Truncates the log to the given index.
-   *
-   * @param index The index to which to truncate the log.
-   */
-  void truncate(long index);
+    /**
+     * Truncates the log to the given index.
+     *
+     * @param index The index to which to truncate the log.
+     */
+    void truncate(long index);
 
-  /**
-   * Flushes written entries to disk.
-   */
-  void flush();
+    /**
+     * Flushes written entries to disk.
+     */
+    void flush();
 }
index 389c06aa4ec77cada95360182f47c78ead15a6de..a95622e5535ef69af5c405729fd84d6bd96ab515 100644 (file)
@@ -15,7 +15,7 @@
  */
 package io.atomix.storage.journal;
 
-import java.nio.BufferOverflowException;
+import static com.google.common.base.Verify.verifyNotNull;
 
 /**
  * Raft log writer.
@@ -70,19 +70,17 @@ final class SegmentedJournalWriter<E> implements JournalWriter<E> {
 
   @Override
   public <T extends E> Indexed<T> append(T entry) {
-    try {
-      return currentWriter.append(entry);
-    } catch (BufferOverflowException e) {
-      if (currentSegment.firstIndex() == currentWriter.getNextIndex()) {
-        throw e;
-      }
+    var indexed = currentWriter.append(entry);
+    if (indexed != null) {
+      return indexed;
     }
 
+    //  Slow path: we do not have enough capacity
     currentWriter.flush();
     currentSegment.releaseWriter();
     currentSegment = journal.getNextSegment();
     currentWriter = currentSegment.acquireWriter();
-    return currentWriter.append(entry);
+    return verifyNotNull(currentWriter.append(entry));
   }
 
   @Override
index e5dd9a2073a3e03699be04f0ed69f47c81999c4c..0a220ec6526b7ac7e897b5943335c6762a2a3d7e 100644 (file)
@@ -50,6 +50,10 @@ public class StorageException extends RuntimeException {
         public TooLarge(final String message) {
             super(message);
         }
+
+        public TooLarge(final String message, final Throwable cause) {
+            super(message, cause);
+        }
     }
 
     /**