Since the NormalizedNodeStream format has changed, shard persisted
state is affected.
This patch bumps PayloadVersion and applies that bump to snapshots
and CommitTransactionPayload.
On recovery, a snapshot's need to migrate is reflected in its state,
and it is examined just as MigratedSerializables are.
CommitTransactionPayload is not examined, as understanding its
stream version would require deserializing at least its header.
JIRA: CONTROLLER-1888
Change-Id: I678527be4487ee1729123ba8b9dcd2269e6cf262
Signed-off-by: Robert Varga <robert.varga@pantheon.tech>
12 files changed:
import org.opendaylight.controller.cluster.raft.persisted.MigratedSerializable;
import org.opendaylight.controller.cluster.raft.persisted.ServerConfigurationPayload;
import org.opendaylight.controller.cluster.raft.persisted.Snapshot;
import org.opendaylight.controller.cluster.raft.persisted.MigratedSerializable;
import org.opendaylight.controller.cluster.raft.persisted.ServerConfigurationPayload;
import org.opendaylight.controller.cluster.raft.persisted.Snapshot;
+import org.opendaylight.controller.cluster.raft.persisted.Snapshot.State;
import org.opendaylight.controller.cluster.raft.persisted.UpdateElectionTerm;
import org.opendaylight.controller.cluster.raft.protobuff.client.messages.PersistentPayload;
import org.slf4j.Logger;
import org.opendaylight.controller.cluster.raft.persisted.UpdateElectionTerm;
import org.opendaylight.controller.cluster.raft.protobuff.client.messages.PersistentPayload;
import org.slf4j.Logger;
context.setCommitIndex(snapshot.getLastAppliedIndex());
context.getTermInformation().update(snapshot.getElectionTerm(), snapshot.getElectionVotedFor());
context.setCommitIndex(snapshot.getLastAppliedIndex());
context.getTermInformation().update(snapshot.getElectionTerm(), snapshot.getElectionVotedFor());
- Stopwatch timer = Stopwatch.createStarted();
+ final Stopwatch timer = Stopwatch.createStarted();
// Apply the snapshot to the actors state
// Apply the snapshot to the actors state
- if (!(snapshot.getState() instanceof EmptyState)) {
- cohort.applyRecoverySnapshot(snapshot.getState());
+ final State snapshotState = snapshot.getState();
+ if (snapshotState.needsMigration()) {
+ hasMigratedDataRecovered = true;
+ }
+ if (!(snapshotState instanceof EmptyState)) {
+ cohort.applyRecoverySnapshot(snapshotState);
}
if (snapshot.getServerConfiguration() != null) {
}
if (snapshot.getServerConfiguration() != null) {
timer.stop();
log.info("Recovery snapshot applied for {} in {}: snapshotIndex={}, snapshotTerm={}, journal-size={}",
timer.stop();
log.info("Recovery snapshot applied for {} in {}: snapshotIndex={}, snapshotTerm={}, journal-size={}",
- context.getId(), timer.toString(), replicatedLog().getSnapshotIndex(),
- replicatedLog().getSnapshotTerm(), replicatedLog().size());
+ context.getId(), timer, replicatedLog().getSnapshotIndex(), replicatedLog().getSnapshotTerm(),
+ replicatedLog().size());
}
private void onRecoveredJournalLogEntry(final ReplicatedLogEntry logEntry) {
}
private void onRecoveredJournalLogEntry(final ReplicatedLogEntry logEntry) {
* @author Thomas Pantelis
*/
public interface State extends Serializable {
* @author Thomas Pantelis
*/
public interface State extends Serializable {
+ /**
+ * Indicate whether the snapshot requires migration, i.e. a new snapshot should be created after recovery.
+ * Default implementation returns false, i.e. do not re-snapshot.
+ *
+ * @return True if complete recovery based upon this snapshot should trigger a new snapshot.
+ */
+ default boolean needsMigration() {
+ return false;
+ }
}
private static final class Proxy implements Externalizable {
}
private static final class Proxy implements Externalizable {
@Override
public State deserializeSnapshot(final ByteSource snapshotBytes) throws IOException {
try (ObjectInputStream in = new ObjectInputStream(snapshotBytes.openStream())) {
@Override
public State deserializeSnapshot(final ByteSource snapshotBytes) throws IOException {
try (ObjectInputStream in = new ObjectInputStream(snapshotBytes.openStream())) {
- return new ShardSnapshotState(ShardDataTreeSnapshot.deserialize(in));
+ return ShardDataTreeSnapshot.deserialize(in);
private static final Logger LOG = LoggerFactory.getLogger(AbstractVersionedShardDataTreeSnapshot.class);
@SuppressWarnings("checkstyle:FallThrough")
private static final Logger LOG = LoggerFactory.getLogger(AbstractVersionedShardDataTreeSnapshot.class);
@SuppressWarnings("checkstyle:FallThrough")
- static ShardDataTreeSnapshot versionedDeserialize(final ObjectInput in) throws IOException {
+ static @NonNull ShardSnapshotState versionedDeserialize(final ObjectInput in) throws IOException {
final PayloadVersion version = PayloadVersion.readFrom(in);
switch (version) {
case BORON:
final PayloadVersion version = PayloadVersion.readFrom(in);
switch (version) {
case BORON:
- // Boron snapshots use Java Serialization
- try {
- return (ShardDataTreeSnapshot) in.readObject();
- } catch (ClassNotFoundException e) {
- LOG.error("Failed to serialize data tree snapshot", e);
- throw new IOException("Snapshot failed to deserialize", e);
- }
+ return new ShardSnapshotState(readSnapshot(in), true);
+ case SODIUM:
+ return new ShardSnapshotState(readSnapshot(in), false);
case TEST_FUTURE_VERSION:
case TEST_PAST_VERSION:
// These versions are never returned and this code is effectively dead
case TEST_FUTURE_VERSION:
case TEST_PAST_VERSION:
// These versions are never returned and this code is effectively dead
- throw new IOException("Invalid payload version in snapshot");
+ // Not included as default in above switch to ensure we get warnings when new versions are added
+ throw new IOException("Encountered unhandled version" + version);
- // Not included as default in above switch to ensure we get warnings when new versions are added
- throw new IOException("Encountered unhandled version" + version);
+ // Boron and Sodium snapshots use Java Serialization, but differ in stream format
+ private static @NonNull ShardDataTreeSnapshot readSnapshot(final ObjectInput in) throws IOException {
+ try {
+ return (ShardDataTreeSnapshot) in.readObject();
+ } catch (ClassNotFoundException e) {
+ LOG.error("Failed to serialize data tree snapshot", e);
+ throw new IOException("Snapshot failed to deserialize", e);
+ }
private void versionedSerialize(final ObjectOutput out, final PayloadVersion version) throws IOException {
switch (version) {
case BORON:
private void versionedSerialize(final ObjectOutput out, final PayloadVersion version) throws IOException {
switch (version) {
case BORON:
- // Boron snapshots use Java Serialization
+ case SODIUM:
+ // Boron and Sodium snapshots use Java Serialization, but differ in stream format
out.writeObject(this);
return;
case TEST_FUTURE_VERSION:
out.writeObject(this);
return;
case TEST_FUTURE_VERSION:
return DataTreeCandidates.newDataTreeCandidate(rootPath, rootNode);
}
return DataTreeCandidates.newDataTreeCandidate(rootPath, rootNode);
}
private static void writeChildren(final NormalizedNodeDataOutput out,
final Collection<DataTreeCandidateNode> children) throws IOException {
out.writeInt(children.size());
private static void writeChildren(final NormalizedNodeDataOutput out,
final Collection<DataTreeCandidateNode> children) throws IOException {
out.writeInt(children.size());
public static void writeDataTreeCandidate(final DataOutput out, final DataTreeCandidate candidate)
throws IOException {
public static void writeDataTreeCandidate(final DataOutput out, final DataTreeCandidate candidate)
throws IOException {
- try (NormalizedNodeDataOutput writer = NormalizedNodeInputOutput.newDataOutput(out)) {
+ try (NormalizedNodeDataOutput writer = NormalizedNodeInputOutput.newDataOutput(out,
+ PayloadVersion.current().getStreamVersion())) {
writer.writeYangInstanceIdentifier(candidate.getRootPath());
final DataTreeCandidateNode node = candidate.getRootNode();
writer.writeYangInstanceIdentifier(candidate.getRootPath());
final DataTreeCandidateNode node = candidate.getRootNode();
*/
package org.opendaylight.controller.cluster.datastore.persisted;
*/
package org.opendaylight.controller.cluster.datastore.persisted;
+import static com.google.common.base.Preconditions.checkArgument;
+import static java.util.Objects.requireNonNull;
+
import com.google.common.annotations.Beta;
import com.google.common.base.MoreObjects;
import com.google.common.annotations.Beta;
import com.google.common.base.MoreObjects;
-import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableMap.Builder;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableMap.Builder;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
@Override
public void readExternal(final ObjectInput in) throws IOException, ClassNotFoundException {
final int metaSize = in.readInt();
@Override
public void readExternal(final ObjectInput in) throws IOException, ClassNotFoundException {
final int metaSize = in.readInt();
- Preconditions.checkArgument(metaSize >= 0, "Invalid negative metadata map length %s", metaSize);
+ checkArgument(metaSize >= 0, "Invalid negative metadata map length %s", metaSize);
// Default pre-allocate is 4, which should be fine
final Builder<Class<? extends ShardDataTreeSnapshotMetadata<?>>, ShardDataTreeSnapshotMetadata<?>>
// Default pre-allocate is 4, which should be fine
final Builder<Class<? extends ShardDataTreeSnapshotMetadata<?>>, ShardDataTreeSnapshotMetadata<?>>
public MetadataShardDataTreeSnapshot(final NormalizedNode<?, ?> rootNode,
final Map<Class<? extends ShardDataTreeSnapshotMetadata<?>>, ShardDataTreeSnapshotMetadata<?>> metadata) {
public MetadataShardDataTreeSnapshot(final NormalizedNode<?, ?> rootNode,
final Map<Class<? extends ShardDataTreeSnapshotMetadata<?>>, ShardDataTreeSnapshotMetadata<?>> metadata) {
- this.rootNode = Preconditions.checkNotNull(rootNode);
+ this.rootNode = requireNonNull(rootNode);
this.metadata = ImmutableMap.copyOf(metadata);
}
this.metadata = ImmutableMap.copyOf(metadata);
}
@Override
PayloadVersion version() {
@Override
PayloadVersion version() {
- return PayloadVersion.BORON;
+ return PayloadVersion.SODIUM;
}
private Object writeReplace() {
}
private Object writeReplace() {
import java.io.DataOutput;
import java.io.IOException;
import org.eclipse.jdt.annotation.NonNull;
import java.io.DataOutput;
import java.io.IOException;
import org.eclipse.jdt.annotation.NonNull;
+import org.opendaylight.controller.cluster.datastore.node.utils.stream.NormalizedNodeStreamVersion;
import org.opendaylight.yangtools.concepts.WritableObject;
/**
import org.opendaylight.yangtools.concepts.WritableObject;
/**
*/
@Beta
public enum PayloadVersion implements WritableObject {
*/
@Beta
public enum PayloadVersion implements WritableObject {
- // NOTE: enumeration values need to be sorted in asceding order of their version to keep Comparable working
+ // NOTE: enumeration values need to be sorted in ascending order of their version to keep Comparable working
/**
* Version which is older than any other version. This version exists purely for testing purposes.
*/
@VisibleForTesting
/**
* Version which is older than any other version. This version exists purely for testing purposes.
*/
@VisibleForTesting
+ TEST_PAST_VERSION(0) {
+ @Override
+ public NormalizedNodeStreamVersion getStreamVersion() {
+ throw new UnsupportedOperationException();
+ }
+ },
/**
* Initial ABI version, as shipped with Boron Simultaneous release.
*/
/**
* Initial ABI version, as shipped with Boron Simultaneous release.
*/
- // We seed the initial version to be the same as DataStoreVersions.BORON-VERSION for compatibility reasons.
- BORON(5),
+ // We seed the initial version to be the same as DataStoreVersions.BORON_VERSION for compatibility reasons.
+ BORON(5) {
+ @Override
+ public NormalizedNodeStreamVersion getStreamVersion() {
+ return NormalizedNodeStreamVersion.LITHIUM;
+ }
+ },
+
+ /**
+ * Revised payload version. Payloads remain the same as {@link #BORON}, but messages bearing QNames in any shape
+ * are using {@link NormalizedNodeStreamVersion#SODIUM}, which improves encoding.
+ */
+ SODIUM(6) {
+ @Override
+ public NormalizedNodeStreamVersion getStreamVersion() {
+ return NormalizedNodeStreamVersion.SODIUM;
+ }
+ },
/**
* Version which is newer than any other version. This version exists purely for testing purposes.
*/
@VisibleForTesting
/**
* Version which is newer than any other version. This version exists purely for testing purposes.
*/
@VisibleForTesting
- TEST_FUTURE_VERSION(65535);
+ TEST_FUTURE_VERSION(65535) {
+ @Override
+ public NormalizedNodeStreamVersion getStreamVersion() {
+ throw new UnsupportedOperationException();
+ }
+ };
private final short value;
private final short value;
+ /**
+ * Return the NormalizedNode stream version corresponding to this particular ABI.
+ *
+ * @return Stream Version to use for this ABI version
+ */
+ public abstract @NonNull NormalizedNodeStreamVersion getStreamVersion();
+
/**
* Return the codebase-native persistence version. This version is the default version allocated to messages
* at runtime. Conversion to previous versions may incur additional overhead (such as object allocation).
/**
* Return the codebase-native persistence version. This version is the default version allocated to messages
* at runtime. Conversion to previous versions may incur additional overhead (such as object allocation).
* @return Current {@link PayloadVersion}
*/
public static @NonNull PayloadVersion current() {
* @return Current {@link PayloadVersion}
*/
public static @NonNull PayloadVersion current() {
throw new PastVersionException(version, BORON);
case 5:
return BORON;
throw new PastVersionException(version, BORON);
case 5:
return BORON;
+ case 6:
+ return SODIUM;
- throw new FutureVersionException(version, BORON);
+ throw new FutureVersionException(version, SODIUM);
throw new IOException("Unsupported version", e);
}
}
throw new IOException("Unsupported version", e);
}
}
import java.io.ObjectInput;
import java.io.ObjectOutput;
import java.util.Optional;
import java.io.ObjectInput;
import java.io.ObjectOutput;
import java.util.Optional;
+import org.eclipse.jdt.annotation.NonNull;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
/**
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
/**
// Hidden to prevent subclassing from outside of this package
}
// Hidden to prevent subclassing from outside of this package
}
- public static ShardDataTreeSnapshot deserialize(final ObjectInput in) throws IOException {
- final ShardDataTreeSnapshot ret = AbstractVersionedShardDataTreeSnapshot.versionedDeserialize(in);
+ public static @NonNull ShardSnapshotState deserialize(final ObjectInput in) throws IOException {
+ final ShardSnapshotState ret = AbstractVersionedShardDataTreeSnapshot.versionedDeserialize(in);
// Make sure we consume all bytes, otherwise something went very wrong
final int bytesLeft = in.available();
// Make sure we consume all bytes, otherwise something went very wrong
final int bytesLeft = in.available();
throw new IOException("Deserialization left " + bytesLeft + " in the buffer");
}
throw new IOException("Deserialization left " + bytesLeft + " in the buffer");
}
@Override
public void readExternal(final ObjectInput in) throws IOException {
@Override
public void readExternal(final ObjectInput in) throws IOException {
- snapshotState = new ShardSnapshotState(ShardDataTreeSnapshot.deserialize(in));
+ snapshotState = ShardDataTreeSnapshot.deserialize(in);
}
private Object readResolve() {
}
private Object readResolve() {
@SuppressFBWarnings(value = "SE_BAD_FIELD", justification = "This field is not Serializable but this class "
+ "implements writeReplace to delegate serialization to a Proxy class and thus instances of this class "
+ "aren't serialized. FindBugs does not recognize this.")
@SuppressFBWarnings(value = "SE_BAD_FIELD", justification = "This field is not Serializable but this class "
+ "implements writeReplace to delegate serialization to a Proxy class and thus instances of this class "
+ "aren't serialized. FindBugs does not recognize this.")
- private final ShardDataTreeSnapshot snapshot;
+ private final @NonNull ShardDataTreeSnapshot snapshot;
+ private final boolean migrated;
- public ShardSnapshotState(final @NonNull ShardDataTreeSnapshot snapshot) {
+ ShardSnapshotState(final @NonNull ShardDataTreeSnapshot snapshot, final boolean migrated) {
this.snapshot = requireNonNull(snapshot);
this.snapshot = requireNonNull(snapshot);
+ this.migrated = migrated;
+ }
+
+ public ShardSnapshotState(final @NonNull ShardDataTreeSnapshot snapshot) {
+ this(snapshot, false);
}
public @NonNull ShardDataTreeSnapshot getSnapshot() {
return snapshot;
}
}
public @NonNull ShardDataTreeSnapshot getSnapshot() {
return snapshot;
}
+ @Override
+ public boolean needsMigration() {
+ return migrated;
+ }
+
private Object writeReplace() {
return new Proxy(this);
}
private Object writeReplace() {
return new Proxy(this);
}
final ShardDataTreeSnapshot deserialized;
try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(
installSnapshotStream.toByteArray()))) {
final ShardDataTreeSnapshot deserialized;
try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(
installSnapshotStream.toByteArray()))) {
- deserialized = ShardDataTreeSnapshot.deserialize(in);
+ deserialized = ShardDataTreeSnapshot.deserialize(in).getSnapshot();
}
assertEquals("Deserialized snapshot type", snapshot.getClass(), deserialized.getClass());
}
assertEquals("Deserialized snapshot type", snapshot.getClass(), deserialized.getClass());
@Test
public void testCandidateSerialization() throws IOException {
final CommitTransactionPayload payload = CommitTransactionPayload.create(nextTransactionId(), candidate);
@Test
public void testCandidateSerialization() throws IOException {
final CommitTransactionPayload payload = CommitTransactionPayload.create(nextTransactionId(), candidate);
- assertEquals("payload size", 181, payload.size());
+ assertEquals("payload size", 169, payload.size());
ShardDataTreeSnapshot deserialized;
try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bos.toByteArray()))) {
ShardDataTreeSnapshot deserialized;
try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bos.toByteArray()))) {
- deserialized = ShardDataTreeSnapshot.deserialize(in);
+ deserialized = ShardDataTreeSnapshot.deserialize(in).getSnapshot();
}
Optional<NormalizedNode<?, ?>> actualNode = deserialized.getRootNode();
}
Optional<NormalizedNode<?, ?>> actualNode = deserialized.getRootNode();
ShardDataTreeSnapshot deserialized;
try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bos.toByteArray()))) {
ShardDataTreeSnapshot deserialized;
try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bos.toByteArray()))) {
- deserialized = ShardDataTreeSnapshot.deserialize(in);
+ deserialized = ShardDataTreeSnapshot.deserialize(in).getSnapshot();
}
Optional<NormalizedNode<?, ?>> actualNode = deserialized.getRootNode();
}
Optional<NormalizedNode<?, ?>> actualNode = deserialized.getRootNode();
private final String data;
private final String data;
- TestShardDataTreeSnapshotMetadata(String data) {
+ TestShardDataTreeSnapshotMetadata(final String data) {
- public boolean equals(Object obj) {
+ public boolean equals(final Object obj) {
return obj instanceof TestShardDataTreeSnapshotMetadata
&& data.equals(((TestShardDataTreeSnapshotMetadata)obj).data);
}
return obj instanceof TestShardDataTreeSnapshotMetadata
&& data.equals(((TestShardDataTreeSnapshotMetadata)obj).data);
}
+ Proxy(final String data) {
this.data = data;
}
@Override
this.data = data;
}
@Override
- public void writeExternal(ObjectOutput out) throws IOException {
+ public void writeExternal(final ObjectOutput out) throws IOException {
out.writeObject(data);
}
@Override
out.writeObject(data);
}
@Override
- public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+ public void readExternal(final ObjectInput in) throws IOException, ClassNotFoundException {
data = (String) in.readObject();
}
data = (String) in.readObject();
}