public KeyValue() {
}
- public KeyValue(String key, String value) {
+ public KeyValue(final String key, final String value) {
this.key = key;
this.value = value;
}
}
@Override
- public String toString() {
- return "KeyValue{" + "key='" + key + '\'' + ", value='" + value + '\'' + '}';
+ public int size() {
+ return value.length() + key.length();
}
@Override
- public int size() {
- return value.length() + key.length();
+ public int serializedSize() {
+ // TODO: this should be a better estimate of the serialized size
+ return size();
+ }
+
+ @Override
+ public String toString() {
+ return "KeyValue{" + "key='" + key + '\'' + ", value='" + value + '\'' + '}';
}
@Override
this.snapshotTerm = snapshotTerm;
this.logContext = logContext;
- this.journal = new ArrayList<>(unAppliedEntries.size());
+ journal = new ArrayList<>(unAppliedEntries.size());
for (ReplicatedLogEntry entry: unAppliedEntries) {
append(entry);
}
long totalSize = 0;
for (int i = fromIndex; i < toIndex; i++) {
ReplicatedLogEntry entry = journal.get(i);
- totalSize += entry.size();
+ totalSize += entry.serializedSize();
if (totalSize <= maxDataSize) {
retList.add(entry);
} else {
*/
int size();
+ /**
+ * Return an estimate of the serialized size of this entry when it is passed through serialization. The estimate
+ * needs to be reasonably accurate and should err on the side of caution, reporting a slightly higher size in the
+ * face of uncertainty.
+ *
+ * @return An estimate of the serialized size.
+ */
+ int serializedSize();
+
/**
* Checks if persistence is pending for this entry.
*
// If the first entry's size exceeds the max data size threshold, it will be returned from the call above. If
// that is the case, then we need to slice it into smaller chunks.
- if (entries.size() != 1 || entries.get(0).getData().size() <= maxDataSize) {
+ if (entries.size() != 1 || entries.get(0).getData().serializedSize() <= maxDataSize) {
// Don't need to slice.
return entries;
}
*/
public abstract int size();
+ /**
+ * Return an estimate of the serialized size of this payload when it is passed through serialization. The estimate
+ * needs to be reasonably accurate and should err on the side of caution, reporting a slightly higher size in the
+ * face of uncertainty.
+ *
+ * @return An estimate of the serialized size.
+ */
+ public abstract int serializedSize();
+
/**
* Return the serialization proxy for this object.
*
import akka.dispatch.ControlMessage;
import java.io.Serializable;
+import org.apache.commons.lang3.SerializationUtils;
import org.opendaylight.controller.cluster.raft.messages.Payload;
/**
private static final long serialVersionUID = 1L;
private static final Proxy PROXY = new Proxy();
+ // Estimate of how big the proxy is. Note that this includes object stream overhead, so it is a bit conservative
+ private static final int PROXY_SIZE = SerializationUtils.serialize(PROXY).length;
private NoopPayload() {
+ // Hidden on purpose
}
@Override
return 0;
}
+ @Override
+ public int serializedSize() {
+ return PROXY_SIZE;
+ }
+
@Override
protected Object writeReplace() {
return PROXY;
@Override
public int size() {
+ return serializedSize();
+ }
+
+ @Override
+ public int serializedSize() {
if (serializedSize < 0) {
try (ByteArrayOutputStream bos = new ByteArrayOutputStream()) {
try (ObjectOutputStream out = new ObjectOutputStream(bos)) {
import java.io.ObjectInput;
import java.io.ObjectOutput;
import java.io.Serializable;
+import org.apache.commons.lang3.SerializationUtils;
import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
import org.opendaylight.controller.cluster.raft.messages.Payload;
private static final class Proxy implements Externalizable {
private static final long serialVersionUID = 1L;
- private ReplicatedLogEntry replicatedLogEntry;
+ private long index;
+ private long term;
+ private Payload data;
// checkstyle flags the public modifier as redundant, which really doesn't make sense since it clearly isn't
// redundant. It is explicitly needed for Java serialization to be able to create instances via reflection.
// For Externalizable
}
- Proxy(final ReplicatedLogEntry replicatedLogEntry) {
- this.replicatedLogEntry = replicatedLogEntry;
- }
-
- static int estimatedSerializedSize(final ReplicatedLogEntry replicatedLogEntry) {
- return 8 /* index */ + 8 /* term */ + replicatedLogEntry.getData().size()
- + 400 /* estimated extra padding for class info */;
+ Proxy(final SimpleReplicatedLogEntry replicatedLogEntry) {
+ index = replicatedLogEntry.getIndex();
+ term = replicatedLogEntry.getTerm();
+ data = replicatedLogEntry.getData();
}
@Override
public void writeExternal(final ObjectOutput out) throws IOException {
- out.writeLong(replicatedLogEntry.getIndex());
- out.writeLong(replicatedLogEntry.getTerm());
- out.writeObject(replicatedLogEntry.getData());
+ out.writeLong(index);
+ out.writeLong(term);
+ out.writeObject(data);
}
@Override
public void readExternal(final ObjectInput in) throws IOException, ClassNotFoundException {
- replicatedLogEntry = new SimpleReplicatedLogEntry(in.readLong(), in.readLong(), (Payload) in.readObject());
+ index = in.readLong();
+ term = in.readLong();
+ data = (Payload) in.readObject();
}
private Object readResolve() {
- return replicatedLogEntry;
+ return new SimpleReplicatedLogEntry(index, term, data);
}
}
private static final long serialVersionUID = 1L;
+ // Estimate of how big the proxy is. Note that this includes object stream overhead, so it is a bit conservative
+ private static final int PROXY_SIZE = SerializationUtils.serialize(new Proxy()).length;
private final long index;
private final long term;
@Override
public int size() {
- return getData().size();
+ return payload.size();
+ }
+
+ @Override
+ public int serializedSize() {
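+ // The entry serializes as a Proxy carrying the index, the term and the payload, so the empty-proxy baseline
+ // plus the payload's own estimate approximates the full serialized form.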
+ return PROXY_SIZE + payload.serializedSize();
}
@Override
return new Proxy(this);
}
- public int estimatedSerializedSize() {
- return Proxy.estimatedSerializedSize(this);
- }
-
@Override
public int hashCode() {
final int prime = 31;
checkArgument(obj instanceof SimpleReplicatedLogEntry, "Unsupported object type %s", obj.getClass());
SimpleReplicatedLogEntry replicatedLogEntry = (SimpleReplicatedLogEntry)obj;
- final int estimatedSerializedSize = replicatedLogEntry.estimatedSerializedSize();
+ final int estimatedSerializedSize = replicatedLogEntry.serializedSize();
final ByteArrayOutputStream bos = new ByteArrayOutputStream(estimatedSerializedSize);
SerializationUtils.serialize(replicatedLogEntry, bos);
TestRaftActor(final Builder builder) {
super(builder);
- this.collectorActor = builder.collectorActor;
+ collectorActor = builder.collectorActor;
}
public void startDropMessages(final Class<?> msgClass) {
@SuppressWarnings({ "rawtypes", "unchecked", "checkstyle:IllegalCatch" })
@Override
public void handleCommand(final Object message) {
- if (message instanceof MockPayload) {
- MockPayload payload = (MockPayload) message;
+ if (message instanceof MockPayload payload) {
super.persistData(collectorActor, new MockIdentifier(payload.toString()), payload, false);
return;
}
}
public Builder collectorActor(final ActorRef newCollectorActor) {
- this.collectorActor = newCollectorActor;
+ collectorActor = newCollectorActor;
return this;
}
}
}
- protected static final int SNAPSHOT_CHUNK_SIZE = 100;
+ // FIXME: this is an arbitrary limit. Document the interactions and/or improve them for better maintainability
+ protected static final int SNAPSHOT_CHUNK_SIZE = 700;
protected final Logger testLog = LoggerFactory.getLogger(getClass());
from = replicatedLogImpl.getFrom(0, 20, ReplicatedLog.NO_MAX_SIZE);
assertEquals(4, from.size());
assertEquals("A", from.get(0).getData().toString());
+ assertEquals("B", from.get(1).getData().toString());
+ assertEquals("C", from.get(2).getData().toString());
assertEquals("D", from.get(3).getData().toString());
+ // Pre-calculate sizing information for use with capping
+ final int sizeB = from.get(1).serializedSize();
+ final int sizeC = from.get(2).serializedSize();
+ final int sizeD = from.get(3).serializedSize();
+
from = replicatedLogImpl.getFrom(1, 2, ReplicatedLog.NO_MAX_SIZE);
assertEquals(2, from.size());
assertEquals("B", from.get(0).getData().toString());
assertEquals("C", from.get(1).getData().toString());
- from = replicatedLogImpl.getFrom(1, 3, 2);
+ from = replicatedLogImpl.getFrom(1, 3, sizeB + sizeC);
assertEquals(2, from.size());
assertEquals("B", from.get(0).getData().toString());
assertEquals("C", from.get(1).getData().toString());
- from = replicatedLogImpl.getFrom(1, 3, 3);
+ from = replicatedLogImpl.getFrom(1, 3, sizeB + sizeC + sizeD);
assertEquals(3, from.size());
assertEquals("B", from.get(0).getData().toString());
assertEquals("C", from.get(1).getData().toString());
assertEquals("D", from.get(2).getData().toString());
- from = replicatedLogImpl.getFrom(1, 2, 3);
+ from = replicatedLogImpl.getFrom(1, 2, sizeB + sizeC + sizeD);
assertEquals(2, from.size());
assertEquals("B", from.get(0).getData().toString());
assertEquals("C", from.get(1).getData().toString());
return size;
}
+ @Override
+ public int serializedSize() {
+ return size;
+ }
+
@Override
public String toString() {
return data;
return 0;
}
+ @Override
+ public int serializedSize() {
+ return 0;
+ }
+
@Override
protected Object writeReplace() {
// Not needed
setupFollower2();
- MessageCollectorActor.expectMatching(follower2CollectorActor, InstallSnapshot.class, 5);
+ MessageCollectorActor.expectMatching(follower2CollectorActor, InstallSnapshot.class, 1);
follower2Actor.stop();
/**
* Resume the lagging follower 2 and verify it receives an install snapshot from the leader.
*/
- private void verifyInstallSnapshotToLaggingFollower(long lastAppliedIndex,
- @Nullable ServerConfigurationPayload expServerConfig) {
+ private void verifyInstallSnapshotToLaggingFollower(final long lastAppliedIndex,
+ final @Nullable ServerConfigurationPayload expServerConfig) {
testLog.info("verifyInstallSnapshotToLaggingFollower starting");
MessageCollectorActor.clearMessages(leaderCollectorActor);
/**
* Kill the leader actor, reinstate it and verify the recovered journal.
*/
- private void verifyLeaderRecoveryAfterReinstatement(long lastIndex, long snapshotIndex,
- long firstJournalEntryIndex) {
+ private void verifyLeaderRecoveryAfterReinstatement(final long lastIndex, final long snapshotIndex,
+ final long firstJournalEntryIndex) {
testLog.info("verifyLeaderRecoveryAfterReinstatement starting: lastIndex: {}, snapshotIndex: {}, "
+ "firstJournalEntryIndex: {}", lastIndex, snapshotIndex, firstJournalEntryIndex);
testLog.info("verifyLeaderRecoveryAfterReinstatement ending");
}
- private void sendInitialPayloadsReplicatedToAllFollowers(String... data) {
+ private void sendInitialPayloadsReplicatedToAllFollowers(final String... data) {
// Send the payloads.
for (String d: data) {
MockRaftActorContext leaderActorContext = createActorContextWithFollower();
((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
new FiniteDuration(1000, TimeUnit.SECONDS));
- ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setSnapshotChunkSize(2);
+ // Note: the size here depends on the serialized-size estimate
+ ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setSnapshotChunkSize(246);
leaderActorContext.setReplicatedLog(
new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 4, 1).build());
assertEquals("New votedFor", null, actorContext.getTermInformation().getVotedFor());
}
- private class MockConfigParamsImpl extends DefaultConfigParamsImpl {
+ private static class MockConfigParamsImpl extends DefaultConfigParamsImpl {
private final long electionTimeOutIntervalMillis;
private final int snapshotChunkSize;
private static final Logger LOG = LoggerFactory.getLogger(AbortTransactionPayload.class);
private static final long serialVersionUID = 1L;
+ private static final int PROXY_SIZE = externalizableProxySize(Proxy::new);
AbortTransactionPayload(final TransactionIdentifier transactionId, final byte[] serialized) {
super(transactionId, serialized);
protected Proxy externalizableProxy(final byte[] serialized) {
return new Proxy(serialized);
}
+
+ @Override
+ protected int externalizableProxySize() {
+ return PROXY_SIZE;
+ }
}
import java.io.ObjectInput;
import java.io.ObjectOutput;
import java.io.Serializable;
+import java.util.function.Function;
+import org.apache.commons.lang3.SerializationUtils;
import org.eclipse.jdt.annotation.NonNull;
import org.opendaylight.controller.cluster.raft.messages.IdentifiablePayload;
import org.opendaylight.yangtools.concepts.Identifiable;
return serialized.length;
}
+ @Override
+ public final int serializedSize() {
+ // TODO: this is not entirely accurate, as the serialization stream has additional overheads:
+ // - 3 bytes for each block of data <256 bytes
+ // - 5 bytes for each block of data >=256 bytes
+ // - each block of data is limited to 1024 bytes as per serialization spec
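+ // For example, using the figures above, a 2500-byte body would be written as 1024 + 1024 + 452 byte blocks,
+ // adding roughly 5 + 5 + 5 = 15 bytes which this estimate does not account for.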
+ return size() + externalizableProxySize();
+ }
+
@Override
public final String toString() {
return MoreObjects.toStringHelper(this).add("identifier", identifier).add("size", size()).toString();
@SuppressWarnings("checkstyle:hiddenField")
protected abstract @NonNull AbstractProxy<T> externalizableProxy(byte @NonNull[] serialized);
+
+ protected abstract int externalizableProxySize();
+
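+ // Serializing a proxy constructed over an empty byte[] once per concrete class yields a baseline for the
+ // per-class serialization overhead, excluding the actual payload bytes.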
+ protected static final int externalizableProxySize(final Function<byte[], ? extends AbstractProxy<?>> constructor) {
+ return SerializationUtils.serialize(constructor.apply(new byte[0])).length;
+ }
}
private static final Logger LOG = LoggerFactory.getLogger(CloseLocalHistoryPayload.class);
private static final long serialVersionUID = 1L;
+ private static final int PROXY_SIZE = externalizableProxySize(Proxy::new);
CloseLocalHistoryPayload(final LocalHistoryIdentifier historyId, final byte[] serialized) {
super(historyId, serialized);
protected Proxy externalizableProxy(final byte[] serialized) {
return new Proxy(serialized);
}
+
+ @Override
+ protected int externalizableProxySize() {
+ return PROXY_SIZE;
+ }
}
import java.io.StreamCorruptedException;
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.Map.Entry;
+import org.apache.commons.lang3.SerializationUtils;
import org.eclipse.jdt.annotation.NonNull;
import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
import org.opendaylight.controller.cluster.datastore.persisted.DataTreeCandidateInputOutput.DataTreeCandidateWithVersion;
}
}
+ @Override
+ public final int serializedSize() {
+ // TODO: this is not entirely accurate, as the byte[] can be chunked by the serialization stream
+ return ProxySizeHolder.PROXY_SIZE + size();
+ }
+
/**
* The cached candidate needs to be cleared after it is done applying to the DataTree, otherwise it would keep
* deserialized data in memory which is no longer needed, leading to wasted memory. This lets the payload know that
}
}
+ // Exists to break initialization dependency between CommitTransactionPayload/Simple/Proxy
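+ // Computing PROXY_SIZE eagerly in this class's static initializer would construct Proxy and Simple while this
+ // class is still initializing; the lazy holder defers that work until serializedSize() is first called.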
+ private static final class ProxySizeHolder {
+ static final int PROXY_SIZE = SerializationUtils.serialize(new Proxy(new Simple(new byte[0]))).length;
+
+ private ProxySizeHolder() {
+ // Hidden on purpose
+ }
+ }
+
private static final class Proxy implements Externalizable {
private static final long serialVersionUID = 1L;
private static final Logger LOG = LoggerFactory.getLogger(CreateLocalHistoryPayload.class);
private static final long serialVersionUID = 1L;
+ private static final int PROXY_SIZE = externalizableProxySize(Proxy::new);
CreateLocalHistoryPayload(final LocalHistoryIdentifier historyId, final byte[] serialized) {
super(historyId, serialized);
protected Proxy externalizableProxy(final byte[] serialized) {
return new Proxy(serialized);
}
+
+ @Override
+ protected int externalizableProxySize() {
+ return PROXY_SIZE;
+ }
}
private static final Logger LOG = LoggerFactory.getLogger(DisableTrackingPayload.class);
private static final long serialVersionUID = 1L;
+ private static final int PROXY_SIZE = externalizableProxySize(Proxy::new);
DisableTrackingPayload(final ClientIdentifier clientId, final byte[] serialized) {
super(clientId, serialized);
protected Proxy externalizableProxy(final byte[] serialized) {
return new Proxy(serialized);
}
+
+ @Override
+ protected int externalizableProxySize() {
+ return PROXY_SIZE;
+ }
}
private static final Logger LOG = LoggerFactory.getLogger(PurgeLocalHistoryPayload.class);
private static final long serialVersionUID = 1L;
+ private static final int PROXY_SIZE = externalizableProxySize(Proxy::new);
PurgeLocalHistoryPayload(final LocalHistoryIdentifier historyId, final byte[] serialized) {
super(historyId, serialized);
protected Proxy externalizableProxy(final byte[] serialized) {
return new Proxy(serialized);
}
+
+ @Override
+ protected int externalizableProxySize() {
+ return PROXY_SIZE;
+ }
}
private static final Logger LOG = LoggerFactory.getLogger(PurgeTransactionPayload.class);
private static final long serialVersionUID = 1L;
+ private static final int PROXY_SIZE = externalizableProxySize(Proxy::new);
PurgeTransactionPayload(final TransactionIdentifier transactionId, final byte[] serialized) {
super(transactionId, serialized);
protected Proxy externalizableProxy(final byte[] serialized) {
return new Proxy(serialized);
}
+
+ @Override
+ protected int externalizableProxySize() {
+ return PROXY_SIZE;
+ }
}
private static final Logger LOG = LoggerFactory.getLogger(SkipTransactionsPayload.class);
private static final long serialVersionUID = 1L;
+ private static final int PROXY_SIZE = externalizableProxySize(Proxy::new);
@SuppressFBWarnings(value = "SE_BAD_FIELD", justification = "Handled via externalizable proxy")
private final @NonNull ImmutableUnsignedLongSet transactionIds;
protected Proxy externalizableProxy(final byte[] serialized) {
return new Proxy(serialized);
}
+
+ @Override
+ protected int externalizableProxySize() {
+ return PROXY_SIZE;
+ }
}