Move entry serialization back to ByteBufWriter
[controller.git] / atomix-storage / src / main / java / io / atomix / storage / journal / JournalSerdes.java
index 29b5bed7ab6ac75c570ca1187e614463e2680613..bf8518de2f8462bc77e83bd83ac64e1cd80e2e27 100644 (file)
  */
 package io.atomix.storage.journal;
 
+import com.esotericsoftware.kryo.KryoException;
 import com.google.common.annotations.Beta;
 import com.google.common.annotations.VisibleForTesting;
 import io.atomix.utils.serializer.KryoJournalSerdesBuilder;
 import io.netty.buffer.ByteBuf;
-import io.netty.buffer.Unpooled;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
@@ -120,13 +120,21 @@ public interface JournalSerdes {
     default <T> ByteBufMapper<T> toMapper() {
         return new ByteBufMapper<>() {
             @Override
-            public ByteBuf objectToBytes(final T obj) {
-                return Unpooled.wrappedBuffer(serialize(obj));
+            public void objectToBytes(final T obj, final ByteBuf bytes) throws IOException {
+                final var buffer = bytes.nioBuffer();
+                try {
+                    serialize(obj, buffer);
+                } catch (KryoException e) {
+                    throw new IOException(e);
+                } finally {
+                    // adjust writerIndex so that readableBytes() the bytes written
+                    bytes.writerIndex(bytes.readerIndex() + buffer.position());
+                }
             }
 
             @Override
-            public T bytesToObject(final ByteBuf buf) {
-                return deserialize(buf.nioBuffer());
+            public T bytesToObject(final long index, final ByteBuf bytes) {
+                return deserialize(bytes.nioBuffer());
             }
         };
     }