Add Payload.serializedSize() 11/101211/9
authorRobert Varga <robert.varga@pantheon.tech>
Wed, 18 May 2022 07:15:22 +0000 (09:15 +0200)
committerRobert Varga <robert.varga@pantheon.tech>
Thu, 26 May 2022 18:08:35 +0000 (20:08 +0200)
There is a rather big difference in payload sizing when we consider
how big an entry is: in-memory size can easily be "zero", which can
translate to a serialized size of hundreds of bytes. This difference
is problematic, as we use the former to estimate how many payloads
we can squeeze in AppendEntries and we compare that to the configured
payload limit. Even when there is some (32KiB by default) cushion, we
can end up blowing past the frame size.

Add Payload.serializedSize(), which should provide a semi-conservative
estimate of serialized size and use that to select the cut-off.

Also improve SimpleReplicatedLogEntry's estimates by performing a a
quick serialization operation -- which reduces potential waste for each
entry by 294 bytes, as our hard-coded estimate of 400 bytes was way too
conservative.

JIRA: CONTROLLER-2037
Change-Id: I5abe7d00db9e10f1c66e6db0f7c82854f9aa352d
Signed-off-by: Robert Varga <robert.varga@pantheon.tech>
24 files changed:
opendaylight/md-sal/sal-akka-raft-example/src/main/java/org/opendaylight/controller/cluster/example/messages/KeyValue.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/AbstractReplicatedLogImpl.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ReplicatedLogEntry.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/messages/Payload.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/persisted/NoopPayload.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/persisted/ServerConfigurationPayload.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/persisted/SimpleReplicatedLogEntry.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/persisted/SimpleReplicatedLogEntrySerializer.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/AbstractRaftActorIntegrationTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/AbstractReplicatedLogImplTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MockRaftActorContext.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorDelegatingPersistentDataProviderTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/ReplicationAndSnapshotsWithLaggingFollowerIntegrationTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/persisted/AbortTransactionPayload.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/persisted/AbstractIdentifiablePayload.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/persisted/CloseLocalHistoryPayload.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/persisted/CommitTransactionPayload.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/persisted/CreateLocalHistoryPayload.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/persisted/DisableTrackingPayload.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/persisted/PurgeLocalHistoryPayload.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/persisted/PurgeTransactionPayload.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/persisted/SkipTransactionsPayload.java

index 9312183..78eea5c 100644 (file)
@@ -18,7 +18,7 @@ public final class KeyValue extends Payload {
     public KeyValue() {
     }
 
-    public KeyValue(String key, String value) {
+    public KeyValue(final String key, final String value) {
         this.key = key;
         this.value = value;
     }
@@ -32,13 +32,19 @@ public final class KeyValue extends Payload {
     }
 
     @Override
-    public String toString() {
-        return "KeyValue{" + "key='" + key + '\'' + ", value='" + value + '\'' + '}';
+    public int size() {
+        return value.length() + key.length();
     }
 
     @Override
-    public int size() {
-        return value.length() + key.length();
+    public int serializedSize() {
+        // Should be a better estimate
+        return size();
+    }
+
+    @Override
+    public String toString() {
+        return "KeyValue{" + "key='" + key + '\'' + ", value='" + value + '\'' + '}';
     }
 
     @Override
index 64506ee..53d317f 100644 (file)
@@ -43,7 +43,7 @@ public abstract class AbstractReplicatedLogImpl implements ReplicatedLog {
         this.snapshotTerm = snapshotTerm;
         this.logContext = logContext;
 
-        this.journal = new ArrayList<>(unAppliedEntries.size());
+        journal = new ArrayList<>(unAppliedEntries.size());
         for (ReplicatedLogEntry entry: unAppliedEntries) {
             append(entry);
         }
@@ -168,7 +168,7 @@ public abstract class AbstractReplicatedLogImpl implements ReplicatedLog {
         long totalSize = 0;
         for (int i = fromIndex; i < toIndex; i++) {
             ReplicatedLogEntry entry = journal.get(i);
-            totalSize += entry.size();
+            totalSize += entry.serializedSize();
             if (totalSize <= maxDataSize) {
                 retList.add(entry);
             } else {
index 1f08523..360f6b6 100644 (file)
@@ -42,6 +42,15 @@ public interface ReplicatedLogEntry {
      */
     int size();
 
+    /**
+     * Return the estimate of serialized size of this entry when passed through serialization. The estimate needs to
+     * be reasonably accurate and should err on the side of caution and report a slightly-higher size in face of
+     * uncertainty.
+     *
+     * @return An estimate of serialized size.
+     */
+    int serializedSize();
+
     /**
      * Checks if persistence is pending for this entry.
      *
index 71b2179..0188a6d 100644 (file)
@@ -781,7 +781,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
 
         // If the first entry's size exceeds the max data size threshold, it will be returned from the call above. If
         // that is the case, then we need to slice it into smaller chunks.
-        if (entries.size() != 1 || entries.get(0).getData().size() <= maxDataSize) {
+        if (entries.size() != 1 || entries.get(0).getData().serializedSize() <= maxDataSize) {
             // Don't need to slice.
             return entries;
         }
index 78a2497..bda1d1e 100644 (file)
@@ -26,6 +26,15 @@ public abstract class Payload implements Serializable {
      */
     public abstract int size();
 
+    /**
+     * Return the estimate of serialized size of this payload when passed through serialization. The estimate needs to
+     * be reasonably accurate and should err on the side of caution and report a slightly-higher size in face of
+     * uncertainty.
+     *
+     * @return An estimate of serialized size.
+     */
+    public abstract int serializedSize();
+
     /**
      * Return the serialization proxy for this object.
      *
index 800ea45..e190bf0 100644 (file)
@@ -9,6 +9,7 @@ package org.opendaylight.controller.cluster.raft.persisted;
 
 import akka.dispatch.ControlMessage;
 import java.io.Serializable;
+import org.apache.commons.lang3.SerializationUtils;
 import org.opendaylight.controller.cluster.raft.messages.Payload;
 
 /**
@@ -31,8 +32,11 @@ public final class NoopPayload extends Payload implements ControlMessage {
 
     private static final long serialVersionUID = 1L;
     private static final Proxy PROXY = new Proxy();
+    // Estimate to how big the proxy is. Note this includes object stream overhead, so it is a bit conservative
+    private static final int PROXY_SIZE = SerializationUtils.serialize(PROXY).length;
 
     private NoopPayload() {
+        // Hidden on purpose
     }
 
     @Override
@@ -40,6 +44,11 @@ public final class NoopPayload extends Payload implements ControlMessage {
         return 0;
     }
 
+    @Override
+    public int serializedSize() {
+        return PROXY_SIZE;
+    }
+
     @Override
     protected Object writeReplace() {
         return PROXY;
index eb66817..68a5945 100644 (file)
@@ -89,6 +89,11 @@ public final class ServerConfigurationPayload extends Payload implements Persist
 
     @Override
     public int size() {
+        return serializedSize();
+    }
+
+    @Override
+    public int serializedSize() {
         if (serializedSize < 0) {
             try (ByteArrayOutputStream bos = new ByteArrayOutputStream()) {
                 try (ObjectOutputStream out = new ObjectOutputStream(bos)) {
index 10bbf35..56a81f5 100644 (file)
@@ -14,6 +14,7 @@ import java.io.IOException;
 import java.io.ObjectInput;
 import java.io.ObjectOutput;
 import java.io.Serializable;
+import org.apache.commons.lang3.SerializationUtils;
 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
 import org.opendaylight.controller.cluster.raft.messages.Payload;
 
@@ -26,7 +27,9 @@ public final class SimpleReplicatedLogEntry implements ReplicatedLogEntry, Seria
     private static final class Proxy implements Externalizable {
         private static final long serialVersionUID = 1L;
 
-        private ReplicatedLogEntry replicatedLogEntry;
+        private long index;
+        private long term;
+        private Payload data;
 
         // checkstyle flags the public modifier as redundant which really doesn't make sense since it clearly isn't
         // redundant. It is explicitly needed for Java serialization to be able to create instances via reflection.
@@ -35,33 +38,34 @@ public final class SimpleReplicatedLogEntry implements ReplicatedLogEntry, Seria
             // For Externalizable
         }
 
-        Proxy(final ReplicatedLogEntry replicatedLogEntry) {
-            this.replicatedLogEntry = replicatedLogEntry;
-        }
-
-        static int estimatedSerializedSize(final ReplicatedLogEntry replicatedLogEntry) {
-            return 8 /* index */ + 8 /* term */ + replicatedLogEntry.getData().size()
-                    + 400 /* estimated extra padding for class info */;
+        Proxy(final SimpleReplicatedLogEntry replicatedLogEntry) {
+            index = replicatedLogEntry.getIndex();
+            term = replicatedLogEntry.getTerm();
+            data = replicatedLogEntry.getData();
         }
 
         @Override
         public void writeExternal(final ObjectOutput out) throws IOException {
-            out.writeLong(replicatedLogEntry.getIndex());
-            out.writeLong(replicatedLogEntry.getTerm());
-            out.writeObject(replicatedLogEntry.getData());
+            out.writeLong(index);
+            out.writeLong(term);
+            out.writeObject(data);
         }
 
         @Override
         public void readExternal(final ObjectInput in) throws IOException, ClassNotFoundException {
-            replicatedLogEntry = new SimpleReplicatedLogEntry(in.readLong(), in.readLong(), (Payload) in.readObject());
+            index = in.readLong();
+            term = in.readLong();
+            data = (Payload) in.readObject();
         }
 
         private Object readResolve() {
-            return replicatedLogEntry;
+            return new SimpleReplicatedLogEntry(index, term, data);
         }
     }
 
     private static final long serialVersionUID = 1L;
+    // Estimate to how big the proxy is. Note this includes object stream overhead, so it is a bit conservative
+    private static final int PROXY_SIZE = SerializationUtils.serialize(new Proxy()).length;
 
     private final long index;
     private final long term;
@@ -98,7 +102,12 @@ public final class SimpleReplicatedLogEntry implements ReplicatedLogEntry, Seria
 
     @Override
     public int size() {
-        return getData().size();
+        return payload.size();
+    }
+
+    @Override
+    public int serializedSize() {
+        return PROXY_SIZE + payload.serializedSize();
     }
 
     @Override
@@ -115,10 +124,6 @@ public final class SimpleReplicatedLogEntry implements ReplicatedLogEntry, Seria
         return new Proxy(this);
     }
 
-    public int estimatedSerializedSize() {
-        return Proxy.estimatedSerializedSize(this);
-    }
-
     @Override
     public int hashCode() {
         final int prime = 31;
index ca6e6df..f1c2fea 100644 (file)
@@ -49,7 +49,7 @@ public class SimpleReplicatedLogEntrySerializer extends JSerializer {
         checkArgument(obj instanceof SimpleReplicatedLogEntry, "Unsupported object type %s", obj.getClass());
 
         SimpleReplicatedLogEntry replicatedLogEntry = (SimpleReplicatedLogEntry)obj;
-        final int estimatedSerializedSize = replicatedLogEntry.estimatedSerializedSize();
+        final int estimatedSerializedSize = replicatedLogEntry.serializedSize();
 
         final ByteArrayOutputStream bos = new ByteArrayOutputStream(estimatedSerializedSize);
         SerializationUtils.serialize(replicatedLogEntry, bos);
index 1e48592..5be865f 100644 (file)
@@ -126,7 +126,7 @@ public abstract class AbstractRaftActorIntegrationTest extends AbstractActorTest
 
         TestRaftActor(final Builder builder) {
             super(builder);
-            this.collectorActor = builder.collectorActor;
+            collectorActor = builder.collectorActor;
         }
 
         public void startDropMessages(final Class<?> msgClass) {
@@ -148,8 +148,7 @@ public abstract class AbstractRaftActorIntegrationTest extends AbstractActorTest
         @SuppressWarnings({ "rawtypes", "unchecked", "checkstyle:IllegalCatch" })
         @Override
         public void handleCommand(final Object message) {
-            if (message instanceof MockPayload) {
-                MockPayload payload = (MockPayload) message;
+            if (message instanceof MockPayload payload) {
                 super.persistData(collectorActor, new MockIdentifier(payload.toString()), payload, false);
                 return;
             }
@@ -214,13 +213,14 @@ public abstract class AbstractRaftActorIntegrationTest extends AbstractActorTest
             }
 
             public Builder collectorActor(final ActorRef newCollectorActor) {
-                this.collectorActor = newCollectorActor;
+                collectorActor = newCollectorActor;
                 return this;
             }
         }
     }
 
-    protected static final int SNAPSHOT_CHUNK_SIZE = 100;
+    // FIXME: this is an arbitrary limit. Document interactions and/or improve them to improve maintainability
+    protected static final int SNAPSHOT_CHUNK_SIZE = 700;
 
     protected final Logger testLog = LoggerFactory.getLogger(getClass());
 
index 983b26d..65ac83d 100644 (file)
@@ -145,25 +145,32 @@ public class AbstractReplicatedLogImplTest {
         from = replicatedLogImpl.getFrom(0, 20, ReplicatedLog.NO_MAX_SIZE);
         assertEquals(4, from.size());
         assertEquals("A", from.get(0).getData().toString());
+        assertEquals("B", from.get(1).getData().toString());
+        assertEquals("C", from.get(2).getData().toString());
         assertEquals("D", from.get(3).getData().toString());
 
+        // Pre-calculate sizing information for use with capping
+        final int sizeB = from.get(1).serializedSize();
+        final int sizeC = from.get(2).serializedSize();
+        final int sizeD = from.get(3).serializedSize();
+
         from = replicatedLogImpl.getFrom(1, 2, ReplicatedLog.NO_MAX_SIZE);
         assertEquals(2, from.size());
         assertEquals("B", from.get(0).getData().toString());
         assertEquals("C", from.get(1).getData().toString());
 
-        from = replicatedLogImpl.getFrom(1, 3, 2);
+        from = replicatedLogImpl.getFrom(1, 3, sizeB + sizeC);
         assertEquals(2, from.size());
         assertEquals("B", from.get(0).getData().toString());
         assertEquals("C", from.get(1).getData().toString());
 
-        from = replicatedLogImpl.getFrom(1, 3, 3);
+        from = replicatedLogImpl.getFrom(1, 3, sizeB + sizeC + sizeD);
         assertEquals(3, from.size());
         assertEquals("B", from.get(0).getData().toString());
         assertEquals("C", from.get(1).getData().toString());
         assertEquals("D", from.get(2).getData().toString());
 
-        from = replicatedLogImpl.getFrom(1, 2, 3);
+        from = replicatedLogImpl.getFrom(1, 2, sizeB + sizeC + sizeD);
         assertEquals(2, from.size());
         assertEquals("B", from.get(0).getData().toString());
         assertEquals("C", from.get(1).getData().toString());
index 0d40079..6d4ec22 100644 (file)
@@ -227,6 +227,11 @@ public class MockRaftActorContext extends RaftActorContextImpl {
             return size;
         }
 
+        @Override
+        public int serializedSize() {
+            return size;
+        }
+
         @Override
         public String toString() {
             return data;
index 33247d3..9dedc67 100644 (file)
@@ -105,6 +105,11 @@ public class RaftActorDelegatingPersistentDataProviderTest {
             return 0;
         }
 
+        @Override
+        public int serializedSize() {
+            return 0;
+        }
+
         @Override
         protected Object writeReplace() {
             // Not needed
index d6a53a0..00147a3 100644 (file)
@@ -452,7 +452,7 @@ public class ReplicationAndSnapshotsWithLaggingFollowerIntegrationTest extends A
 
         setupFollower2();
 
-        MessageCollectorActor.expectMatching(follower2CollectorActor, InstallSnapshot.class, 5);
+        MessageCollectorActor.expectMatching(follower2CollectorActor, InstallSnapshot.class, 1);
 
         follower2Actor.stop();
 
@@ -613,8 +613,8 @@ public class ReplicationAndSnapshotsWithLaggingFollowerIntegrationTest extends A
     /**
      * Resume the lagging follower 2 and verify it receives an install snapshot from the leader.
      */
-    private void verifyInstallSnapshotToLaggingFollower(long lastAppliedIndex,
-            @Nullable ServerConfigurationPayload expServerConfig) {
+    private void verifyInstallSnapshotToLaggingFollower(final long lastAppliedIndex,
+            final @Nullable ServerConfigurationPayload expServerConfig) {
         testLog.info("verifyInstallSnapshotToLaggingFollower starting");
 
         MessageCollectorActor.clearMessages(leaderCollectorActor);
@@ -811,8 +811,8 @@ public class ReplicationAndSnapshotsWithLaggingFollowerIntegrationTest extends A
     /**
      * Kill the leader actor, reinstate it and verify the recovered journal.
      */
-    private void verifyLeaderRecoveryAfterReinstatement(long lastIndex, long snapshotIndex,
-            long firstJournalEntryIndex) {
+    private void verifyLeaderRecoveryAfterReinstatement(final long lastIndex, final long snapshotIndex,
+            final long firstJournalEntryIndex) {
         testLog.info("verifyLeaderRecoveryAfterReinstatement starting: lastIndex: {}, snapshotIndex: {}, "
             + "firstJournalEntryIndex: {}", lastIndex, snapshotIndex, firstJournalEntryIndex);
 
@@ -845,7 +845,7 @@ public class ReplicationAndSnapshotsWithLaggingFollowerIntegrationTest extends A
         testLog.info("verifyLeaderRecoveryAfterReinstatement ending");
     }
 
-    private void sendInitialPayloadsReplicatedToAllFollowers(String... data) {
+    private void sendInitialPayloadsReplicatedToAllFollowers(final String... data) {
 
         // Send the payloads.
         for (String d: data) {
index e10f048..f847dbc 100644 (file)
@@ -1793,7 +1793,8 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
                 new FiniteDuration(1000, TimeUnit.SECONDS));
-        ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setSnapshotChunkSize(2);
+        // Note: the size here depends on estimate
+        ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setSnapshotChunkSize(246);
 
         leaderActorContext.setReplicatedLog(
                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 4, 1).build());
@@ -2462,7 +2463,7 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
         assertEquals("New votedFor", null, actorContext.getTermInformation().getVotedFor());
     }
 
-    private class MockConfigParamsImpl extends DefaultConfigParamsImpl {
+    private static class MockConfigParamsImpl extends DefaultConfigParamsImpl {
 
         private final long electionTimeOutIntervalMillis;
         private final int snapshotChunkSize;
index 0931b91..793f2ea 100644 (file)
@@ -49,6 +49,7 @@ public final class AbortTransactionPayload extends AbstractIdentifiablePayload<T
 
     private static final Logger LOG = LoggerFactory.getLogger(AbortTransactionPayload.class);
     private static final long serialVersionUID = 1L;
+    private static final int PROXY_SIZE = externalizableProxySize(Proxy::new);
 
     AbortTransactionPayload(final TransactionIdentifier transactionId, final byte[] serialized) {
         super(transactionId, serialized);
@@ -71,4 +72,9 @@ public final class AbortTransactionPayload extends AbstractIdentifiablePayload<T
     protected Proxy externalizableProxy(final byte[] serialized) {
         return new Proxy(serialized);
     }
+
+    @Override
+    protected int externalizableProxySize() {
+        return PROXY_SIZE;
+    }
 }
index d081a13..f07d4db 100644 (file)
@@ -18,6 +18,8 @@ import java.io.IOException;
 import java.io.ObjectInput;
 import java.io.ObjectOutput;
 import java.io.Serializable;
+import java.util.function.Function;
+import org.apache.commons.lang3.SerializationUtils;
 import org.eclipse.jdt.annotation.NonNull;
 import org.opendaylight.controller.cluster.raft.messages.IdentifiablePayload;
 import org.opendaylight.yangtools.concepts.Identifiable;
@@ -88,6 +90,15 @@ public abstract class AbstractIdentifiablePayload<T extends Identifier> extends
         return serialized.length;
     }
 
+    @Override
+    public final int serializedSize() {
+        // TODO: this is not entirely accurate, as the serialization stream has additional overheads:
+        //       - 3 bytes for each block of data <256 bytes
+        //       - 5 bytes for each block of data >=256 bytes
+        //       - each block of data is limited to 1024 bytes as per serialization spec
+        return size() + externalizableProxySize();
+    }
+
     @Override
     public final String toString() {
         return MoreObjects.toStringHelper(this).add("identifier", identifier).add("size", size()).toString();
@@ -100,4 +111,10 @@ public abstract class AbstractIdentifiablePayload<T extends Identifier> extends
 
     @SuppressWarnings("checkstyle:hiddenField")
     protected abstract @NonNull AbstractProxy<T> externalizableProxy(byte @NonNull[] serialized);
+
+    protected abstract int externalizableProxySize();
+
+    protected static final int externalizableProxySize(final Function<byte[], ? extends AbstractProxy<?>> constructor) {
+        return SerializationUtils.serialize(constructor.apply(new byte[0])).length;
+    }
 }
index 1921ff8..c592513 100644 (file)
@@ -49,6 +49,7 @@ public final class CloseLocalHistoryPayload extends AbstractIdentifiablePayload<
 
     private static final Logger LOG = LoggerFactory.getLogger(CloseLocalHistoryPayload.class);
     private static final long serialVersionUID = 1L;
+    private static final int PROXY_SIZE = externalizableProxySize(Proxy::new);
 
     CloseLocalHistoryPayload(final LocalHistoryIdentifier historyId, final byte[] serialized) {
         super(historyId, serialized);
@@ -71,4 +72,9 @@ public final class CloseLocalHistoryPayload extends AbstractIdentifiablePayload<
     protected Proxy externalizableProxy(final byte[] serialized) {
         return new Proxy(serialized);
     }
+
+    @Override
+    protected int externalizableProxySize() {
+        return PROXY_SIZE;
+    }
 }
index accb9ef..d4d08a7 100644 (file)
@@ -27,6 +27,7 @@ import java.io.Serializable;
 import java.io.StreamCorruptedException;
 import java.util.AbstractMap.SimpleImmutableEntry;
 import java.util.Map.Entry;
+import org.apache.commons.lang3.SerializationUtils;
 import org.eclipse.jdt.annotation.NonNull;
 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
 import org.opendaylight.controller.cluster.datastore.persisted.DataTreeCandidateInputOutput.DataTreeCandidateWithVersion;
@@ -116,6 +117,12 @@ public abstract class CommitTransactionPayload extends IdentifiablePayload<Trans
         }
     }
 
+    @Override
+    public final int serializedSize() {
+        // TODO: this is not entirely accurate as the the byte[] can be chunked by the serialization stream
+        return ProxySizeHolder.PROXY_SIZE + size();
+    }
+
     /**
      * The cached candidate needs to be cleared after it is done applying to the DataTree, otherwise it would be keeping
      * deserialized in memory which are not needed anymore leading to wasted memory. This lets the payload know that
@@ -197,6 +204,15 @@ public abstract class CommitTransactionPayload extends IdentifiablePayload<Trans
         }
     }
 
+    // Exists to break initialization dependency between CommitTransactionPayload/Simple/Proxy
+    private static final class ProxySizeHolder {
+        static final int PROXY_SIZE = SerializationUtils.serialize(new Proxy(new Simple(new byte[0]))).length;
+
+        private ProxySizeHolder() {
+            // Hidden on purpose
+        }
+    }
+
     private static final class Proxy implements Externalizable {
         private static final long serialVersionUID = 1L;
 
index 76ea934..65f0e5d 100644 (file)
@@ -49,6 +49,7 @@ public final class CreateLocalHistoryPayload extends AbstractIdentifiablePayload
 
     private static final Logger LOG = LoggerFactory.getLogger(CreateLocalHistoryPayload.class);
     private static final long serialVersionUID = 1L;
+    private static final int PROXY_SIZE = externalizableProxySize(Proxy::new);
 
     CreateLocalHistoryPayload(final LocalHistoryIdentifier historyId, final byte[] serialized) {
         super(historyId, serialized);
@@ -71,4 +72,9 @@ public final class CreateLocalHistoryPayload extends AbstractIdentifiablePayload
     protected Proxy externalizableProxy(final byte[] serialized) {
         return new Proxy(serialized);
     }
+
+    @Override
+    protected int externalizableProxySize() {
+        return PROXY_SIZE;
+    }
 }
index c5c6375..d8684fe 100644 (file)
@@ -40,6 +40,7 @@ public final class DisableTrackingPayload extends AbstractIdentifiablePayload<Cl
 
     private static final Logger LOG = LoggerFactory.getLogger(DisableTrackingPayload.class);
     private static final long serialVersionUID = 1L;
+    private static final int PROXY_SIZE = externalizableProxySize(Proxy::new);
 
     DisableTrackingPayload(final ClientIdentifier clientId, final byte[] serialized) {
         super(clientId, serialized);
@@ -62,4 +63,9 @@ public final class DisableTrackingPayload extends AbstractIdentifiablePayload<Cl
     protected Proxy externalizableProxy(final byte[] serialized) {
         return new Proxy(serialized);
     }
+
+    @Override
+    protected int externalizableProxySize() {
+        return PROXY_SIZE;
+    }
 }
index 4e3ca25..d440910 100644 (file)
@@ -50,6 +50,7 @@ public final class PurgeLocalHistoryPayload extends AbstractIdentifiablePayload<
 
     private static final Logger LOG = LoggerFactory.getLogger(PurgeLocalHistoryPayload.class);
     private static final long serialVersionUID = 1L;
+    private static final int PROXY_SIZE = externalizableProxySize(Proxy::new);
 
     PurgeLocalHistoryPayload(final LocalHistoryIdentifier historyId, final byte[] serialized) {
         super(historyId, serialized);
@@ -72,4 +73,9 @@ public final class PurgeLocalHistoryPayload extends AbstractIdentifiablePayload<
     protected Proxy externalizableProxy(final byte[] serialized) {
         return new Proxy(serialized);
     }
+
+    @Override
+    protected int externalizableProxySize() {
+        return PROXY_SIZE;
+    }
 }
index 725189e..f3c094c 100644 (file)
@@ -49,6 +49,7 @@ public final class PurgeTransactionPayload extends AbstractIdentifiablePayload<T
 
     private static final Logger LOG = LoggerFactory.getLogger(PurgeTransactionPayload.class);
     private static final long serialVersionUID = 1L;
+    private static final int PROXY_SIZE = externalizableProxySize(Proxy::new);
 
     PurgeTransactionPayload(final TransactionIdentifier transactionId, final byte[] serialized) {
         super(transactionId, serialized);
@@ -71,4 +72,9 @@ public final class PurgeTransactionPayload extends AbstractIdentifiablePayload<T
     protected Proxy externalizableProxy(final byte[] serialized) {
         return new Proxy(serialized);
     }
+
+    @Override
+    protected int externalizableProxySize() {
+        return PROXY_SIZE;
+    }
 }
index 70daab6..5b8ba73 100644 (file)
@@ -58,6 +58,7 @@ public final class SkipTransactionsPayload extends AbstractIdentifiablePayload<L
 
     private static final Logger LOG = LoggerFactory.getLogger(SkipTransactionsPayload.class);
     private static final long serialVersionUID = 1L;
+    private static final int PROXY_SIZE = externalizableProxySize(Proxy::new);
 
     @SuppressFBWarnings(value = "SE_BAD_FIELD", justification = "Handled via externalizable proxy")
     private final @NonNull ImmutableUnsignedLongSet transactionIds;
@@ -91,4 +92,9 @@ public final class SkipTransactionsPayload extends AbstractIdentifiablePayload<L
     protected Proxy externalizableProxy(final byte[] serialized) {
         return new Proxy(serialized);
     }
+
+    @Override
+    protected int externalizableProxySize() {
+        return PROXY_SIZE;
+    }
 }