From 86e8e4a06b682aa772c834a2cef56d0596540e1b Mon Sep 17 00:00:00 2001 From: Robert Varga Date: Wed, 18 May 2022 09:15:22 +0200 Subject: [PATCH] Add Payload.serializedSize() There is a rather big difference in payload sizing when we consider how big an entry is: in-memory size can easily be "zero", which can translate to a serialized size of hundreds of bytes. This difference is problematic, as we use the former to estimate how many payloads we can squeeze in AppendEntries and we compare that to the configured payload limit. Even when there is some (32KiB by default) cushion, we can end up blowing past the frame size. Add Payload.serializedSize(), which should provide a semi-conservative estimate of serialized size and use that to select the cut-off. Also improve SimpleReplicatedLogEntry's estimates by performing a a quick serialization operation -- which reduces potential waste for each entry by 294 bytes, as our hard-coded estimate of 400 bytes was way too conservative. JIRA: CONTROLLER-2037 Change-Id: I5abe7d00db9e10f1c66e6db0f7c82854f9aa352d Signed-off-by: Robert Varga --- .../cluster/example/messages/KeyValue.java | 16 +++++--- .../raft/AbstractReplicatedLogImpl.java | 4 +- .../cluster/raft/ReplicatedLogEntry.java | 9 ++++ .../raft/behaviors/AbstractLeader.java | 2 +- .../cluster/raft/messages/Payload.java | 9 ++++ .../cluster/raft/persisted/NoopPayload.java | 9 ++++ .../persisted/ServerConfigurationPayload.java | 5 +++ .../persisted/SimpleReplicatedLogEntry.java | 41 +++++++++++-------- .../SimpleReplicatedLogEntrySerializer.java | 2 +- .../AbstractRaftActorIntegrationTest.java | 10 ++--- .../raft/AbstractReplicatedLogImplTest.java | 13 ++++-- .../cluster/raft/MockRaftActorContext.java | 5 +++ ...rDelegatingPersistentDataProviderTest.java | 5 +++ ...otsWithLaggingFollowerIntegrationTest.java | 12 +++--- .../cluster/raft/behaviors/LeaderTest.java | 5 ++- .../persisted/AbortTransactionPayload.java | 6 +++ .../AbstractIdentifiablePayload.java | 17 ++++++++ .../persisted/CloseLocalHistoryPayload.java | 6 +++ .../persisted/CommitTransactionPayload.java | 16 ++++++++ .../persisted/CreateLocalHistoryPayload.java | 6 +++ .../persisted/DisableTrackingPayload.java | 6 +++ .../persisted/PurgeLocalHistoryPayload.java | 6 +++ .../persisted/PurgeTransactionPayload.java | 6 +++ .../persisted/SkipTransactionsPayload.java | 6 +++ 24 files changed, 179 insertions(+), 43 deletions(-) diff --git a/opendaylight/md-sal/sal-akka-raft-example/src/main/java/org/opendaylight/controller/cluster/example/messages/KeyValue.java b/opendaylight/md-sal/sal-akka-raft-example/src/main/java/org/opendaylight/controller/cluster/example/messages/KeyValue.java index 93121834fe..78eea5cd86 100644 --- a/opendaylight/md-sal/sal-akka-raft-example/src/main/java/org/opendaylight/controller/cluster/example/messages/KeyValue.java +++ b/opendaylight/md-sal/sal-akka-raft-example/src/main/java/org/opendaylight/controller/cluster/example/messages/KeyValue.java @@ -18,7 +18,7 @@ public final class KeyValue extends Payload { public KeyValue() { } - public KeyValue(String key, String value) { + public KeyValue(final String key, final String value) { this.key = key; this.value = value; } @@ -32,13 +32,19 @@ public final class KeyValue extends Payload { } @Override - public String toString() { - return "KeyValue{" + "key='" + key + '\'' + ", value='" + value + '\'' + '}'; + public int size() { + return value.length() + key.length(); } @Override - public int size() { - return value.length() + key.length(); + public int serializedSize() { + // Should be a better estimate + return size(); + } + + @Override + public String toString() { + return "KeyValue{" + "key='" + key + '\'' + ", value='" + value + '\'' + '}'; } @Override diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/AbstractReplicatedLogImpl.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/AbstractReplicatedLogImpl.java index 64506ee686..53d317fba1 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/AbstractReplicatedLogImpl.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/AbstractReplicatedLogImpl.java @@ -43,7 +43,7 @@ public abstract class AbstractReplicatedLogImpl implements ReplicatedLog { this.snapshotTerm = snapshotTerm; this.logContext = logContext; - this.journal = new ArrayList<>(unAppliedEntries.size()); + journal = new ArrayList<>(unAppliedEntries.size()); for (ReplicatedLogEntry entry: unAppliedEntries) { append(entry); } @@ -168,7 +168,7 @@ public abstract class AbstractReplicatedLogImpl implements ReplicatedLog { long totalSize = 0; for (int i = fromIndex; i < toIndex; i++) { ReplicatedLogEntry entry = journal.get(i); - totalSize += entry.size(); + totalSize += entry.serializedSize(); if (totalSize <= maxDataSize) { retList.add(entry); } else { diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ReplicatedLogEntry.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ReplicatedLogEntry.java index 1f08523acd..360f6b6903 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ReplicatedLogEntry.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ReplicatedLogEntry.java @@ -42,6 +42,15 @@ public interface ReplicatedLogEntry { */ int size(); + /** + * Return the estimate of serialized size of this entry when passed through serialization. The estimate needs to + * be reasonably accurate and should err on the side of caution and report a slightly-higher size in face of + * uncertainty. + * + * @return An estimate of serialized size. + */ + int serializedSize(); + /** * Checks if persistence is pending for this entry. * diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java index 71b21799e4..0188a6df1a 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java @@ -781,7 +781,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { // If the first entry's size exceeds the max data size threshold, it will be returned from the call above. If // that is the case, then we need to slice it into smaller chunks. - if (entries.size() != 1 || entries.get(0).getData().size() <= maxDataSize) { + if (entries.size() != 1 || entries.get(0).getData().serializedSize() <= maxDataSize) { // Don't need to slice. return entries; } diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/messages/Payload.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/messages/Payload.java index 78a249749d..bda1d1e22c 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/messages/Payload.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/messages/Payload.java @@ -26,6 +26,15 @@ public abstract class Payload implements Serializable { */ public abstract int size(); + /** + * Return the estimate of serialized size of this payload when passed through serialization. The estimate needs to + * be reasonably accurate and should err on the side of caution and report a slightly-higher size in face of + * uncertainty. + * + * @return An estimate of serialized size. + */ + public abstract int serializedSize(); + /** * Return the serialization proxy for this object. * diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/persisted/NoopPayload.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/persisted/NoopPayload.java index 800ea45d15..e190bf0919 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/persisted/NoopPayload.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/persisted/NoopPayload.java @@ -9,6 +9,7 @@ package org.opendaylight.controller.cluster.raft.persisted; import akka.dispatch.ControlMessage; import java.io.Serializable; +import org.apache.commons.lang3.SerializationUtils; import org.opendaylight.controller.cluster.raft.messages.Payload; /** @@ -31,8 +32,11 @@ public final class NoopPayload extends Payload implements ControlMessage { private static final long serialVersionUID = 1L; private static final Proxy PROXY = new Proxy(); + // Estimate to how big the proxy is. Note this includes object stream overhead, so it is a bit conservative + private static final int PROXY_SIZE = SerializationUtils.serialize(PROXY).length; private NoopPayload() { + // Hidden on purpose } @Override @@ -40,6 +44,11 @@ public final class NoopPayload extends Payload implements ControlMessage { return 0; } + @Override + public int serializedSize() { + return PROXY_SIZE; + } + @Override protected Object writeReplace() { return PROXY; diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/persisted/ServerConfigurationPayload.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/persisted/ServerConfigurationPayload.java index eb668175d6..68a5945b9f 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/persisted/ServerConfigurationPayload.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/persisted/ServerConfigurationPayload.java @@ -89,6 +89,11 @@ public final class ServerConfigurationPayload extends Payload implements Persist @Override public int size() { + return serializedSize(); + } + + @Override + public int serializedSize() { if (serializedSize < 0) { try (ByteArrayOutputStream bos = new ByteArrayOutputStream()) { try (ObjectOutputStream out = new ObjectOutputStream(bos)) { diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/persisted/SimpleReplicatedLogEntry.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/persisted/SimpleReplicatedLogEntry.java index 10bbf35bc3..56a81f57f3 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/persisted/SimpleReplicatedLogEntry.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/persisted/SimpleReplicatedLogEntry.java @@ -14,6 +14,7 @@ import java.io.IOException; import java.io.ObjectInput; import java.io.ObjectOutput; import java.io.Serializable; +import org.apache.commons.lang3.SerializationUtils; import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry; import org.opendaylight.controller.cluster.raft.messages.Payload; @@ -26,7 +27,9 @@ public final class SimpleReplicatedLogEntry implements ReplicatedLogEntry, Seria private static final class Proxy implements Externalizable { private static final long serialVersionUID = 1L; - private ReplicatedLogEntry replicatedLogEntry; + private long index; + private long term; + private Payload data; // checkstyle flags the public modifier as redundant which really doesn't make sense since it clearly isn't // redundant. It is explicitly needed for Java serialization to be able to create instances via reflection. @@ -35,33 +38,34 @@ public final class SimpleReplicatedLogEntry implements ReplicatedLogEntry, Seria // For Externalizable } - Proxy(final ReplicatedLogEntry replicatedLogEntry) { - this.replicatedLogEntry = replicatedLogEntry; - } - - static int estimatedSerializedSize(final ReplicatedLogEntry replicatedLogEntry) { - return 8 /* index */ + 8 /* term */ + replicatedLogEntry.getData().size() - + 400 /* estimated extra padding for class info */; + Proxy(final SimpleReplicatedLogEntry replicatedLogEntry) { + index = replicatedLogEntry.getIndex(); + term = replicatedLogEntry.getTerm(); + data = replicatedLogEntry.getData(); } @Override public void writeExternal(final ObjectOutput out) throws IOException { - out.writeLong(replicatedLogEntry.getIndex()); - out.writeLong(replicatedLogEntry.getTerm()); - out.writeObject(replicatedLogEntry.getData()); + out.writeLong(index); + out.writeLong(term); + out.writeObject(data); } @Override public void readExternal(final ObjectInput in) throws IOException, ClassNotFoundException { - replicatedLogEntry = new SimpleReplicatedLogEntry(in.readLong(), in.readLong(), (Payload) in.readObject()); + index = in.readLong(); + term = in.readLong(); + data = (Payload) in.readObject(); } private Object readResolve() { - return replicatedLogEntry; + return new SimpleReplicatedLogEntry(index, term, data); } } private static final long serialVersionUID = 1L; + // Estimate to how big the proxy is. Note this includes object stream overhead, so it is a bit conservative + private static final int PROXY_SIZE = SerializationUtils.serialize(new Proxy()).length; private final long index; private final long term; @@ -98,7 +102,12 @@ public final class SimpleReplicatedLogEntry implements ReplicatedLogEntry, Seria @Override public int size() { - return getData().size(); + return payload.size(); + } + + @Override + public int serializedSize() { + return PROXY_SIZE + payload.serializedSize(); } @Override @@ -115,10 +124,6 @@ public final class SimpleReplicatedLogEntry implements ReplicatedLogEntry, Seria return new Proxy(this); } - public int estimatedSerializedSize() { - return Proxy.estimatedSerializedSize(this); - } - @Override public int hashCode() { final int prime = 31; diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/persisted/SimpleReplicatedLogEntrySerializer.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/persisted/SimpleReplicatedLogEntrySerializer.java index ca6e6dff30..f1c2fea4a6 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/persisted/SimpleReplicatedLogEntrySerializer.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/persisted/SimpleReplicatedLogEntrySerializer.java @@ -49,7 +49,7 @@ public class SimpleReplicatedLogEntrySerializer extends JSerializer { checkArgument(obj instanceof SimpleReplicatedLogEntry, "Unsupported object type %s", obj.getClass()); SimpleReplicatedLogEntry replicatedLogEntry = (SimpleReplicatedLogEntry)obj; - final int estimatedSerializedSize = replicatedLogEntry.estimatedSerializedSize(); + final int estimatedSerializedSize = replicatedLogEntry.serializedSize(); final ByteArrayOutputStream bos = new ByteArrayOutputStream(estimatedSerializedSize); SerializationUtils.serialize(replicatedLogEntry, bos); diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/AbstractRaftActorIntegrationTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/AbstractRaftActorIntegrationTest.java index 1e4859254f..5be865f899 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/AbstractRaftActorIntegrationTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/AbstractRaftActorIntegrationTest.java @@ -126,7 +126,7 @@ public abstract class AbstractRaftActorIntegrationTest extends AbstractActorTest TestRaftActor(final Builder builder) { super(builder); - this.collectorActor = builder.collectorActor; + collectorActor = builder.collectorActor; } public void startDropMessages(final Class msgClass) { @@ -148,8 +148,7 @@ public abstract class AbstractRaftActorIntegrationTest extends AbstractActorTest @SuppressWarnings({ "rawtypes", "unchecked", "checkstyle:IllegalCatch" }) @Override public void handleCommand(final Object message) { - if (message instanceof MockPayload) { - MockPayload payload = (MockPayload) message; + if (message instanceof MockPayload payload) { super.persistData(collectorActor, new MockIdentifier(payload.toString()), payload, false); return; } @@ -214,13 +213,14 @@ public abstract class AbstractRaftActorIntegrationTest extends AbstractActorTest } public Builder collectorActor(final ActorRef newCollectorActor) { - this.collectorActor = newCollectorActor; + collectorActor = newCollectorActor; return this; } } } - protected static final int SNAPSHOT_CHUNK_SIZE = 100; + // FIXME: this is an arbitrary limit. Document interactions and/or improve them to improve maintainability + protected static final int SNAPSHOT_CHUNK_SIZE = 700; protected final Logger testLog = LoggerFactory.getLogger(getClass()); diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/AbstractReplicatedLogImplTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/AbstractReplicatedLogImplTest.java index 983b26da9c..65ac83d0d0 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/AbstractReplicatedLogImplTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/AbstractReplicatedLogImplTest.java @@ -145,25 +145,32 @@ public class AbstractReplicatedLogImplTest { from = replicatedLogImpl.getFrom(0, 20, ReplicatedLog.NO_MAX_SIZE); assertEquals(4, from.size()); assertEquals("A", from.get(0).getData().toString()); + assertEquals("B", from.get(1).getData().toString()); + assertEquals("C", from.get(2).getData().toString()); assertEquals("D", from.get(3).getData().toString()); + // Pre-calculate sizing information for use with capping + final int sizeB = from.get(1).serializedSize(); + final int sizeC = from.get(2).serializedSize(); + final int sizeD = from.get(3).serializedSize(); + from = replicatedLogImpl.getFrom(1, 2, ReplicatedLog.NO_MAX_SIZE); assertEquals(2, from.size()); assertEquals("B", from.get(0).getData().toString()); assertEquals("C", from.get(1).getData().toString()); - from = replicatedLogImpl.getFrom(1, 3, 2); + from = replicatedLogImpl.getFrom(1, 3, sizeB + sizeC); assertEquals(2, from.size()); assertEquals("B", from.get(0).getData().toString()); assertEquals("C", from.get(1).getData().toString()); - from = replicatedLogImpl.getFrom(1, 3, 3); + from = replicatedLogImpl.getFrom(1, 3, sizeB + sizeC + sizeD); assertEquals(3, from.size()); assertEquals("B", from.get(0).getData().toString()); assertEquals("C", from.get(1).getData().toString()); assertEquals("D", from.get(2).getData().toString()); - from = replicatedLogImpl.getFrom(1, 2, 3); + from = replicatedLogImpl.getFrom(1, 2, sizeB + sizeC + sizeD); assertEquals(2, from.size()); assertEquals("B", from.get(0).getData().toString()); assertEquals("C", from.get(1).getData().toString()); diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MockRaftActorContext.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MockRaftActorContext.java index 0d4007913d..6d4ec22e3d 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MockRaftActorContext.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MockRaftActorContext.java @@ -227,6 +227,11 @@ public class MockRaftActorContext extends RaftActorContextImpl { return size; } + @Override + public int serializedSize() { + return size; + } + @Override public String toString() { return data; diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorDelegatingPersistentDataProviderTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorDelegatingPersistentDataProviderTest.java index 33247d3f0c..9dedc67e68 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorDelegatingPersistentDataProviderTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorDelegatingPersistentDataProviderTest.java @@ -105,6 +105,11 @@ public class RaftActorDelegatingPersistentDataProviderTest { return 0; } + @Override + public int serializedSize() { + return 0; + } + @Override protected Object writeReplace() { // Not needed diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/ReplicationAndSnapshotsWithLaggingFollowerIntegrationTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/ReplicationAndSnapshotsWithLaggingFollowerIntegrationTest.java index d6a53a0aee..00147a3c0e 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/ReplicationAndSnapshotsWithLaggingFollowerIntegrationTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/ReplicationAndSnapshotsWithLaggingFollowerIntegrationTest.java @@ -452,7 +452,7 @@ public class ReplicationAndSnapshotsWithLaggingFollowerIntegrationTest extends A setupFollower2(); - MessageCollectorActor.expectMatching(follower2CollectorActor, InstallSnapshot.class, 5); + MessageCollectorActor.expectMatching(follower2CollectorActor, InstallSnapshot.class, 1); follower2Actor.stop(); @@ -613,8 +613,8 @@ public class ReplicationAndSnapshotsWithLaggingFollowerIntegrationTest extends A /** * Resume the lagging follower 2 and verify it receives an install snapshot from the leader. */ - private void verifyInstallSnapshotToLaggingFollower(long lastAppliedIndex, - @Nullable ServerConfigurationPayload expServerConfig) { + private void verifyInstallSnapshotToLaggingFollower(final long lastAppliedIndex, + final @Nullable ServerConfigurationPayload expServerConfig) { testLog.info("verifyInstallSnapshotToLaggingFollower starting"); MessageCollectorActor.clearMessages(leaderCollectorActor); @@ -811,8 +811,8 @@ public class ReplicationAndSnapshotsWithLaggingFollowerIntegrationTest extends A /** * Kill the leader actor, reinstate it and verify the recovered journal. */ - private void verifyLeaderRecoveryAfterReinstatement(long lastIndex, long snapshotIndex, - long firstJournalEntryIndex) { + private void verifyLeaderRecoveryAfterReinstatement(final long lastIndex, final long snapshotIndex, + final long firstJournalEntryIndex) { testLog.info("verifyLeaderRecoveryAfterReinstatement starting: lastIndex: {}, snapshotIndex: {}, " + "firstJournalEntryIndex: {}", lastIndex, snapshotIndex, firstJournalEntryIndex); @@ -845,7 +845,7 @@ public class ReplicationAndSnapshotsWithLaggingFollowerIntegrationTest extends A testLog.info("verifyLeaderRecoveryAfterReinstatement ending"); } - private void sendInitialPayloadsReplicatedToAllFollowers(String... data) { + private void sendInitialPayloadsReplicatedToAllFollowers(final String... data) { // Send the payloads. for (String d: data) { diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java index e10f0489e3..f847dbce9e 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java @@ -1793,7 +1793,8 @@ public class LeaderTest extends AbstractLeaderTest { MockRaftActorContext leaderActorContext = createActorContextWithFollower(); ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval( new FiniteDuration(1000, TimeUnit.SECONDS)); - ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setSnapshotChunkSize(2); + // Note: the size here depends on estimate + ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setSnapshotChunkSize(246); leaderActorContext.setReplicatedLog( new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 4, 1).build()); @@ -2462,7 +2463,7 @@ public class LeaderTest extends AbstractLeaderTest { assertEquals("New votedFor", null, actorContext.getTermInformation().getVotedFor()); } - private class MockConfigParamsImpl extends DefaultConfigParamsImpl { + private static class MockConfigParamsImpl extends DefaultConfigParamsImpl { private final long electionTimeOutIntervalMillis; private final int snapshotChunkSize; diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/persisted/AbortTransactionPayload.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/persisted/AbortTransactionPayload.java index 0931b91ac7..793f2ea062 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/persisted/AbortTransactionPayload.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/persisted/AbortTransactionPayload.java @@ -49,6 +49,7 @@ public final class AbortTransactionPayload extends AbstractIdentifiablePayload extends return serialized.length; } + @Override + public final int serializedSize() { + // TODO: this is not entirely accurate, as the serialization stream has additional overheads: + // - 3 bytes for each block of data <256 bytes + // - 5 bytes for each block of data >=256 bytes + // - each block of data is limited to 1024 bytes as per serialization spec + return size() + externalizableProxySize(); + } + @Override public final String toString() { return MoreObjects.toStringHelper(this).add("identifier", identifier).add("size", size()).toString(); @@ -100,4 +111,10 @@ public abstract class AbstractIdentifiablePayload extends @SuppressWarnings("checkstyle:hiddenField") protected abstract @NonNull AbstractProxy externalizableProxy(byte @NonNull[] serialized); + + protected abstract int externalizableProxySize(); + + protected static final int externalizableProxySize(final Function> constructor) { + return SerializationUtils.serialize(constructor.apply(new byte[0])).length; + } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/persisted/CloseLocalHistoryPayload.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/persisted/CloseLocalHistoryPayload.java index 1921ff889f..c592513673 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/persisted/CloseLocalHistoryPayload.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/persisted/CloseLocalHistoryPayload.java @@ -49,6 +49,7 @@ public final class CloseLocalHistoryPayload extends AbstractIdentifiablePayload< private static final Logger LOG = LoggerFactory.getLogger(CloseLocalHistoryPayload.class); private static final long serialVersionUID = 1L; + private static final int PROXY_SIZE = externalizableProxySize(Proxy::new); CloseLocalHistoryPayload(final LocalHistoryIdentifier historyId, final byte[] serialized) { super(historyId, serialized); @@ -71,4 +72,9 @@ public final class CloseLocalHistoryPayload extends AbstractIdentifiablePayload< protected Proxy externalizableProxy(final byte[] serialized) { return new Proxy(serialized); } + + @Override + protected int externalizableProxySize() { + return PROXY_SIZE; + } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/persisted/CommitTransactionPayload.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/persisted/CommitTransactionPayload.java index accb9ef097..d4d08a7292 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/persisted/CommitTransactionPayload.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/persisted/CommitTransactionPayload.java @@ -27,6 +27,7 @@ import java.io.Serializable; import java.io.StreamCorruptedException; import java.util.AbstractMap.SimpleImmutableEntry; import java.util.Map.Entry; +import org.apache.commons.lang3.SerializationUtils; import org.eclipse.jdt.annotation.NonNull; import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier; import org.opendaylight.controller.cluster.datastore.persisted.DataTreeCandidateInputOutput.DataTreeCandidateWithVersion; @@ -116,6 +117,12 @@ public abstract class CommitTransactionPayload extends IdentifiablePayload