Add Payload.serializedSize() 10/101310/3
authorRobert Varga <robert.varga@pantheon.tech>
Wed, 18 May 2022 07:15:22 +0000 (09:15 +0200)
committerRobert Varga <nite@hq.sk>
Thu, 26 May 2022 21:59:14 +0000 (21:59 +0000)
There is a rather big difference in payload sizing when we consider
how big an entry is: in-memory size can easily be "zero", which can
translate to a serialized size of hundreds of bytes. This difference
is problematic, as we use the former to estimate how many payloads
we can squeeze in AppendEntries and we compare that to the configured
payload limit. Even when there is some (32KiB by default) cushion, we
can end up blowing past the frame size.

Add Payload.serializedSize(), which should provide a semi-conservative
estimate of serialized size and use that to select the cut-off.

Also improve SimpleReplicatedLogEntry's estimates by performing a a
quick serialization operation -- which reduces potential waste for each
entry by 294 bytes, as our hard-coded estimate of 400 bytes was way too
conservative.

JIRA: CONTROLLER-2037
Change-Id: I5abe7d00db9e10f1c66e6db0f7c82854f9aa352d
Signed-off-by: Robert Varga <robert.varga@pantheon.tech>
(cherry picked from commit 86e8e4a06b682aa772c834a2cef56d0596540e1b)

24 files changed:
opendaylight/md-sal/sal-akka-raft-example/src/main/java/org/opendaylight/controller/cluster/example/messages/KeyValue.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/AbstractReplicatedLogImpl.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ReplicatedLogEntry.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/messages/Payload.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/persisted/NoopPayload.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/persisted/ServerConfigurationPayload.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/persisted/SimpleReplicatedLogEntry.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/persisted/SimpleReplicatedLogEntrySerializer.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/AbstractRaftActorIntegrationTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/AbstractReplicatedLogImplTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MockRaftActorContext.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorDelegatingPersistentDataProviderTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/ReplicationAndSnapshotsWithLaggingFollowerIntegrationTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/persisted/AbortTransactionPayload.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/persisted/AbstractIdentifiablePayload.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/persisted/CloseLocalHistoryPayload.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/persisted/CommitTransactionPayload.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/persisted/CreateLocalHistoryPayload.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/persisted/DisableTrackingPayload.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/persisted/PurgeLocalHistoryPayload.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/persisted/PurgeTransactionPayload.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/persisted/SkipTransactionsPayload.java

index 93121834fef254cc7b8ad8dc64e3c1b00417eb09..78eea5cd862a8f26dd7d1b73df156266f58fe8e3 100644 (file)
@@ -18,7 +18,7 @@ public final class KeyValue extends Payload {
     public KeyValue() {
     }
 
     public KeyValue() {
     }
 
-    public KeyValue(String key, String value) {
+    public KeyValue(final String key, final String value) {
         this.key = key;
         this.value = value;
     }
         this.key = key;
         this.value = value;
     }
@@ -32,13 +32,19 @@ public final class KeyValue extends Payload {
     }
 
     @Override
     }
 
     @Override
-    public String toString() {
-        return "KeyValue{" + "key='" + key + '\'' + ", value='" + value + '\'' + '}';
+    public int size() {
+        return value.length() + key.length();
     }
 
     @Override
     }
 
     @Override
-    public int size() {
-        return value.length() + key.length();
+    public int serializedSize() {
+        // Should be a better estimate
+        return size();
+    }
+
+    @Override
+    public String toString() {
+        return "KeyValue{" + "key='" + key + '\'' + ", value='" + value + '\'' + '}';
     }
 
     @Override
     }
 
     @Override
index 64506ee6867fedd656b190f420be7ac7ec44c9b9..53d317fba1148d7c10093d443d4e0ea7a80dfbeb 100644 (file)
@@ -43,7 +43,7 @@ public abstract class AbstractReplicatedLogImpl implements ReplicatedLog {
         this.snapshotTerm = snapshotTerm;
         this.logContext = logContext;
 
         this.snapshotTerm = snapshotTerm;
         this.logContext = logContext;
 
-        this.journal = new ArrayList<>(unAppliedEntries.size());
+        journal = new ArrayList<>(unAppliedEntries.size());
         for (ReplicatedLogEntry entry: unAppliedEntries) {
             append(entry);
         }
         for (ReplicatedLogEntry entry: unAppliedEntries) {
             append(entry);
         }
@@ -168,7 +168,7 @@ public abstract class AbstractReplicatedLogImpl implements ReplicatedLog {
         long totalSize = 0;
         for (int i = fromIndex; i < toIndex; i++) {
             ReplicatedLogEntry entry = journal.get(i);
         long totalSize = 0;
         for (int i = fromIndex; i < toIndex; i++) {
             ReplicatedLogEntry entry = journal.get(i);
-            totalSize += entry.size();
+            totalSize += entry.serializedSize();
             if (totalSize <= maxDataSize) {
                 retList.add(entry);
             } else {
             if (totalSize <= maxDataSize) {
                 retList.add(entry);
             } else {
index 1f08523acd4d7920f0052df21db86a7947a3d161..360f6b690376c1c413b1a2f04bc8a7e946400758 100644 (file)
@@ -42,6 +42,15 @@ public interface ReplicatedLogEntry {
      */
     int size();
 
      */
     int size();
 
+    /**
+     * Return the estimate of serialized size of this entry when passed through serialization. The estimate needs to
+     * be reasonably accurate and should err on the side of caution and report a slightly-higher size in face of
+     * uncertainty.
+     *
+     * @return An estimate of serialized size.
+     */
+    int serializedSize();
+
     /**
      * Checks if persistence is pending for this entry.
      *
     /**
      * Checks if persistence is pending for this entry.
      *
index 9ba446479e97a5ce1b8d7d8a2a2330640f262fce..7fad368cc5c0a88bf92dd5d3b18d9b98904b7cdc 100644 (file)
@@ -783,7 +783,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
 
         // If the first entry's size exceeds the max data size threshold, it will be returned from the call above. If
         // that is the case, then we need to slice it into smaller chunks.
 
         // If the first entry's size exceeds the max data size threshold, it will be returned from the call above. If
         // that is the case, then we need to slice it into smaller chunks.
-        if (entries.size() != 1 || entries.get(0).getData().size() <= maxDataSize) {
+        if (entries.size() != 1 || entries.get(0).getData().serializedSize() <= maxDataSize) {
             // Don't need to slice.
             return entries;
         }
             // Don't need to slice.
             return entries;
         }
index 78a249749dfce31d7a8625aced8d957de526ea0b..bda1d1e22ca1762f71f5c0516e6c23cfa07929be 100644 (file)
@@ -26,6 +26,15 @@ public abstract class Payload implements Serializable {
      */
     public abstract int size();
 
      */
     public abstract int size();
 
+    /**
+     * Return the estimate of serialized size of this payload when passed through serialization. The estimate needs to
+     * be reasonably accurate and should err on the side of caution and report a slightly-higher size in face of
+     * uncertainty.
+     *
+     * @return An estimate of serialized size.
+     */
+    public abstract int serializedSize();
+
     /**
      * Return the serialization proxy for this object.
      *
     /**
      * Return the serialization proxy for this object.
      *
index 800ea45d15c087fe59b399df3444d7883d74edfc..e190bf0919828714047ddbab275858acf5bb6ad3 100644 (file)
@@ -9,6 +9,7 @@ package org.opendaylight.controller.cluster.raft.persisted;
 
 import akka.dispatch.ControlMessage;
 import java.io.Serializable;
 
 import akka.dispatch.ControlMessage;
 import java.io.Serializable;
+import org.apache.commons.lang3.SerializationUtils;
 import org.opendaylight.controller.cluster.raft.messages.Payload;
 
 /**
 import org.opendaylight.controller.cluster.raft.messages.Payload;
 
 /**
@@ -31,8 +32,11 @@ public final class NoopPayload extends Payload implements ControlMessage {
 
     private static final long serialVersionUID = 1L;
     private static final Proxy PROXY = new Proxy();
 
     private static final long serialVersionUID = 1L;
     private static final Proxy PROXY = new Proxy();
+    // Estimate to how big the proxy is. Note this includes object stream overhead, so it is a bit conservative
+    private static final int PROXY_SIZE = SerializationUtils.serialize(PROXY).length;
 
     private NoopPayload() {
 
     private NoopPayload() {
+        // Hidden on purpose
     }
 
     @Override
     }
 
     @Override
@@ -40,6 +44,11 @@ public final class NoopPayload extends Payload implements ControlMessage {
         return 0;
     }
 
         return 0;
     }
 
+    @Override
+    public int serializedSize() {
+        return PROXY_SIZE;
+    }
+
     @Override
     protected Object writeReplace() {
         return PROXY;
     @Override
     protected Object writeReplace() {
         return PROXY;
index eb668175d6e1e36f377cc103a5969b7b564b608c..68a5945b9f860652e7f5e27741f4e6700d95ddaf 100644 (file)
@@ -89,6 +89,11 @@ public final class ServerConfigurationPayload extends Payload implements Persist
 
     @Override
     public int size() {
 
     @Override
     public int size() {
+        return serializedSize();
+    }
+
+    @Override
+    public int serializedSize() {
         if (serializedSize < 0) {
             try (ByteArrayOutputStream bos = new ByteArrayOutputStream()) {
                 try (ObjectOutputStream out = new ObjectOutputStream(bos)) {
         if (serializedSize < 0) {
             try (ByteArrayOutputStream bos = new ByteArrayOutputStream()) {
                 try (ObjectOutputStream out = new ObjectOutputStream(bos)) {
index 10bbf35bc385b2dd862489811e8ece38690d3fbd..56a81f57f366bfc75874fc53e2d360019c994ecc 100644 (file)
@@ -14,6 +14,7 @@ import java.io.IOException;
 import java.io.ObjectInput;
 import java.io.ObjectOutput;
 import java.io.Serializable;
 import java.io.ObjectInput;
 import java.io.ObjectOutput;
 import java.io.Serializable;
+import org.apache.commons.lang3.SerializationUtils;
 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
 import org.opendaylight.controller.cluster.raft.messages.Payload;
 
 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
 import org.opendaylight.controller.cluster.raft.messages.Payload;
 
@@ -26,7 +27,9 @@ public final class SimpleReplicatedLogEntry implements ReplicatedLogEntry, Seria
     private static final class Proxy implements Externalizable {
         private static final long serialVersionUID = 1L;
 
     private static final class Proxy implements Externalizable {
         private static final long serialVersionUID = 1L;
 
-        private ReplicatedLogEntry replicatedLogEntry;
+        private long index;
+        private long term;
+        private Payload data;
 
         // checkstyle flags the public modifier as redundant which really doesn't make sense since it clearly isn't
         // redundant. It is explicitly needed for Java serialization to be able to create instances via reflection.
 
         // checkstyle flags the public modifier as redundant which really doesn't make sense since it clearly isn't
         // redundant. It is explicitly needed for Java serialization to be able to create instances via reflection.
@@ -35,33 +38,34 @@ public final class SimpleReplicatedLogEntry implements ReplicatedLogEntry, Seria
             // For Externalizable
         }
 
             // For Externalizable
         }
 
-        Proxy(final ReplicatedLogEntry replicatedLogEntry) {
-            this.replicatedLogEntry = replicatedLogEntry;
-        }
-
-        static int estimatedSerializedSize(final ReplicatedLogEntry replicatedLogEntry) {
-            return 8 /* index */ + 8 /* term */ + replicatedLogEntry.getData().size()
-                    + 400 /* estimated extra padding for class info */;
+        Proxy(final SimpleReplicatedLogEntry replicatedLogEntry) {
+            index = replicatedLogEntry.getIndex();
+            term = replicatedLogEntry.getTerm();
+            data = replicatedLogEntry.getData();
         }
 
         @Override
         public void writeExternal(final ObjectOutput out) throws IOException {
         }
 
         @Override
         public void writeExternal(final ObjectOutput out) throws IOException {
-            out.writeLong(replicatedLogEntry.getIndex());
-            out.writeLong(replicatedLogEntry.getTerm());
-            out.writeObject(replicatedLogEntry.getData());
+            out.writeLong(index);
+            out.writeLong(term);
+            out.writeObject(data);
         }
 
         @Override
         public void readExternal(final ObjectInput in) throws IOException, ClassNotFoundException {
         }
 
         @Override
         public void readExternal(final ObjectInput in) throws IOException, ClassNotFoundException {
-            replicatedLogEntry = new SimpleReplicatedLogEntry(in.readLong(), in.readLong(), (Payload) in.readObject());
+            index = in.readLong();
+            term = in.readLong();
+            data = (Payload) in.readObject();
         }
 
         private Object readResolve() {
         }
 
         private Object readResolve() {
-            return replicatedLogEntry;
+            return new SimpleReplicatedLogEntry(index, term, data);
         }
     }
 
     private static final long serialVersionUID = 1L;
         }
     }
 
     private static final long serialVersionUID = 1L;
+    // Estimate to how big the proxy is. Note this includes object stream overhead, so it is a bit conservative
+    private static final int PROXY_SIZE = SerializationUtils.serialize(new Proxy()).length;
 
     private final long index;
     private final long term;
 
     private final long index;
     private final long term;
@@ -98,7 +102,12 @@ public final class SimpleReplicatedLogEntry implements ReplicatedLogEntry, Seria
 
     @Override
     public int size() {
 
     @Override
     public int size() {
-        return getData().size();
+        return payload.size();
+    }
+
+    @Override
+    public int serializedSize() {
+        return PROXY_SIZE + payload.serializedSize();
     }
 
     @Override
     }
 
     @Override
@@ -115,10 +124,6 @@ public final class SimpleReplicatedLogEntry implements ReplicatedLogEntry, Seria
         return new Proxy(this);
     }
 
         return new Proxy(this);
     }
 
-    public int estimatedSerializedSize() {
-        return Proxy.estimatedSerializedSize(this);
-    }
-
     @Override
     public int hashCode() {
         final int prime = 31;
     @Override
     public int hashCode() {
         final int prime = 31;
index ca6e6dff30156b3e571281c1c85f89ac619a390c..f1c2fea4a6013cc41f1dde5008fff04170d4711a 100644 (file)
@@ -49,7 +49,7 @@ public class SimpleReplicatedLogEntrySerializer extends JSerializer {
         checkArgument(obj instanceof SimpleReplicatedLogEntry, "Unsupported object type %s", obj.getClass());
 
         SimpleReplicatedLogEntry replicatedLogEntry = (SimpleReplicatedLogEntry)obj;
         checkArgument(obj instanceof SimpleReplicatedLogEntry, "Unsupported object type %s", obj.getClass());
 
         SimpleReplicatedLogEntry replicatedLogEntry = (SimpleReplicatedLogEntry)obj;
-        final int estimatedSerializedSize = replicatedLogEntry.estimatedSerializedSize();
+        final int estimatedSerializedSize = replicatedLogEntry.serializedSize();
 
         final ByteArrayOutputStream bos = new ByteArrayOutputStream(estimatedSerializedSize);
         SerializationUtils.serialize(replicatedLogEntry, bos);
 
         final ByteArrayOutputStream bos = new ByteArrayOutputStream(estimatedSerializedSize);
         SerializationUtils.serialize(replicatedLogEntry, bos);
index 1e4859254fe091d0196e2ae75dd2325715593e84..0e8222dbb881cf95fe8c50da4d941fe5cea9bc81 100644 (file)
@@ -126,7 +126,7 @@ public abstract class AbstractRaftActorIntegrationTest extends AbstractActorTest
 
         TestRaftActor(final Builder builder) {
             super(builder);
 
         TestRaftActor(final Builder builder) {
             super(builder);
-            this.collectorActor = builder.collectorActor;
+            collectorActor = builder.collectorActor;
         }
 
         public void startDropMessages(final Class<?> msgClass) {
         }
 
         public void startDropMessages(final Class<?> msgClass) {
@@ -149,7 +149,7 @@ public abstract class AbstractRaftActorIntegrationTest extends AbstractActorTest
         @Override
         public void handleCommand(final Object message) {
             if (message instanceof MockPayload) {
         @Override
         public void handleCommand(final Object message) {
             if (message instanceof MockPayload) {
-                MockPayload payload = (MockPayload) message;
+                final MockPayload payload = (MockPayload) message;
                 super.persistData(collectorActor, new MockIdentifier(payload.toString()), payload, false);
                 return;
             }
                 super.persistData(collectorActor, new MockIdentifier(payload.toString()), payload, false);
                 return;
             }
@@ -214,13 +214,14 @@ public abstract class AbstractRaftActorIntegrationTest extends AbstractActorTest
             }
 
             public Builder collectorActor(final ActorRef newCollectorActor) {
             }
 
             public Builder collectorActor(final ActorRef newCollectorActor) {
-                this.collectorActor = newCollectorActor;
+                collectorActor = newCollectorActor;
                 return this;
             }
         }
     }
 
                 return this;
             }
         }
     }
 
-    protected static final int SNAPSHOT_CHUNK_SIZE = 100;
+    // FIXME: this is an arbitrary limit. Document interactions and/or improve them to improve maintainability
+    protected static final int SNAPSHOT_CHUNK_SIZE = 700;
 
     protected final Logger testLog = LoggerFactory.getLogger(getClass());
 
 
     protected final Logger testLog = LoggerFactory.getLogger(getClass());
 
index 983b26da9c15353eb6a440418c938a3c3413fb18..65ac83d0d00c17d6c8a7e47136a2e95772a754bb 100644 (file)
@@ -145,25 +145,32 @@ public class AbstractReplicatedLogImplTest {
         from = replicatedLogImpl.getFrom(0, 20, ReplicatedLog.NO_MAX_SIZE);
         assertEquals(4, from.size());
         assertEquals("A", from.get(0).getData().toString());
         from = replicatedLogImpl.getFrom(0, 20, ReplicatedLog.NO_MAX_SIZE);
         assertEquals(4, from.size());
         assertEquals("A", from.get(0).getData().toString());
+        assertEquals("B", from.get(1).getData().toString());
+        assertEquals("C", from.get(2).getData().toString());
         assertEquals("D", from.get(3).getData().toString());
 
         assertEquals("D", from.get(3).getData().toString());
 
+        // Pre-calculate sizing information for use with capping
+        final int sizeB = from.get(1).serializedSize();
+        final int sizeC = from.get(2).serializedSize();
+        final int sizeD = from.get(3).serializedSize();
+
         from = replicatedLogImpl.getFrom(1, 2, ReplicatedLog.NO_MAX_SIZE);
         assertEquals(2, from.size());
         assertEquals("B", from.get(0).getData().toString());
         assertEquals("C", from.get(1).getData().toString());
 
         from = replicatedLogImpl.getFrom(1, 2, ReplicatedLog.NO_MAX_SIZE);
         assertEquals(2, from.size());
         assertEquals("B", from.get(0).getData().toString());
         assertEquals("C", from.get(1).getData().toString());
 
-        from = replicatedLogImpl.getFrom(1, 3, 2);
+        from = replicatedLogImpl.getFrom(1, 3, sizeB + sizeC);
         assertEquals(2, from.size());
         assertEquals("B", from.get(0).getData().toString());
         assertEquals("C", from.get(1).getData().toString());
 
         assertEquals(2, from.size());
         assertEquals("B", from.get(0).getData().toString());
         assertEquals("C", from.get(1).getData().toString());
 
-        from = replicatedLogImpl.getFrom(1, 3, 3);
+        from = replicatedLogImpl.getFrom(1, 3, sizeB + sizeC + sizeD);
         assertEquals(3, from.size());
         assertEquals("B", from.get(0).getData().toString());
         assertEquals("C", from.get(1).getData().toString());
         assertEquals("D", from.get(2).getData().toString());
 
         assertEquals(3, from.size());
         assertEquals("B", from.get(0).getData().toString());
         assertEquals("C", from.get(1).getData().toString());
         assertEquals("D", from.get(2).getData().toString());
 
-        from = replicatedLogImpl.getFrom(1, 2, 3);
+        from = replicatedLogImpl.getFrom(1, 2, sizeB + sizeC + sizeD);
         assertEquals(2, from.size());
         assertEquals("B", from.get(0).getData().toString());
         assertEquals("C", from.get(1).getData().toString());
         assertEquals(2, from.size());
         assertEquals("B", from.get(0).getData().toString());
         assertEquals("C", from.get(1).getData().toString());
index c1127f1bcb32b1d8a98bccb7e7afb8af6ea7326d..38cef1ed59cbb7db57a18784e368c2a94b8a649b 100644 (file)
@@ -227,6 +227,11 @@ public class MockRaftActorContext extends RaftActorContextImpl {
             return size;
         }
 
             return size;
         }
 
+        @Override
+        public int serializedSize() {
+            return size;
+        }
+
         @Override
         public String toString() {
             return data;
         @Override
         public String toString() {
             return data;
index 33247d3f0cf6f70da582a354e81ae6f931edf43b..9dedc67e68776215bf58acad376c07e35ab83137 100644 (file)
@@ -105,6 +105,11 @@ public class RaftActorDelegatingPersistentDataProviderTest {
             return 0;
         }
 
             return 0;
         }
 
+        @Override
+        public int serializedSize() {
+            return 0;
+        }
+
         @Override
         protected Object writeReplace() {
             // Not needed
         @Override
         protected Object writeReplace() {
             // Not needed
index d6a53a0aeeb6e84b86d457b3ca667dbaee111101..00147a3c0e0588e19314cfcfe0be0c3fd931e62e 100644 (file)
@@ -452,7 +452,7 @@ public class ReplicationAndSnapshotsWithLaggingFollowerIntegrationTest extends A
 
         setupFollower2();
 
 
         setupFollower2();
 
-        MessageCollectorActor.expectMatching(follower2CollectorActor, InstallSnapshot.class, 5);
+        MessageCollectorActor.expectMatching(follower2CollectorActor, InstallSnapshot.class, 1);
 
         follower2Actor.stop();
 
 
         follower2Actor.stop();
 
@@ -613,8 +613,8 @@ public class ReplicationAndSnapshotsWithLaggingFollowerIntegrationTest extends A
     /**
      * Resume the lagging follower 2 and verify it receives an install snapshot from the leader.
      */
     /**
      * Resume the lagging follower 2 and verify it receives an install snapshot from the leader.
      */
-    private void verifyInstallSnapshotToLaggingFollower(long lastAppliedIndex,
-            @Nullable ServerConfigurationPayload expServerConfig) {
+    private void verifyInstallSnapshotToLaggingFollower(final long lastAppliedIndex,
+            final @Nullable ServerConfigurationPayload expServerConfig) {
         testLog.info("verifyInstallSnapshotToLaggingFollower starting");
 
         MessageCollectorActor.clearMessages(leaderCollectorActor);
         testLog.info("verifyInstallSnapshotToLaggingFollower starting");
 
         MessageCollectorActor.clearMessages(leaderCollectorActor);
@@ -811,8 +811,8 @@ public class ReplicationAndSnapshotsWithLaggingFollowerIntegrationTest extends A
     /**
      * Kill the leader actor, reinstate it and verify the recovered journal.
      */
     /**
      * Kill the leader actor, reinstate it and verify the recovered journal.
      */
-    private void verifyLeaderRecoveryAfterReinstatement(long lastIndex, long snapshotIndex,
-            long firstJournalEntryIndex) {
+    private void verifyLeaderRecoveryAfterReinstatement(final long lastIndex, final long snapshotIndex,
+            final long firstJournalEntryIndex) {
         testLog.info("verifyLeaderRecoveryAfterReinstatement starting: lastIndex: {}, snapshotIndex: {}, "
             + "firstJournalEntryIndex: {}", lastIndex, snapshotIndex, firstJournalEntryIndex);
 
         testLog.info("verifyLeaderRecoveryAfterReinstatement starting: lastIndex: {}, snapshotIndex: {}, "
             + "firstJournalEntryIndex: {}", lastIndex, snapshotIndex, firstJournalEntryIndex);
 
@@ -845,7 +845,7 @@ public class ReplicationAndSnapshotsWithLaggingFollowerIntegrationTest extends A
         testLog.info("verifyLeaderRecoveryAfterReinstatement ending");
     }
 
         testLog.info("verifyLeaderRecoveryAfterReinstatement ending");
     }
 
-    private void sendInitialPayloadsReplicatedToAllFollowers(String... data) {
+    private void sendInitialPayloadsReplicatedToAllFollowers(final String... data) {
 
         // Send the payloads.
         for (String d: data) {
 
         // Send the payloads.
         for (String d: data) {
index e10f0489e36ff2b1b6024abda0c54ad47127a85b..f847dbce9e860159bbd131a3dc88f39f97486ed1 100644 (file)
@@ -1793,7 +1793,8 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
                 new FiniteDuration(1000, TimeUnit.SECONDS));
         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
                 new FiniteDuration(1000, TimeUnit.SECONDS));
-        ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setSnapshotChunkSize(2);
+        // Note: the size here depends on estimate
+        ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setSnapshotChunkSize(246);
 
         leaderActorContext.setReplicatedLog(
                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 4, 1).build());
 
         leaderActorContext.setReplicatedLog(
                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 4, 1).build());
@@ -2462,7 +2463,7 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
         assertEquals("New votedFor", null, actorContext.getTermInformation().getVotedFor());
     }
 
         assertEquals("New votedFor", null, actorContext.getTermInformation().getVotedFor());
     }
 
-    private class MockConfigParamsImpl extends DefaultConfigParamsImpl {
+    private static class MockConfigParamsImpl extends DefaultConfigParamsImpl {
 
         private final long electionTimeOutIntervalMillis;
         private final int snapshotChunkSize;
 
         private final long electionTimeOutIntervalMillis;
         private final int snapshotChunkSize;
index 0e34756cede02dfef04496f4f2e1f4066391b220..594dd957ca366357a15341dfa2ac30bf7fb3771a 100644 (file)
@@ -49,6 +49,7 @@ public final class AbortTransactionPayload extends AbstractIdentifiablePayload<T
 
     private static final Logger LOG = LoggerFactory.getLogger(AbortTransactionPayload.class);
     private static final long serialVersionUID = 1L;
 
     private static final Logger LOG = LoggerFactory.getLogger(AbortTransactionPayload.class);
     private static final long serialVersionUID = 1L;
+    private static final int PROXY_SIZE = externalizableProxySize(Proxy::new);
 
     AbortTransactionPayload(final TransactionIdentifier transactionId, final byte[] serialized) {
         super(transactionId, serialized);
 
     AbortTransactionPayload(final TransactionIdentifier transactionId, final byte[] serialized) {
         super(transactionId, serialized);
@@ -71,4 +72,9 @@ public final class AbortTransactionPayload extends AbstractIdentifiablePayload<T
     protected Proxy externalizableProxy(final byte[] serialized) {
         return new Proxy(serialized);
     }
     protected Proxy externalizableProxy(final byte[] serialized) {
         return new Proxy(serialized);
     }
+
+    @Override
+    protected int externalizableProxySize() {
+        return PROXY_SIZE;
+    }
 }
 }
index d081a13421e02ba8157d7f340b53fe87534a3d3a..f07d4dbe903d10244f9a0ec4e764b82864e5fb7e 100644 (file)
@@ -18,6 +18,8 @@ import java.io.IOException;
 import java.io.ObjectInput;
 import java.io.ObjectOutput;
 import java.io.Serializable;
 import java.io.ObjectInput;
 import java.io.ObjectOutput;
 import java.io.Serializable;
+import java.util.function.Function;
+import org.apache.commons.lang3.SerializationUtils;
 import org.eclipse.jdt.annotation.NonNull;
 import org.opendaylight.controller.cluster.raft.messages.IdentifiablePayload;
 import org.opendaylight.yangtools.concepts.Identifiable;
 import org.eclipse.jdt.annotation.NonNull;
 import org.opendaylight.controller.cluster.raft.messages.IdentifiablePayload;
 import org.opendaylight.yangtools.concepts.Identifiable;
@@ -88,6 +90,15 @@ public abstract class AbstractIdentifiablePayload<T extends Identifier> extends
         return serialized.length;
     }
 
         return serialized.length;
     }
 
+    @Override
+    public final int serializedSize() {
+        // TODO: this is not entirely accurate, as the serialization stream has additional overheads:
+        //       - 3 bytes for each block of data <256 bytes
+        //       - 5 bytes for each block of data >=256 bytes
+        //       - each block of data is limited to 1024 bytes as per serialization spec
+        return size() + externalizableProxySize();
+    }
+
     @Override
     public final String toString() {
         return MoreObjects.toStringHelper(this).add("identifier", identifier).add("size", size()).toString();
     @Override
     public final String toString() {
         return MoreObjects.toStringHelper(this).add("identifier", identifier).add("size", size()).toString();
@@ -100,4 +111,10 @@ public abstract class AbstractIdentifiablePayload<T extends Identifier> extends
 
     @SuppressWarnings("checkstyle:hiddenField")
     protected abstract @NonNull AbstractProxy<T> externalizableProxy(byte @NonNull[] serialized);
 
     @SuppressWarnings("checkstyle:hiddenField")
     protected abstract @NonNull AbstractProxy<T> externalizableProxy(byte @NonNull[] serialized);
+
+    protected abstract int externalizableProxySize();
+
+    protected static final int externalizableProxySize(final Function<byte[], ? extends AbstractProxy<?>> constructor) {
+        return SerializationUtils.serialize(constructor.apply(new byte[0])).length;
+    }
 }
 }
index 9acc113a422b9b962e122a5ad11c94c43c69be3b..92ce6c16215885177ec83bf14ca824be31be75b9 100644 (file)
@@ -49,6 +49,7 @@ public final class CloseLocalHistoryPayload extends AbstractIdentifiablePayload<
 
     private static final Logger LOG = LoggerFactory.getLogger(CloseLocalHistoryPayload.class);
     private static final long serialVersionUID = 1L;
 
     private static final Logger LOG = LoggerFactory.getLogger(CloseLocalHistoryPayload.class);
     private static final long serialVersionUID = 1L;
+    private static final int PROXY_SIZE = externalizableProxySize(Proxy::new);
 
     CloseLocalHistoryPayload(final LocalHistoryIdentifier historyId, final byte[] serialized) {
         super(historyId, serialized);
 
     CloseLocalHistoryPayload(final LocalHistoryIdentifier historyId, final byte[] serialized) {
         super(historyId, serialized);
@@ -71,4 +72,9 @@ public final class CloseLocalHistoryPayload extends AbstractIdentifiablePayload<
     protected Proxy externalizableProxy(final byte[] serialized) {
         return new Proxy(serialized);
     }
     protected Proxy externalizableProxy(final byte[] serialized) {
         return new Proxy(serialized);
     }
+
+    @Override
+    protected int externalizableProxySize() {
+        return PROXY_SIZE;
+    }
 }
 }
index accb9ef09708cb49385a3434f3eb50b5f1e203b5..d4d08a7292fa531f9da60b7861ad29c796f1640c 100644 (file)
@@ -27,6 +27,7 @@ import java.io.Serializable;
 import java.io.StreamCorruptedException;
 import java.util.AbstractMap.SimpleImmutableEntry;
 import java.util.Map.Entry;
 import java.io.StreamCorruptedException;
 import java.util.AbstractMap.SimpleImmutableEntry;
 import java.util.Map.Entry;
+import org.apache.commons.lang3.SerializationUtils;
 import org.eclipse.jdt.annotation.NonNull;
 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
 import org.opendaylight.controller.cluster.datastore.persisted.DataTreeCandidateInputOutput.DataTreeCandidateWithVersion;
 import org.eclipse.jdt.annotation.NonNull;
 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
 import org.opendaylight.controller.cluster.datastore.persisted.DataTreeCandidateInputOutput.DataTreeCandidateWithVersion;
@@ -116,6 +117,12 @@ public abstract class CommitTransactionPayload extends IdentifiablePayload<Trans
         }
     }
 
         }
     }
 
+    @Override
+    public final int serializedSize() {
+        // TODO: this is not entirely accurate as the the byte[] can be chunked by the serialization stream
+        return ProxySizeHolder.PROXY_SIZE + size();
+    }
+
     /**
      * The cached candidate needs to be cleared after it is done applying to the DataTree, otherwise it would be keeping
      * deserialized in memory which are not needed anymore leading to wasted memory. This lets the payload know that
     /**
      * The cached candidate needs to be cleared after it is done applying to the DataTree, otherwise it would be keeping
      * deserialized in memory which are not needed anymore leading to wasted memory. This lets the payload know that
@@ -197,6 +204,15 @@ public abstract class CommitTransactionPayload extends IdentifiablePayload<Trans
         }
     }
 
         }
     }
 
+    // Exists to break initialization dependency between CommitTransactionPayload/Simple/Proxy
+    private static final class ProxySizeHolder {
+        static final int PROXY_SIZE = SerializationUtils.serialize(new Proxy(new Simple(new byte[0]))).length;
+
+        private ProxySizeHolder() {
+            // Hidden on purpose
+        }
+    }
+
     private static final class Proxy implements Externalizable {
         private static final long serialVersionUID = 1L;
 
     private static final class Proxy implements Externalizable {
         private static final long serialVersionUID = 1L;
 
index dbf72f38d8de0f0016a4651c3ef46fcebe5e4760..91329b2074b5f9a1b9b43e5662d6f3c062c88b86 100644 (file)
@@ -49,6 +49,7 @@ public final class CreateLocalHistoryPayload extends AbstractIdentifiablePayload
 
     private static final Logger LOG = LoggerFactory.getLogger(CreateLocalHistoryPayload.class);
     private static final long serialVersionUID = 1L;
 
     private static final Logger LOG = LoggerFactory.getLogger(CreateLocalHistoryPayload.class);
     private static final long serialVersionUID = 1L;
+    private static final int PROXY_SIZE = externalizableProxySize(Proxy::new);
 
     CreateLocalHistoryPayload(final LocalHistoryIdentifier historyId, final byte[] serialized) {
         super(historyId, serialized);
 
     CreateLocalHistoryPayload(final LocalHistoryIdentifier historyId, final byte[] serialized) {
         super(historyId, serialized);
@@ -71,4 +72,9 @@ public final class CreateLocalHistoryPayload extends AbstractIdentifiablePayload
     protected Proxy externalizableProxy(final byte[] serialized) {
         return new Proxy(serialized);
     }
     protected Proxy externalizableProxy(final byte[] serialized) {
         return new Proxy(serialized);
     }
+
+    @Override
+    protected int externalizableProxySize() {
+        return PROXY_SIZE;
+    }
 }
 }
index 29dd0725245e761a2551a2cf00bab41b6c6e35f8..6314b174edea3a88cbbd2cccfd7e93183f3c6a54 100644 (file)
@@ -40,6 +40,7 @@ public final class DisableTrackingPayload extends AbstractIdentifiablePayload<Cl
 
     private static final Logger LOG = LoggerFactory.getLogger(DisableTrackingPayload.class);
     private static final long serialVersionUID = 1L;
 
     private static final Logger LOG = LoggerFactory.getLogger(DisableTrackingPayload.class);
     private static final long serialVersionUID = 1L;
+    private static final int PROXY_SIZE = externalizableProxySize(Proxy::new);
 
     DisableTrackingPayload(final ClientIdentifier clientId, final byte[] serialized) {
         super(clientId, serialized);
 
     DisableTrackingPayload(final ClientIdentifier clientId, final byte[] serialized) {
         super(clientId, serialized);
@@ -62,4 +63,9 @@ public final class DisableTrackingPayload extends AbstractIdentifiablePayload<Cl
     protected Proxy externalizableProxy(final byte[] serialized) {
         return new Proxy(serialized);
     }
     protected Proxy externalizableProxy(final byte[] serialized) {
         return new Proxy(serialized);
     }
+
+    @Override
+    protected int externalizableProxySize() {
+        return PROXY_SIZE;
+    }
 }
 }
index 8d9a8d217a67e201dda8c8f9ba9f705ac939557c..eaf7a15441960734ceabcb5c7752feb6d2abd8af 100644 (file)
@@ -50,6 +50,7 @@ public final class PurgeLocalHistoryPayload extends AbstractIdentifiablePayload<
 
     private static final Logger LOG = LoggerFactory.getLogger(PurgeLocalHistoryPayload.class);
     private static final long serialVersionUID = 1L;
 
     private static final Logger LOG = LoggerFactory.getLogger(PurgeLocalHistoryPayload.class);
     private static final long serialVersionUID = 1L;
+    private static final int PROXY_SIZE = externalizableProxySize(Proxy::new);
 
     PurgeLocalHistoryPayload(final LocalHistoryIdentifier historyId, final byte[] serialized) {
         super(historyId, serialized);
 
     PurgeLocalHistoryPayload(final LocalHistoryIdentifier historyId, final byte[] serialized) {
         super(historyId, serialized);
@@ -72,4 +73,9 @@ public final class PurgeLocalHistoryPayload extends AbstractIdentifiablePayload<
     protected Proxy externalizableProxy(final byte[] serialized) {
         return new Proxy(serialized);
     }
     protected Proxy externalizableProxy(final byte[] serialized) {
         return new Proxy(serialized);
     }
+
+    @Override
+    protected int externalizableProxySize() {
+        return PROXY_SIZE;
+    }
 }
 }
index ac849723e168b1b439c1a90dd4f4b9f87504a07d..c5d87be96593c3666ef0a37e311441df3ffea615 100644 (file)
@@ -49,6 +49,7 @@ public final class PurgeTransactionPayload extends AbstractIdentifiablePayload<T
 
     private static final Logger LOG = LoggerFactory.getLogger(PurgeTransactionPayload.class);
     private static final long serialVersionUID = 1L;
 
     private static final Logger LOG = LoggerFactory.getLogger(PurgeTransactionPayload.class);
     private static final long serialVersionUID = 1L;
+    private static final int PROXY_SIZE = externalizableProxySize(Proxy::new);
 
     PurgeTransactionPayload(final TransactionIdentifier transactionId, final byte[] serialized) {
         super(transactionId, serialized);
 
     PurgeTransactionPayload(final TransactionIdentifier transactionId, final byte[] serialized) {
         super(transactionId, serialized);
@@ -71,4 +72,9 @@ public final class PurgeTransactionPayload extends AbstractIdentifiablePayload<T
     protected Proxy externalizableProxy(final byte[] serialized) {
         return new Proxy(serialized);
     }
     protected Proxy externalizableProxy(final byte[] serialized) {
         return new Proxy(serialized);
     }
+
+    @Override
+    protected int externalizableProxySize() {
+        return PROXY_SIZE;
+    }
 }
 }
index ec6e227a75a91bf672fb081a796c381b14754b48..f3ebeb843331ede2d9ca5e15788db45687faf6eb 100644 (file)
@@ -58,6 +58,7 @@ public final class SkipTransactionsPayload extends AbstractIdentifiablePayload<L
 
     private static final Logger LOG = LoggerFactory.getLogger(SkipTransactionsPayload.class);
     private static final long serialVersionUID = 1L;
 
     private static final Logger LOG = LoggerFactory.getLogger(SkipTransactionsPayload.class);
     private static final long serialVersionUID = 1L;
+    private static final int PROXY_SIZE = externalizableProxySize(Proxy::new);
 
     @SuppressFBWarnings(value = "SE_BAD_FIELD", justification = "Handled via externalizable proxy")
     private final @NonNull ImmutableUnsignedLongSet transactionIds;
 
     @SuppressFBWarnings(value = "SE_BAD_FIELD", justification = "Handled via externalizable proxy")
     private final @NonNull ImmutableUnsignedLongSet transactionIds;
@@ -91,4 +92,9 @@ public final class SkipTransactionsPayload extends AbstractIdentifiablePayload<L
     protected Proxy externalizableProxy(final byte[] serialized) {
         return new Proxy(serialized);
     }
     protected Proxy externalizableProxy(final byte[] serialized) {
         return new Proxy(serialized);
     }
+
+    @Override
+    protected int externalizableProxySize() {
+        return PROXY_SIZE;
+    }
 }
 }