import org.opendaylight.controller.cluster.raft.persisted.MigratedSerializable;
import org.opendaylight.controller.cluster.raft.persisted.ServerConfigurationPayload;
import org.opendaylight.controller.cluster.raft.persisted.Snapshot;
+import org.opendaylight.controller.cluster.raft.persisted.Snapshot.State;
import org.opendaylight.controller.cluster.raft.persisted.UpdateElectionTerm;
import org.opendaylight.controller.cluster.raft.protobuff.client.messages.PersistentPayload;
import org.slf4j.Logger;
context.setCommitIndex(snapshot.getLastAppliedIndex());
context.getTermInformation().update(snapshot.getElectionTerm(), snapshot.getElectionVotedFor());
- Stopwatch timer = Stopwatch.createStarted();
+ final Stopwatch timer = Stopwatch.createStarted();
// Apply the snapshot to the actors state
- if (!(snapshot.getState() instanceof EmptyState)) {
- cohort.applyRecoverySnapshot(snapshot.getState());
+ final State snapshotState = snapshot.getState();
+ if (snapshotState.needsMigration()) {
+ hasMigratedDataRecovered = true;
+ }
+ if (!(snapshotState instanceof EmptyState)) {
+ cohort.applyRecoverySnapshot(snapshotState);
}
if (snapshot.getServerConfiguration() != null) {
timer.stop();
log.info("Recovery snapshot applied for {} in {}: snapshotIndex={}, snapshotTerm={}, journal-size={}",
- context.getId(), timer.toString(), replicatedLog().getSnapshotIndex(),
- replicatedLog().getSnapshotTerm(), replicatedLog().size());
+ context.getId(), timer, replicatedLog().getSnapshotIndex(), replicatedLog().getSnapshotTerm(),
+ replicatedLog().size());
}
private void onRecoveredJournalLogEntry(final ReplicatedLogEntry logEntry) {
* @author Thomas Pantelis
*/
public interface State extends Serializable {
+ /**
+ * Indicate whether the snapshot requires migration, i.e. a new snapshot should be created after recovery.
+ * Default implementation returns false, i.e. do not re-snapshot.
+ *
+ * @return True if complete recovery based upon this snapshot should trigger a new snapshot.
+ */
+ default boolean needsMigration() {
+ return false;
+ }
}
private static final class Proxy implements Externalizable {
@Override
public State deserializeSnapshot(final ByteSource snapshotBytes) throws IOException {
try (ObjectInputStream in = new ObjectInputStream(snapshotBytes.openStream())) {
- return new ShardSnapshotState(ShardDataTreeSnapshot.deserialize(in));
+ return ShardDataTreeSnapshot.deserialize(in);
}
}
}
private static final Logger LOG = LoggerFactory.getLogger(AbstractVersionedShardDataTreeSnapshot.class);
@SuppressWarnings("checkstyle:FallThrough")
- static ShardDataTreeSnapshot versionedDeserialize(final ObjectInput in) throws IOException {
+ static @NonNull ShardSnapshotState versionedDeserialize(final ObjectInput in) throws IOException {
final PayloadVersion version = PayloadVersion.readFrom(in);
switch (version) {
case BORON:
- // Boron snapshots use Java Serialization
- try {
- return (ShardDataTreeSnapshot) in.readObject();
- } catch (ClassNotFoundException e) {
- LOG.error("Failed to serialize data tree snapshot", e);
- throw new IOException("Snapshot failed to deserialize", e);
- }
+ return new ShardSnapshotState(readSnapshot(in), true);
+ case SODIUM:
+ return new ShardSnapshotState(readSnapshot(in), false);
case TEST_FUTURE_VERSION:
case TEST_PAST_VERSION:
// These versions are never returned and this code is effectively dead
- break;
default:
- throw new IOException("Invalid payload version in snapshot");
+ // Not included as default in above switch to ensure we get warnings when new versions are added
+ throw new IOException("Encountered unhandled version" + version);
}
+ }
- // Not included as default in above switch to ensure we get warnings when new versions are added
- throw new IOException("Encountered unhandled version" + version);
+ // Boron and Sodium snapshots use Java Serialization, but differ in stream format
+ private static @NonNull ShardDataTreeSnapshot readSnapshot(final ObjectInput in) throws IOException {
+ try {
+ return (ShardDataTreeSnapshot) in.readObject();
+ } catch (ClassNotFoundException e) {
+ LOG.error("Failed to serialize data tree snapshot", e);
+ throw new IOException("Snapshot failed to deserialize", e);
+ }
}
@Override
private void versionedSerialize(final ObjectOutput out, final PayloadVersion version) throws IOException {
switch (version) {
case BORON:
- // Boron snapshots use Java Serialization
+ case SODIUM:
+ // Boron and Sodium snapshots use Java Serialization, but differ in stream format
out.writeObject(this);
return;
case TEST_FUTURE_VERSION:
return DataTreeCandidates.newDataTreeCandidate(rootPath, rootNode);
}
-
private static void writeChildren(final NormalizedNodeDataOutput out,
final Collection<DataTreeCandidateNode> children) throws IOException {
out.writeInt(children.size());
public static void writeDataTreeCandidate(final DataOutput out, final DataTreeCandidate candidate)
throws IOException {
- try (NormalizedNodeDataOutput writer = NormalizedNodeInputOutput.newDataOutput(out)) {
+ try (NormalizedNodeDataOutput writer = NormalizedNodeInputOutput.newDataOutput(out,
+ PayloadVersion.current().getStreamVersion())) {
writer.writeYangInstanceIdentifier(candidate.getRootPath());
final DataTreeCandidateNode node = candidate.getRootNode();
*/
package org.opendaylight.controller.cluster.datastore.persisted;
+import static com.google.common.base.Preconditions.checkArgument;
+import static java.util.Objects.requireNonNull;
+
import com.google.common.annotations.Beta;
import com.google.common.base.MoreObjects;
-import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableMap.Builder;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
@Override
public void readExternal(final ObjectInput in) throws IOException, ClassNotFoundException {
final int metaSize = in.readInt();
- Preconditions.checkArgument(metaSize >= 0, "Invalid negative metadata map length %s", metaSize);
+ checkArgument(metaSize >= 0, "Invalid negative metadata map length %s", metaSize);
// Default pre-allocate is 4, which should be fine
final Builder<Class<? extends ShardDataTreeSnapshotMetadata<?>>, ShardDataTreeSnapshotMetadata<?>>
public MetadataShardDataTreeSnapshot(final NormalizedNode<?, ?> rootNode,
final Map<Class<? extends ShardDataTreeSnapshotMetadata<?>>, ShardDataTreeSnapshotMetadata<?>> metadata) {
- this.rootNode = Preconditions.checkNotNull(rootNode);
+ this.rootNode = requireNonNull(rootNode);
this.metadata = ImmutableMap.copyOf(metadata);
}
@Override
PayloadVersion version() {
- return PayloadVersion.BORON;
+ return PayloadVersion.SODIUM;
}
private Object writeReplace() {
import java.io.DataOutput;
import java.io.IOException;
import org.eclipse.jdt.annotation.NonNull;
+import org.opendaylight.controller.cluster.datastore.node.utils.stream.NormalizedNodeStreamVersion;
import org.opendaylight.yangtools.concepts.WritableObject;
/**
*/
@Beta
public enum PayloadVersion implements WritableObject {
- // NOTE: enumeration values need to be sorted in asceding order of their version to keep Comparable working
+ // NOTE: enumeration values need to be sorted in ascending order of their version to keep Comparable working
/**
* Version which is older than any other version. This version exists purely for testing purposes.
*/
@VisibleForTesting
- TEST_PAST_VERSION(0),
+ TEST_PAST_VERSION(0) {
+ @Override
+ public NormalizedNodeStreamVersion getStreamVersion() {
+ throw new UnsupportedOperationException();
+ }
+ },
/**
* Initial ABI version, as shipped with Boron Simultaneous release.
*/
- // We seed the initial version to be the same as DataStoreVersions.BORON-VERSION for compatibility reasons.
- BORON(5),
+ // We seed the initial version to be the same as DataStoreVersions.BORON_VERSION for compatibility reasons.
+ BORON(5) {
+ @Override
+ public NormalizedNodeStreamVersion getStreamVersion() {
+ return NormalizedNodeStreamVersion.LITHIUM;
+ }
+ },
+
+ /**
+ * Revised payload version. Payloads remain the same as {@link #BORON}, but messages bearing QNames in any shape
+ * are using {@link NormalizedNodeStreamVersion#SODIUM}, which improves encoding.
+ */
+ SODIUM(6) {
+ @Override
+ public NormalizedNodeStreamVersion getStreamVersion() {
+ return NormalizedNodeStreamVersion.SODIUM;
+ }
+ },
/**
* Version which is newer than any other version. This version exists purely for testing purposes.
*/
@VisibleForTesting
- TEST_FUTURE_VERSION(65535);
+ TEST_FUTURE_VERSION(65535) {
+ @Override
+ public NormalizedNodeStreamVersion getStreamVersion() {
+ throw new UnsupportedOperationException();
+ }
+ };
private final short value;
return value;
}
+ /**
+ * Return the NormalizedNode stream version corresponding to this particular ABI.
+ *
+ * @return Stream Version to use for this ABI version
+ */
+ public abstract @NonNull NormalizedNodeStreamVersion getStreamVersion();
+
/**
* Return the codebase-native persistence version. This version is the default version allocated to messages
* at runtime. Conversion to previous versions may incur additional overhead (such as object allocation).
* @return Current {@link PayloadVersion}
*/
public static @NonNull PayloadVersion current() {
- return BORON;
+ return SODIUM;
}
/**
throw new PastVersionException(version, BORON);
case 5:
return BORON;
+ case 6:
+ return SODIUM;
default:
- throw new FutureVersionException(version, BORON);
+ throw new FutureVersionException(version, SODIUM);
}
}
throw new IOException("Unsupported version", e);
}
}
-
}
import java.io.ObjectInput;
import java.io.ObjectOutput;
import java.util.Optional;
+import org.eclipse.jdt.annotation.NonNull;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
/**
// Hidden to prevent subclassing from outside of this package
}
- public static ShardDataTreeSnapshot deserialize(final ObjectInput in) throws IOException {
- final ShardDataTreeSnapshot ret = AbstractVersionedShardDataTreeSnapshot.versionedDeserialize(in);
+ public static @NonNull ShardSnapshotState deserialize(final ObjectInput in) throws IOException {
+ final ShardSnapshotState ret = AbstractVersionedShardDataTreeSnapshot.versionedDeserialize(in);
// Make sure we consume all bytes, otherwise something went very wrong
final int bytesLeft = in.available();
throw new IOException("Deserialization left " + bytesLeft + " in the buffer");
}
-
return ret;
}
@Override
public void readExternal(final ObjectInput in) throws IOException {
- snapshotState = new ShardSnapshotState(ShardDataTreeSnapshot.deserialize(in));
+ snapshotState = ShardDataTreeSnapshot.deserialize(in);
}
private Object readResolve() {
@SuppressFBWarnings(value = "SE_BAD_FIELD", justification = "This field is not Serializable but this class "
+ "implements writeReplace to delegate serialization to a Proxy class and thus instances of this class "
+ "aren't serialized. FindBugs does not recognize this.")
- private final ShardDataTreeSnapshot snapshot;
+ private final @NonNull ShardDataTreeSnapshot snapshot;
+ private final boolean migrated;
- public ShardSnapshotState(final @NonNull ShardDataTreeSnapshot snapshot) {
+ ShardSnapshotState(final @NonNull ShardDataTreeSnapshot snapshot, final boolean migrated) {
this.snapshot = requireNonNull(snapshot);
+ this.migrated = migrated;
+ }
+
+ public ShardSnapshotState(final @NonNull ShardDataTreeSnapshot snapshot) {
+ this(snapshot, false);
}
public @NonNull ShardDataTreeSnapshot getSnapshot() {
return snapshot;
}
+ @Override
+ public boolean needsMigration() {
+ return migrated;
+ }
+
private Object writeReplace() {
return new Proxy(this);
}
final ShardDataTreeSnapshot deserialized;
try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(
installSnapshotStream.toByteArray()))) {
- deserialized = ShardDataTreeSnapshot.deserialize(in);
+ deserialized = ShardDataTreeSnapshot.deserialize(in).getSnapshot();
}
assertEquals("Deserialized snapshot type", snapshot.getClass(), deserialized.getClass());
@Test
public void testCandidateSerialization() throws IOException {
final CommitTransactionPayload payload = CommitTransactionPayload.create(nextTransactionId(), candidate);
- assertEquals("payload size", 181, payload.size());
+ assertEquals("payload size", 169, payload.size());
}
@Test
ShardDataTreeSnapshot deserialized;
try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bos.toByteArray()))) {
- deserialized = ShardDataTreeSnapshot.deserialize(in);
+ deserialized = ShardDataTreeSnapshot.deserialize(in).getSnapshot();
}
Optional<NormalizedNode<?, ?>> actualNode = deserialized.getRootNode();
ShardDataTreeSnapshot deserialized;
try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bos.toByteArray()))) {
- deserialized = ShardDataTreeSnapshot.deserialize(in);
+ deserialized = ShardDataTreeSnapshot.deserialize(in).getSnapshot();
}
Optional<NormalizedNode<?, ?>> actualNode = deserialized.getRootNode();
private final String data;
- TestShardDataTreeSnapshotMetadata(String data) {
+ TestShardDataTreeSnapshotMetadata(final String data) {
this.data = data;
}
}
@Override
- public boolean equals(Object obj) {
+ public boolean equals(final Object obj) {
return obj instanceof TestShardDataTreeSnapshotMetadata
&& data.equals(((TestShardDataTreeSnapshotMetadata)obj).data);
}
public Proxy() {
}
- Proxy(String data) {
+ Proxy(final String data) {
this.data = data;
}
@Override
- public void writeExternal(ObjectOutput out) throws IOException {
+ public void writeExternal(final ObjectOutput out) throws IOException {
out.writeObject(data);
}
@Override
- public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+ public void readExternal(final ObjectInput in) throws IOException, ClassNotFoundException {
data = (String) in.readObject();
}