Add specific serializer for SimpleReplicatedLogEntry 90/78390/3
authorTom Pantelis <tompantelis@gmail.com>
Mon, 3 Dec 2018 17:59:43 +0000 (12:59 -0500)
committerTom Pantelis <tompantelis@gmail.com>
Mon, 3 Dec 2018 19:34:47 +0000 (14:34 -0500)
JFR shows a lot of re-allocations of the backing byte [] when
serializing SimpleReplicatedLogEntry. We can pretty closely
estimate the total serialized size with some reasonable padding
since the Payload is already serialized. Introduce a
specific akka serializer for this.

Jira: CONTROLLER-1872
Change-Id: I94ac9528657119b5bab60dc2f37eef98f18ca1b0
Signed-off-by: Tom Pantelis <tompantelis@gmail.com>
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/persisted/SimpleReplicatedLogEntry.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/persisted/SimpleReplicatedLogEntrySerializer.java [new file with mode: 0644]
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/persisted/SimpleReplicatedLogEntrySerializerTest.java [new file with mode: 0644]
opendaylight/md-sal/sal-clustering-config/src/main/resources/initial/factory-akka.conf

index 85fa51e93f0b4fc8e5ce95eb2a3925671a050e2e..14ce5420d26081c06eed9a442e3dd39287c1c730 100644 (file)
@@ -39,6 +39,11 @@ public final class SimpleReplicatedLogEntry implements ReplicatedLogEntry, Seria
             this.replicatedLogEntry = replicatedLogEntry;
         }
 
             this.replicatedLogEntry = replicatedLogEntry;
         }
 
+        static int estimatedSerializedSize(ReplicatedLogEntry replicatedLogEntry) {
+            return 8 /* index */ + 8 /* term */ + replicatedLogEntry.getData().size()
+                    + 400 /* estimated extra padding for class info */;
+        }
+
         @Override
         public void writeExternal(final ObjectOutput out) throws IOException {
             out.writeLong(replicatedLogEntry.getIndex());
         @Override
         public void writeExternal(final ObjectOutput out) throws IOException {
             out.writeLong(replicatedLogEntry.getIndex());
@@ -110,6 +115,10 @@ public final class SimpleReplicatedLogEntry implements ReplicatedLogEntry, Seria
         return new Proxy(this);
     }
 
         return new Proxy(this);
     }
 
+    public int estimatedSerializedSize() {
+        return Proxy.estimatedSerializedSize(this);
+    }
+
     @Override
     public int hashCode() {
         final int prime = 31;
     @Override
     public int hashCode() {
         final int prime = 31;
diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/persisted/SimpleReplicatedLogEntrySerializer.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/persisted/SimpleReplicatedLogEntrySerializer.java
new file mode 100644 (file)
index 0000000..ca6e6df
--- /dev/null
@@ -0,0 +1,73 @@
+/*
+ * Copyright (c) 2018 Red Hat, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.raft.persisted;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static java.util.Objects.requireNonNull;
+
+import akka.actor.ExtendedActorSystem;
+import akka.serialization.JSerializer;
+import akka.util.ClassLoaderObjectInputStream;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import org.apache.commons.lang3.SerializationUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Specialized serializer for {@link SimpleReplicatedLogEntry} that optimizes serialization.
+ *
+ * @author Thomas Pantelis
+ */
+public class SimpleReplicatedLogEntrySerializer extends JSerializer {
+    private static final Logger LOG = LoggerFactory.getLogger(SimpleReplicatedLogEntrySerializer.class);
+
+    private final ExtendedActorSystem system;
+
+    public SimpleReplicatedLogEntrySerializer(final ExtendedActorSystem system) {
+        this.system = requireNonNull(system);
+    }
+
+    @Override
+    public int identifier() {
+        return 97439500;
+    }
+
+    @Override
+    public boolean includeManifest() {
+        return false;
+    }
+
+    @Override
+    public byte[] toBinary(Object obj) {
+        checkArgument(obj instanceof SimpleReplicatedLogEntry, "Unsupported object type %s", obj.getClass());
+
+        SimpleReplicatedLogEntry replicatedLogEntry = (SimpleReplicatedLogEntry)obj;
+        final int estimatedSerializedSize = replicatedLogEntry.estimatedSerializedSize();
+
+        final ByteArrayOutputStream bos = new ByteArrayOutputStream(estimatedSerializedSize);
+        SerializationUtils.serialize(replicatedLogEntry, bos);
+        final byte[] bytes = bos.toByteArray();
+
+        LOG.debug("Estimated serialized size {}, data size {} for payload: {}. Actual serialized size: {}",
+            estimatedSerializedSize, replicatedLogEntry.getData().size(), replicatedLogEntry.getData(), bytes.length);
+
+        return bytes;
+    }
+
+    @Override
+    public Object fromBinaryJava(byte[] bytes, Class<?> manifest) {
+        try (ClassLoaderObjectInputStream is = new ClassLoaderObjectInputStream(system.dynamicAccess().classLoader(),
+                new ByteArrayInputStream(bytes))) {
+            return is.readObject();
+        } catch (IOException | ClassNotFoundException e) {
+            throw new IllegalStateException("Failed to deserialize object", e);
+        }
+    }
+}
diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/persisted/SimpleReplicatedLogEntrySerializerTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/persisted/SimpleReplicatedLogEntrySerializerTest.java
new file mode 100644 (file)
index 0000000..be48c06
--- /dev/null
@@ -0,0 +1,49 @@
+/*
+ * Copyright (c) 2018 Red Hat, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.raft.persisted;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+import akka.actor.ExtendedActorSystem;
+import akka.testkit.javadsl.TestKit;
+import java.io.NotSerializableException;
+import org.junit.Test;
+import org.opendaylight.controller.cluster.raft.MockRaftActorContext;
+
+/**
+ * Unit tests for SimpleReplicatedLogEntrySerializer.
+ *
+ * @author Thomas Pantelis
+ */
+public class SimpleReplicatedLogEntrySerializerTest {
+
+    @Test
+    public void testToAndFromBinary() throws NotSerializableException {
+        SimpleReplicatedLogEntry expected = new SimpleReplicatedLogEntry(0, 1,
+                new MockRaftActorContext.MockPayload("A"));
+
+        final ExtendedActorSystem system = (ExtendedActorSystem) ExtendedActorSystem.create("test");
+        final Object deserialized;
+        try {
+            final SimpleReplicatedLogEntrySerializer serializer = new SimpleReplicatedLogEntrySerializer(system);
+            final byte[] bytes = serializer.toBinary(expected);
+            deserialized = serializer.fromBinary(bytes, SimpleReplicatedLogEntry.class);
+        } finally {
+            TestKit.shutdownActorSystem(system);
+        }
+
+        assertNotNull("fromBinary returned null", deserialized);
+        assertEquals("fromBinary return type", SimpleReplicatedLogEntry.class, deserialized.getClass());
+
+        SimpleReplicatedLogEntry actual = (SimpleReplicatedLogEntry)deserialized;
+        assertEquals("getTerm", expected.getTerm(), actual.getTerm());
+        assertEquals("getIndex", expected.getIndex(), actual.getIndex());
+        assertEquals("getData", expected.getData(), actual.getData());
+    }
+}
index 490bd876262f7faf160962cd34f72587728a58bf..7b86f1c5727483609e781108010e59012b8e6dbe 100644 (file)
@@ -63,11 +63,13 @@ odl-cluster-data {
         java = "akka.serialization.JavaSerializer"
         proto = "akka.remote.serialization.ProtobufSerializer"
         readylocal = "org.opendaylight.controller.cluster.datastore.messages.ReadyLocalTransactionSerializer"
         java = "akka.serialization.JavaSerializer"
         proto = "akka.remote.serialization.ProtobufSerializer"
         readylocal = "org.opendaylight.controller.cluster.datastore.messages.ReadyLocalTransactionSerializer"
+        simpleReplicatedLogEntry = "org.opendaylight.controller.cluster.raft.persisted.SimpleReplicatedLogEntrySerializer"
       }
 
       serialization-bindings {
         "com.google.protobuf.Message" = proto
         "org.opendaylight.controller.cluster.datastore.messages.ReadyLocalTransaction" = readylocal
       }
 
       serialization-bindings {
         "com.google.protobuf.Message" = proto
         "org.opendaylight.controller.cluster.datastore.messages.ReadyLocalTransaction" = readylocal
+        "org.opendaylight.controller.cluster.raft.persisted.SimpleReplicatedLogEntry" = simpleReplicatedLogEntry
       }
 
       default-dispatcher {
       }
 
       default-dispatcher {