Improve ByteBufMapper.objectToBytes() contract 56/111656/7
authorRobert Varga <robert.varga@pantheon.tech>
Wed, 8 May 2024 05:51:31 +0000 (07:51 +0200)
committerRobert Varga <nite@hq.sk>
Wed, 8 May 2024 23:41:09 +0000 (23:41 +0000)
We currently treat each IOException as if it were caused by an overflow,
but other causes may exist.

Differentiate between EOFException and plain old IOException, the former
indicating an overflow.

For JournalSerdes/KryoException examine the message as there is no other
indicator.

JIRA: CONTROLLER-2115
Change-Id: I58f0de1fdb5652f0887502f610cce42395d14cae
Signed-off-by: Robert Varga <robert.varga@pantheon.tech>
atomix-storage/src/main/java/io/atomix/storage/journal/ByteBufMapper.java
atomix-storage/src/main/java/io/atomix/storage/journal/JournalSegmentWriter.java
atomix-storage/src/main/java/io/atomix/storage/journal/JournalSerdes.java

index a0f6f804494f0e679ac7ebd7a0ccb66d8f5933aa..e29dbcb60a5bf586f09975e53c40fb7c6baa57e2 100644 (file)
@@ -16,6 +16,7 @@
 package io.atomix.storage.journal;
 
 import io.netty.buffer.ByteBuf;
+import java.io.EOFException;
 import java.io.IOException;
 import org.eclipse.jdt.annotation.NonNullByDefault;
 
@@ -38,7 +39,8 @@ public interface ByteBufMapper<T> {
      *
      * @param obj the object
      * @param buf target buffer
-     * @throws IOException if an I/O error occurs
+     * @throws EOFException if the buffer does not have sufficient capacity
+     * @throws IOException if some other I/O error occurs
      */
     void objectToBytes(T obj, ByteBuf buf) throws IOException;
 }
index 33f596f496604b90a4f88c7f93c803ff097d1452..9c3da7a42e9e049089f98d3d9e1e8517fbed7bdf 100644 (file)
@@ -21,6 +21,7 @@ import static java.util.Objects.requireNonNull;
 import io.atomix.storage.journal.StorageException.TooLarge;
 import io.atomix.storage.journal.index.JournalIndex;
 import io.netty.buffer.Unpooled;
+import java.io.EOFException;
 import java.io.IOException;
 import java.nio.MappedByteBuffer;
 import org.eclipse.jdt.annotation.NonNull;
@@ -99,7 +100,7 @@ final class JournalSegmentWriter {
         final var bytes = Unpooled.wrappedBuffer(diskEntry.position(HEADER_BYTES));
         try {
             mapper.objectToBytes(entry, bytes);
-        } catch (IOException e) {
+        } catch (EOFException e) {
             // We ran out of buffer space: let's decide who's fault it is:
             if (writeLimit == maxEntrySize) {
                 // - it is the entry and/or mapper. This is not exactly accurate, as there may be other serialization
@@ -110,6 +111,8 @@ final class JournalSegmentWriter {
             // - it is us, as we do not have the capacity to hold maxEntrySize bytes
             LOG.trace("Tail serialization with {} bytes available failed", writeLimit, e);
             return null;
+        } catch (IOException e) {
+            throw new StorageException(e);
         }
 
         // Determine length, trim distEntry and compute checksum. We are okay with computeChecksum() consuming
index bf8518de2f8462bc77e83bd83ac64e1cd80e2e27..0265b8c6799e4bb2df144214d06fb3620b7616d2 100644 (file)
@@ -21,6 +21,7 @@ import com.google.common.annotations.Beta;
 import com.google.common.annotations.VisibleForTesting;
 import io.atomix.utils.serializer.KryoJournalSerdesBuilder;
 import io.netty.buffer.ByteBuf;
+import java.io.EOFException;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
@@ -125,7 +126,7 @@ public interface JournalSerdes {
                 try {
                     serialize(obj, buffer);
                 } catch (KryoException e) {
-                    throw new IOException(e);
+                    throw newIOException(e);
                 } finally {
                     // adjust writerIndex so that readableBytes() the bytes written
                     bytes.writerIndex(bytes.readerIndex() + buffer.position());
@@ -136,6 +137,24 @@ public interface JournalSerdes {
             public T bytesToObject(final long index, final ByteBuf bytes) {
                 return deserialize(bytes.nioBuffer());
             }
+
+            private static IOException newIOException(final KryoException cause) {
+                // We may have multiple nested KryoExceptions, intertwined with others, like IOExceptions. Let's find
+                // the deepest one.
+                var rootKryo = cause;
+                for (var nextCause = rootKryo.getCause(); nextCause != null; nextCause = nextCause.getCause()) {
+                    if (nextCause instanceof KryoException kryo) {
+                        rootKryo = kryo;
+                    }
+                }
+                // It would be nice to have a better way of discerning these, but alas it is what it is.
+                if (rootKryo.getMessage().startsWith("Buffer overflow.")) {
+                    final var ex = new EOFException();
+                    ex.initCause(cause);
+                    return ex;
+                }
+                return new IOException(rootKryo);
+            }
         };
     }