Add Payload.serializedSize() 11/101211/9
authorRobert Varga <robert.varga@pantheon.tech>
Wed, 18 May 2022 07:15:22 +0000 (09:15 +0200)
committerRobert Varga <robert.varga@pantheon.tech>
Thu, 26 May 2022 18:08:35 +0000 (20:08 +0200)
There is a rather big difference in payload sizing when we consider
how big an entry is: in-memory size can easily be "zero", which can
translate to a serialized size of hundreds of bytes. This difference
is problematic, as we use the former to estimate how many payloads
we can squeeze in AppendEntries and we compare that to the configured
payload limit. Even when there is some (32KiB by default) cushion, we
can end up blowing past the frame size.

Add Payload.serializedSize(), which should provide a semi-conservative
estimate of serialized size and use that to select the cut-off.

Also improve SimpleReplicatedLogEntry's estimates by performing a a
quick serialization operation -- which reduces potential waste for each
entry by 294 bytes, as our hard-coded estimate of 400 bytes was way too
conservative.

JIRA: CONTROLLER-2037
Change-Id: I5abe7d00db9e10f1c66e6db0f7c82854f9aa352d
Signed-off-by: Robert Varga <robert.varga@pantheon.tech>
24 files changed:
opendaylight/md-sal/sal-akka-raft-example/src/main/java/org/opendaylight/controller/cluster/example/messages/KeyValue.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/AbstractReplicatedLogImpl.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ReplicatedLogEntry.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/messages/Payload.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/persisted/NoopPayload.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/persisted/ServerConfigurationPayload.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/persisted/SimpleReplicatedLogEntry.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/persisted/SimpleReplicatedLogEntrySerializer.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/AbstractRaftActorIntegrationTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/AbstractReplicatedLogImplTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MockRaftActorContext.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorDelegatingPersistentDataProviderTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/ReplicationAndSnapshotsWithLaggingFollowerIntegrationTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/persisted/AbortTransactionPayload.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/persisted/AbstractIdentifiablePayload.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/persisted/CloseLocalHistoryPayload.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/persisted/CommitTransactionPayload.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/persisted/CreateLocalHistoryPayload.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/persisted/DisableTrackingPayload.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/persisted/PurgeLocalHistoryPayload.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/persisted/PurgeTransactionPayload.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/persisted/SkipTransactionsPayload.java

index 93121834fef254cc7b8ad8dc64e3c1b00417eb09..78eea5cd862a8f26dd7d1b73df156266f58fe8e3 100644 (file)
@@ -18,7 +18,7 @@ public final class KeyValue extends Payload {
     public KeyValue() {
     }
 
-    public KeyValue(String key, String value) {
+    public KeyValue(final String key, final String value) {
         this.key = key;
         this.value = value;
     }
@@ -32,13 +32,19 @@ public final class KeyValue extends Payload {
     }
 
     @Override
-    public String toString() {
-        return "KeyValue{" + "key='" + key + '\'' + ", value='" + value + '\'' + '}';
+    public int size() {
+        return value.length() + key.length();
     }
 
     @Override
-    public int size() {
-        return value.length() + key.length();
+    public int serializedSize() {
+        // Should be a better estimate
+        return size();
+    }
+
+    @Override
+    public String toString() {
+        return "KeyValue{" + "key='" + key + '\'' + ", value='" + value + '\'' + '}';
     }
 
     @Override
index 64506ee6867fedd656b190f420be7ac7ec44c9b9..53d317fba1148d7c10093d443d4e0ea7a80dfbeb 100644 (file)
@@ -43,7 +43,7 @@ public abstract class AbstractReplicatedLogImpl implements ReplicatedLog {
         this.snapshotTerm = snapshotTerm;
         this.logContext = logContext;
 
-        this.journal = new ArrayList<>(unAppliedEntries.size());
+        journal = new ArrayList<>(unAppliedEntries.size());
         for (ReplicatedLogEntry entry: unAppliedEntries) {
             append(entry);
         }
@@ -168,7 +168,7 @@ public abstract class AbstractReplicatedLogImpl implements ReplicatedLog {
         long totalSize = 0;
         for (int i = fromIndex; i < toIndex; i++) {
             ReplicatedLogEntry entry = journal.get(i);
-            totalSize += entry.size();
+            totalSize += entry.serializedSize();
             if (totalSize <= maxDataSize) {
                 retList.add(entry);
             } else {
index 1f08523acd4d7920f0052df21db86a7947a3d161..360f6b690376c1c413b1a2f04bc8a7e946400758 100644 (file)
@@ -42,6 +42,15 @@ public interface ReplicatedLogEntry {
      */
     int size();
 
+    /**
+     * Return the estimate of serialized size of this entry when passed through serialization. The estimate needs to
+     * be reasonably accurate and should err on the side of caution and report a slightly-higher size in face of
+     * uncertainty.
+     *
+     * @return An estimate of serialized size.
+     */
+    int serializedSize();
+
     /**
      * Checks if persistence is pending for this entry.
      *
index 71b21799e44ed3c55b2ca1a3cda1de194a60aaf2..0188a6df1ac387e603962d838def314a7dab1c8d 100644 (file)
@@ -781,7 +781,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
 
         // If the first entry's size exceeds the max data size threshold, it will be returned from the call above. If
         // that is the case, then we need to slice it into smaller chunks.
-        if (entries.size() != 1 || entries.get(0).getData().size() <= maxDataSize) {
+        if (entries.size() != 1 || entries.get(0).getData().serializedSize() <= maxDataSize) {
             // Don't need to slice.
             return entries;
         }
index 78a249749dfce31d7a8625aced8d957de526ea0b..bda1d1e22ca1762f71f5c0516e6c23cfa07929be 100644 (file)
@@ -26,6 +26,15 @@ public abstract class Payload implements Serializable {
      */
     public abstract int size();
 
+    /**
+     * Return the estimate of serialized size of this payload when passed through serialization. The estimate needs to
+     * be reasonably accurate and should err on the side of caution and report a slightly-higher size in face of
+     * uncertainty.
+     *
+     * @return An estimate of serialized size.
+     */
+    public abstract int serializedSize();
+
     /**
      * Return the serialization proxy for this object.
      *
index 800ea45d15c087fe59b399df3444d7883d74edfc..e190bf0919828714047ddbab275858acf5bb6ad3 100644 (file)
@@ -9,6 +9,7 @@ package org.opendaylight.controller.cluster.raft.persisted;
 
 import akka.dispatch.ControlMessage;
 import java.io.Serializable;
+import org.apache.commons.lang3.SerializationUtils;
 import org.opendaylight.controller.cluster.raft.messages.Payload;
 
 /**
@@ -31,8 +32,11 @@ public final class NoopPayload extends Payload implements ControlMessage {
 
     private static final long serialVersionUID = 1L;
     private static final Proxy PROXY = new Proxy();
+    // Estimate to how big the proxy is. Note this includes object stream overhead, so it is a bit conservative
+    private static final int PROXY_SIZE = SerializationUtils.serialize(PROXY).length;
 
     private NoopPayload() {
+        // Hidden on purpose
     }
 
     @Override
@@ -40,6 +44,11 @@ public final class NoopPayload extends Payload implements ControlMessage {
         return 0;
     }
 
+    @Override
+    public int serializedSize() {
+        return PROXY_SIZE;
+    }
+
     @Override
     protected Object writeReplace() {
         return PROXY;
index eb668175d6e1e36f377cc103a5969b7b564b608c..68a5945b9f860652e7f5e27741f4e6700d95ddaf 100644 (file)
@@ -89,6 +89,11 @@ public final class ServerConfigurationPayload extends Payload implements Persist
 
     @Override
     public int size() {
+        return serializedSize();
+    }
+
+    @Override
+    public int serializedSize() {
         if (serializedSize < 0) {
             try (ByteArrayOutputStream bos = new ByteArrayOutputStream()) {
                 try (ObjectOutputStream out = new ObjectOutputStream(bos)) {
index 10bbf35bc385b2dd862489811e8ece38690d3fbd..56a81f57f366bfc75874fc53e2d360019c994ecc 100644 (file)
@@ -14,6 +14,7 @@ import java.io.IOException;
 import java.io.ObjectInput;
 import java.io.ObjectOutput;
 import java.io.Serializable;
+import org.apache.commons.lang3.SerializationUtils;
 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
 import org.opendaylight.controller.cluster.raft.messages.Payload;
 
@@ -26,7 +27,9 @@ public final class SimpleReplicatedLogEntry implements ReplicatedLogEntry, Seria
     private static final class Proxy implements Externalizable {
         private static final long serialVersionUID = 1L;
 
-        private ReplicatedLogEntry replicatedLogEntry;
+        private long index;
+        private long term;
+        private Payload data;
 
         // checkstyle flags the public modifier as redundant which really doesn't make sense since it clearly isn't
         // redundant. It is explicitly needed for Java serialization to be able to create instances via reflection.
@@ -35,33 +38,34 @@ public final class SimpleReplicatedLogEntry implements ReplicatedLogEntry, Seria
             // For Externalizable
         }
 
-        Proxy(final ReplicatedLogEntry replicatedLogEntry) {
-            this.replicatedLogEntry = replicatedLogEntry;
-        }
-
-        static int estimatedSerializedSize(final ReplicatedLogEntry replicatedLogEntry) {
-            return 8 /* index */ + 8 /* term */ + replicatedLogEntry.getData().size()
-                    + 400 /* estimated extra padding for class info */;
+        Proxy(final SimpleReplicatedLogEntry replicatedLogEntry) {
+            index = replicatedLogEntry.getIndex();
+            term = replicatedLogEntry.getTerm();
+            data = replicatedLogEntry.getData();
         }
 
         @Override
         public void writeExternal(final ObjectOutput out) throws IOException {
-            out.writeLong(replicatedLogEntry.getIndex());
-            out.writeLong(replicatedLogEntry.getTerm());
-            out.writeObject(replicatedLogEntry.getData());
+            out.writeLong(index);
+            out.writeLong(term);
+            out.writeObject(data);
         }
 
         @Override
         public void readExternal(final ObjectInput in) throws IOException, ClassNotFoundException {
-            replicatedLogEntry = new SimpleReplicatedLogEntry(in.readLong(), in.readLong(), (Payload) in.readObject());
+            index = in.readLong();
+            term = in.readLong();
+            data = (Payload) in.readObject();
         }
 
         private Object readResolve() {
-            return replicatedLogEntry;
+            return new SimpleReplicatedLogEntry(index, term, data);
         }
     }
 
     private static final long serialVersionUID = 1L;
+    // Estimate to how big the proxy is. Note this includes object stream overhead, so it is a bit conservative
+    private static final int PROXY_SIZE = SerializationUtils.serialize(new Proxy()).length;
 
     private final long index;
     private final long term;
@@ -98,7 +102,12 @@ public final class SimpleReplicatedLogEntry implements ReplicatedLogEntry, Seria
 
     @Override
     public int size() {
-        return getData().size();
+        return payload.size();
+    }
+
+    @Override
+    public int serializedSize() {
+        return PROXY_SIZE + payload.serializedSize();
     }
 
     @Override
@@ -115,10 +124,6 @@ public final class SimpleReplicatedLogEntry implements ReplicatedLogEntry, Seria
         return new Proxy(this);
     }
 
-    public int estimatedSerializedSize() {
-        return Proxy.estimatedSerializedSize(this);
-    }
-
     @Override
     public int hashCode() {
         final int prime = 31;
index ca6e6dff30156b3e571281c1c85f89ac619a390c..f1c2fea4a6013cc41f1dde5008fff04170d4711a 100644 (file)
@@ -49,7 +49,7 @@ public class SimpleReplicatedLogEntrySerializer extends JSerializer {
         checkArgument(obj instanceof SimpleReplicatedLogEntry, "Unsupported object type %s", obj.getClass());
 
         SimpleReplicatedLogEntry replicatedLogEntry = (SimpleReplicatedLogEntry)obj;
-        final int estimatedSerializedSize = replicatedLogEntry.estimatedSerializedSize();
+        final int estimatedSerializedSize = replicatedLogEntry.serializedSize();
 
         final ByteArrayOutputStream bos = new ByteArrayOutputStream(estimatedSerializedSize);
         SerializationUtils.serialize(replicatedLogEntry, bos);
index 1e4859254fe091d0196e2ae75dd2325715593e84..5be865f8996f283390685cd5ab7c4b243ee319d0 100644 (file)
@@ -126,7 +126,7 @@ public abstract class AbstractRaftActorIntegrationTest extends AbstractActorTest
 
         TestRaftActor(final Builder builder) {
             super(builder);
-            this.collectorActor = builder.collectorActor;
+            collectorActor = builder.collectorActor;
         }
 
         public void startDropMessages(final Class<?> msgClass) {
@@ -148,8 +148,7 @@ public abstract class AbstractRaftActorIntegrationTest extends AbstractActorTest
         @SuppressWarnings({ "rawtypes", "unchecked", "checkstyle:IllegalCatch" })
         @Override
         public void handleCommand(final Object message) {
-            if (message instanceof MockPayload) {
-                MockPayload payload = (MockPayload) message;
+            if (message instanceof MockPayload payload) {
                 super.persistData(collectorActor, new MockIdentifier(payload.toString()), payload, false);
                 return;
             }
@@ -214,13 +213,14 @@ public abstract class AbstractRaftActorIntegrationTest extends AbstractActorTest
             }
 
             public Builder collectorActor(final ActorRef newCollectorActor) {
-                this.collectorActor = newCollectorActor;
+                collectorActor = newCollectorActor;
                 return this;
             }
         }
     }
 
-    protected static final int SNAPSHOT_CHUNK_SIZE = 100;
+    // FIXME: this is an arbitrary limit. Document interactions and/or improve them to improve maintainability
+    protected static final int SNAPSHOT_CHUNK_SIZE = 700;
 
     protected final Logger testLog = LoggerFactory.getLogger(getClass());
 
index 983b26da9c15353eb6a440418c938a3c3413fb18..65ac83d0d00c17d6c8a7e47136a2e95772a754bb 100644 (file)
@@ -145,25 +145,32 @@ public class AbstractReplicatedLogImplTest {
         from = replicatedLogImpl.getFrom(0, 20, ReplicatedLog.NO_MAX_SIZE);
         assertEquals(4, from.size());
         assertEquals("A", from.get(0).getData().toString());
+        assertEquals("B", from.get(1).getData().toString());
+        assertEquals("C", from.get(2).getData().toString());
         assertEquals("D", from.get(3).getData().toString());
 
+        // Pre-calculate sizing information for use with capping
+        final int sizeB = from.get(1).serializedSize();
+        final int sizeC = from.get(2).serializedSize();
+        final int sizeD = from.get(3).serializedSize();
+
         from = replicatedLogImpl.getFrom(1, 2, ReplicatedLog.NO_MAX_SIZE);
         assertEquals(2, from.size());
         assertEquals("B", from.get(0).getData().toString());
         assertEquals("C", from.get(1).getData().toString());
 
-        from = replicatedLogImpl.getFrom(1, 3, 2);
+        from = replicatedLogImpl.getFrom(1, 3, sizeB + sizeC);
         assertEquals(2, from.size());
         assertEquals("B", from.get(0).getData().toString());
         assertEquals("C", from.get(1).getData().toString());
 
-        from = replicatedLogImpl.getFrom(1, 3, 3);
+        from = replicatedLogImpl.getFrom(1, 3, sizeB + sizeC + sizeD);
         assertEquals(3, from.size());
         assertEquals("B", from.get(0).getData().toString());
         assertEquals("C", from.get(1).getData().toString());
         assertEquals("D", from.get(2).getData().toString());
 
-        from = replicatedLogImpl.getFrom(1, 2, 3);
+        from = replicatedLogImpl.getFrom(1, 2, sizeB + sizeC + sizeD);
         assertEquals(2, from.size());
         assertEquals("B", from.get(0).getData().toString());
         assertEquals("C", from.get(1).getData().toString());
index 0d4007913de0ef6d95a35e35c5158653a3238740..6d4ec22e3d6be213f9a5f09d9b62042b9649d527 100644 (file)
@@ -227,6 +227,11 @@ public class MockRaftActorContext extends RaftActorContextImpl {
             return size;
         }
 
+        @Override
+        public int serializedSize() {
+            return size;
+        }
+
         @Override
         public String toString() {
             return data;
index 33247d3f0cf6f70da582a354e81ae6f931edf43b..9dedc67e68776215bf58acad376c07e35ab83137 100644 (file)
@@ -105,6 +105,11 @@ public class RaftActorDelegatingPersistentDataProviderTest {
             return 0;
         }
 
+        @Override
+        public int serializedSize() {
+            return 0;
+        }
+
         @Override
         protected Object writeReplace() {
             // Not needed
index d6a53a0aeeb6e84b86d457b3ca667dbaee111101..00147a3c0e0588e19314cfcfe0be0c3fd931e62e 100644 (file)
@@ -452,7 +452,7 @@ public class ReplicationAndSnapshotsWithLaggingFollowerIntegrationTest extends A
 
         setupFollower2();
 
-        MessageCollectorActor.expectMatching(follower2CollectorActor, InstallSnapshot.class, 5);
+        MessageCollectorActor.expectMatching(follower2CollectorActor, InstallSnapshot.class, 1);
 
         follower2Actor.stop();
 
@@ -613,8 +613,8 @@ public class ReplicationAndSnapshotsWithLaggingFollowerIntegrationTest extends A
     /**
      * Resume the lagging follower 2 and verify it receives an install snapshot from the leader.
      */
-    private void verifyInstallSnapshotToLaggingFollower(long lastAppliedIndex,
-            @Nullable ServerConfigurationPayload expServerConfig) {
+    private void verifyInstallSnapshotToLaggingFollower(final long lastAppliedIndex,
+            final @Nullable ServerConfigurationPayload expServerConfig) {
         testLog.info("verifyInstallSnapshotToLaggingFollower starting");
 
         MessageCollectorActor.clearMessages(leaderCollectorActor);
@@ -811,8 +811,8 @@ public class ReplicationAndSnapshotsWithLaggingFollowerIntegrationTest extends A
     /**
      * Kill the leader actor, reinstate it and verify the recovered journal.
      */
-    private void verifyLeaderRecoveryAfterReinstatement(long lastIndex, long snapshotIndex,
-            long firstJournalEntryIndex) {
+    private void verifyLeaderRecoveryAfterReinstatement(final long lastIndex, final long snapshotIndex,
+            final long firstJournalEntryIndex) {
         testLog.info("verifyLeaderRecoveryAfterReinstatement starting: lastIndex: {}, snapshotIndex: {}, "
             + "firstJournalEntryIndex: {}", lastIndex, snapshotIndex, firstJournalEntryIndex);
 
@@ -845,7 +845,7 @@ public class ReplicationAndSnapshotsWithLaggingFollowerIntegrationTest extends A
         testLog.info("verifyLeaderRecoveryAfterReinstatement ending");
     }
 
-    private void sendInitialPayloadsReplicatedToAllFollowers(String... data) {
+    private void sendInitialPayloadsReplicatedToAllFollowers(final String... data) {
 
         // Send the payloads.
         for (String d: data) {
index e10f0489e36ff2b1b6024abda0c54ad47127a85b..f847dbce9e860159bbd131a3dc88f39f97486ed1 100644 (file)
@@ -1793,7 +1793,8 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
                 new FiniteDuration(1000, TimeUnit.SECONDS));
-        ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setSnapshotChunkSize(2);
+        // Note: the size here depends on estimate
+        ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setSnapshotChunkSize(246);
 
         leaderActorContext.setReplicatedLog(
                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 4, 1).build());
@@ -2462,7 +2463,7 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
         assertEquals("New votedFor", null, actorContext.getTermInformation().getVotedFor());
     }
 
-    private class MockConfigParamsImpl extends DefaultConfigParamsImpl {
+    private static class MockConfigParamsImpl extends DefaultConfigParamsImpl {
 
         private final long electionTimeOutIntervalMillis;
         private final int snapshotChunkSize;
index 0931b91ac7b524d2e8f0ccfefd068af18210ee19..793f2ea0628cdf967551eb90e6c87519dd62dd01 100644 (file)
@@ -49,6 +49,7 @@ public final class AbortTransactionPayload extends AbstractIdentifiablePayload<T
 
     private static final Logger LOG = LoggerFactory.getLogger(AbortTransactionPayload.class);
     private static final long serialVersionUID = 1L;
+    private static final int PROXY_SIZE = externalizableProxySize(Proxy::new);
 
     AbortTransactionPayload(final TransactionIdentifier transactionId, final byte[] serialized) {
         super(transactionId, serialized);
@@ -71,4 +72,9 @@ public final class AbortTransactionPayload extends AbstractIdentifiablePayload<T
     protected Proxy externalizableProxy(final byte[] serialized) {
         return new Proxy(serialized);
     }
+
+    @Override
+    protected int externalizableProxySize() {
+        return PROXY_SIZE;
+    }
 }
index d081a13421e02ba8157d7f340b53fe87534a3d3a..f07d4dbe903d10244f9a0ec4e764b82864e5fb7e 100644 (file)
@@ -18,6 +18,8 @@ import java.io.IOException;
 import java.io.ObjectInput;
 import java.io.ObjectOutput;
 import java.io.Serializable;
+import java.util.function.Function;
+import org.apache.commons.lang3.SerializationUtils;
 import org.eclipse.jdt.annotation.NonNull;
 import org.opendaylight.controller.cluster.raft.messages.IdentifiablePayload;
 import org.opendaylight.yangtools.concepts.Identifiable;
@@ -88,6 +90,15 @@ public abstract class AbstractIdentifiablePayload<T extends Identifier> extends
         return serialized.length;
     }
 
+    @Override
+    public final int serializedSize() {
+        // TODO: this is not entirely accurate, as the serialization stream has additional overheads:
+        //       - 3 bytes for each block of data <256 bytes
+        //       - 5 bytes for each block of data >=256 bytes
+        //       - each block of data is limited to 1024 bytes as per serialization spec
+        return size() + externalizableProxySize();
+    }
+
     @Override
     public final String toString() {
         return MoreObjects.toStringHelper(this).add("identifier", identifier).add("size", size()).toString();
@@ -100,4 +111,10 @@ public abstract class AbstractIdentifiablePayload<T extends Identifier> extends
 
     @SuppressWarnings("checkstyle:hiddenField")
     protected abstract @NonNull AbstractProxy<T> externalizableProxy(byte @NonNull[] serialized);
+
+    protected abstract int externalizableProxySize();
+
+    protected static final int externalizableProxySize(final Function<byte[], ? extends AbstractProxy<?>> constructor) {
+        return SerializationUtils.serialize(constructor.apply(new byte[0])).length;
+    }
 }
index 1921ff889fa99998330cdca94b5ee93a98961bb6..c5925136737c12d25e96e90d4af1e51e2518463c 100644 (file)
@@ -49,6 +49,7 @@ public final class CloseLocalHistoryPayload extends AbstractIdentifiablePayload<
 
     private static final Logger LOG = LoggerFactory.getLogger(CloseLocalHistoryPayload.class);
     private static final long serialVersionUID = 1L;
+    private static final int PROXY_SIZE = externalizableProxySize(Proxy::new);
 
     CloseLocalHistoryPayload(final LocalHistoryIdentifier historyId, final byte[] serialized) {
         super(historyId, serialized);
@@ -71,4 +72,9 @@ public final class CloseLocalHistoryPayload extends AbstractIdentifiablePayload<
     protected Proxy externalizableProxy(final byte[] serialized) {
         return new Proxy(serialized);
     }
+
+    @Override
+    protected int externalizableProxySize() {
+        return PROXY_SIZE;
+    }
 }
index accb9ef09708cb49385a3434f3eb50b5f1e203b5..d4d08a7292fa531f9da60b7861ad29c796f1640c 100644 (file)
@@ -27,6 +27,7 @@ import java.io.Serializable;
 import java.io.StreamCorruptedException;
 import java.util.AbstractMap.SimpleImmutableEntry;
 import java.util.Map.Entry;
+import org.apache.commons.lang3.SerializationUtils;
 import org.eclipse.jdt.annotation.NonNull;
 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
 import org.opendaylight.controller.cluster.datastore.persisted.DataTreeCandidateInputOutput.DataTreeCandidateWithVersion;
@@ -116,6 +117,12 @@ public abstract class CommitTransactionPayload extends IdentifiablePayload<Trans
         }
     }
 
+    @Override
+    public final int serializedSize() {
+        // TODO: this is not entirely accurate as the the byte[] can be chunked by the serialization stream
+        return ProxySizeHolder.PROXY_SIZE + size();
+    }
+
     /**
      * The cached candidate needs to be cleared after it is done applying to the DataTree, otherwise it would be keeping
      * deserialized in memory which are not needed anymore leading to wasted memory. This lets the payload know that
@@ -197,6 +204,15 @@ public abstract class CommitTransactionPayload extends IdentifiablePayload<Trans
         }
     }
 
+    // Exists to break initialization dependency between CommitTransactionPayload/Simple/Proxy
+    private static final class ProxySizeHolder {
+        static final int PROXY_SIZE = SerializationUtils.serialize(new Proxy(new Simple(new byte[0]))).length;
+
+        private ProxySizeHolder() {
+            // Hidden on purpose
+        }
+    }
+
     private static final class Proxy implements Externalizable {
         private static final long serialVersionUID = 1L;
 
index 76ea934cb42433c7ff833346e302e3a308f271c9..65f0e5d0983a0c64f33dfbd8789b867808b53e8f 100644 (file)
@@ -49,6 +49,7 @@ public final class CreateLocalHistoryPayload extends AbstractIdentifiablePayload
 
     private static final Logger LOG = LoggerFactory.getLogger(CreateLocalHistoryPayload.class);
     private static final long serialVersionUID = 1L;
+    private static final int PROXY_SIZE = externalizableProxySize(Proxy::new);
 
     CreateLocalHistoryPayload(final LocalHistoryIdentifier historyId, final byte[] serialized) {
         super(historyId, serialized);
@@ -71,4 +72,9 @@ public final class CreateLocalHistoryPayload extends AbstractIdentifiablePayload
     protected Proxy externalizableProxy(final byte[] serialized) {
         return new Proxy(serialized);
     }
+
+    @Override
+    protected int externalizableProxySize() {
+        return PROXY_SIZE;
+    }
 }
index c5c6375ff267e8ddd72b9d047cfd0cfcd4e21945..d8684fef985b0b04600dcfd2e8a7f27f0eefc66e 100644 (file)
@@ -40,6 +40,7 @@ public final class DisableTrackingPayload extends AbstractIdentifiablePayload<Cl
 
     private static final Logger LOG = LoggerFactory.getLogger(DisableTrackingPayload.class);
     private static final long serialVersionUID = 1L;
+    private static final int PROXY_SIZE = externalizableProxySize(Proxy::new);
 
     DisableTrackingPayload(final ClientIdentifier clientId, final byte[] serialized) {
         super(clientId, serialized);
@@ -62,4 +63,9 @@ public final class DisableTrackingPayload extends AbstractIdentifiablePayload<Cl
     protected Proxy externalizableProxy(final byte[] serialized) {
         return new Proxy(serialized);
     }
+
+    @Override
+    protected int externalizableProxySize() {
+        return PROXY_SIZE;
+    }
 }
index 4e3ca254a94051e3e5e8bb843c02d681d7fcdedf..d440910943d213e57cd45037e29593bec864c130 100644 (file)
@@ -50,6 +50,7 @@ public final class PurgeLocalHistoryPayload extends AbstractIdentifiablePayload<
 
     private static final Logger LOG = LoggerFactory.getLogger(PurgeLocalHistoryPayload.class);
     private static final long serialVersionUID = 1L;
+    private static final int PROXY_SIZE = externalizableProxySize(Proxy::new);
 
     PurgeLocalHistoryPayload(final LocalHistoryIdentifier historyId, final byte[] serialized) {
         super(historyId, serialized);
@@ -72,4 +73,9 @@ public final class PurgeLocalHistoryPayload extends AbstractIdentifiablePayload<
     protected Proxy externalizableProxy(final byte[] serialized) {
         return new Proxy(serialized);
     }
+
+    @Override
+    protected int externalizableProxySize() {
+        return PROXY_SIZE;
+    }
 }
index 725189e5fc64864f14eb478a0b960eada435fa4a..f3c094c0ddbbaabfbd56a722bf68c9e3bb5fb1b7 100644 (file)
@@ -49,6 +49,7 @@ public final class PurgeTransactionPayload extends AbstractIdentifiablePayload<T
 
     private static final Logger LOG = LoggerFactory.getLogger(PurgeTransactionPayload.class);
     private static final long serialVersionUID = 1L;
+    private static final int PROXY_SIZE = externalizableProxySize(Proxy::new);
 
     PurgeTransactionPayload(final TransactionIdentifier transactionId, final byte[] serialized) {
         super(transactionId, serialized);
@@ -71,4 +72,9 @@ public final class PurgeTransactionPayload extends AbstractIdentifiablePayload<T
     protected Proxy externalizableProxy(final byte[] serialized) {
         return new Proxy(serialized);
     }
+
+    @Override
+    protected int externalizableProxySize() {
+        return PROXY_SIZE;
+    }
 }
index 70daab692a80b77ceb86051f86740c1676d1a550..5b8ba739420f0953593cdb20596352fbdb3741e0 100644 (file)
@@ -58,6 +58,7 @@ public final class SkipTransactionsPayload extends AbstractIdentifiablePayload<L
 
     private static final Logger LOG = LoggerFactory.getLogger(SkipTransactionsPayload.class);
     private static final long serialVersionUID = 1L;
+    private static final int PROXY_SIZE = externalizableProxySize(Proxy::new);
 
     @SuppressFBWarnings(value = "SE_BAD_FIELD", justification = "Handled via externalizable proxy")
     private final @NonNull ImmutableUnsignedLongSet transactionIds;
@@ -91,4 +92,9 @@ public final class SkipTransactionsPayload extends AbstractIdentifiablePayload<L
     protected Proxy externalizableProxy(final byte[] serialized) {
         return new Proxy(serialized);
     }
+
+    @Override
+    protected int externalizableProxySize() {
+        return PROXY_SIZE;
+    }
 }