Bump persisted PayloadVersion 70/82770/4
authorRobert Varga <robert.varga@pantheon.tech>
Wed, 29 May 2019 16:23:04 +0000 (18:23 +0200)
committerRobert Varga <robert.varga@pantheon.tech>
Fri, 28 Jun 2019 13:50:52 +0000 (15:50 +0200)
Since the NormalizedNodeStream format has changed, shard persisted
state is affected.

This patch bumps PayloadVersion and applies that bump to snapshots
and CommitTransactionPayload.

On recovery, a snapshot's need to migrate is reflected in its state,
and it is examined just as MigratedSerializables are.
CommitTransactionPayload is not examined, as understanding its
stream version would require deserializing at least its header.

JIRA: CONTROLLER-1888
Change-Id: I678527be4487ee1729123ba8b9dcd2269e6cf262
Signed-off-by: Robert Varga <robert.varga@pantheon.tech>
(cherry picked from commit 2658d126f67512fcaac684d2ccb7197fe12f61f4)

12 files changed:
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorRecoverySupport.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/persisted/Snapshot.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardSnapshotCohort.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/persisted/AbstractVersionedShardDataTreeSnapshot.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/persisted/DataTreeCandidateInputOutput.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/persisted/MetadataShardDataTreeSnapshot.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/persisted/PayloadVersion.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/persisted/ShardDataTreeSnapshot.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/persisted/ShardSnapshotState.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/actors/ShardSnapshotActorTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/persisted/CommitTransactionPayloadTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/persisted/ShardDataTreeSnapshotTest.java

index d94fd293be594da1c687b2b57101b4c70dbc8786..873b8514a2e30038f4327a01507df063b2468211 100644 (file)
@@ -19,6 +19,7 @@ import org.opendaylight.controller.cluster.raft.persisted.EmptyState;
 import org.opendaylight.controller.cluster.raft.persisted.MigratedSerializable;
 import org.opendaylight.controller.cluster.raft.persisted.ServerConfigurationPayload;
 import org.opendaylight.controller.cluster.raft.persisted.Snapshot;
+import org.opendaylight.controller.cluster.raft.persisted.Snapshot.State;
 import org.opendaylight.controller.cluster.raft.persisted.UpdateElectionTerm;
 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.PersistentPayload;
 import org.slf4j.Logger;
@@ -136,11 +137,15 @@ class RaftActorRecoverySupport {
         context.setCommitIndex(snapshot.getLastAppliedIndex());
         context.getTermInformation().update(snapshot.getElectionTerm(), snapshot.getElectionVotedFor());
 
-        Stopwatch timer = Stopwatch.createStarted();
+        final Stopwatch timer = Stopwatch.createStarted();
 
         // Apply the snapshot to the actors state
-        if (!(snapshot.getState() instanceof EmptyState)) {
-            cohort.applyRecoverySnapshot(snapshot.getState());
+        final State snapshotState = snapshot.getState();
+        if (snapshotState.needsMigration()) {
+            hasMigratedDataRecovered = true;
+        }
+        if (!(snapshotState instanceof EmptyState)) {
+            cohort.applyRecoverySnapshot(snapshotState);
         }
 
         if (snapshot.getServerConfiguration() != null) {
@@ -149,8 +154,8 @@ class RaftActorRecoverySupport {
 
         timer.stop();
         log.info("Recovery snapshot applied for {} in {}: snapshotIndex={}, snapshotTerm={}, journal-size={}",
-                context.getId(), timer.toString(), replicatedLog().getSnapshotIndex(),
-                replicatedLog().getSnapshotTerm(), replicatedLog().size());
+                context.getId(), timer, replicatedLog().getSnapshotIndex(), replicatedLog().getSnapshotTerm(),
+                replicatedLog().size());
     }
 
     private void onRecoveredJournalLogEntry(final ReplicatedLogEntry logEntry) {
index 42a4a8de810bd5a2b4d04d872b2e00620ff9bd07..091009e2bd31e3898af14552874bf6cc17597898 100644 (file)
@@ -31,6 +31,15 @@ public class Snapshot implements Serializable {
      * @author Thomas Pantelis
      */
     public interface State extends Serializable {
+        /**
+         * Indicate whether the snapshot requires migration, i.e. a new snapshot should be created after recovery.
+         * Default implementation returns false, i.e. do not re-snapshot.
+         *
+         * @return True if complete recovery based upon this snapshot should trigger a new snapshot.
+         */
+        default boolean needsMigration() {
+            return false;
+        }
     }
 
     private static final class Proxy implements Externalizable {
index 3ed3a48770054dc709d7076b9c79bd8c30de1bf1..e936d4389b8b33f78e958bafc1437aef1940f229 100644 (file)
@@ -99,7 +99,7 @@ final class ShardSnapshotCohort implements RaftActorSnapshotCohort {
     @Override
     public State deserializeSnapshot(final ByteSource snapshotBytes) throws IOException {
         try (ObjectInputStream in = new ObjectInputStream(snapshotBytes.openStream())) {
-            return new ShardSnapshotState(ShardDataTreeSnapshot.deserialize(in));
+            return ShardDataTreeSnapshot.deserialize(in);
         }
     }
 }
index 7ebb0055f2da2c1fcbbd9f8843ca2abc0c65e560..e6f2b9dacfde1678cf8fa1c02efd6d4066fc940e 100644 (file)
@@ -27,27 +27,30 @@ abstract class AbstractVersionedShardDataTreeSnapshot extends ShardDataTreeSnaps
     private static final Logger LOG = LoggerFactory.getLogger(AbstractVersionedShardDataTreeSnapshot.class);
 
     @SuppressWarnings("checkstyle:FallThrough")
-    static ShardDataTreeSnapshot versionedDeserialize(final ObjectInput in) throws IOException {
+    @Nonnull static ShardSnapshotState versionedDeserialize(final ObjectInput in) throws IOException {
         final PayloadVersion version = PayloadVersion.readFrom(in);
         switch (version) {
             case BORON:
-                // Boron snapshots use Java Serialization
-                try {
-                    return (ShardDataTreeSnapshot) in.readObject();
-                } catch (ClassNotFoundException e) {
-                    LOG.error("Failed to serialize data tree snapshot", e);
-                    throw new IOException("Snapshot failed to deserialize", e);
-                }
+                return new ShardSnapshotState(readSnapshot(in), true);
+            case SODIUM:
+                return new ShardSnapshotState(readSnapshot(in), false);
             case TEST_FUTURE_VERSION:
             case TEST_PAST_VERSION:
                 // These versions are never returned and this code is effectively dead
-                break;
             default:
-                throw new IOException("Invalid payload version in snapshot");
+                // Not included as default in above switch to ensure we get warnings when new versions are added
+                throw new IOException("Encountered unhandled version" + version);
         }
+    }
 
-        // Not included as default in above switch to ensure we get warnings when new versions are added
-        throw new IOException("Encountered unhandled version" + version);
+    // Boron and Sodium snapshots use Java Serialization, but differ in stream format
+    @Nonnull private static ShardDataTreeSnapshot readSnapshot(final ObjectInput in) throws IOException {
+        try {
+            return (ShardDataTreeSnapshot) in.readObject();
+        } catch (ClassNotFoundException e) {
+            LOG.error("Failed to serialize data tree snapshot", e);
+            throw new IOException("Snapshot failed to deserialize", e);
+        }
     }
 
     @Override
@@ -74,7 +77,8 @@ abstract class AbstractVersionedShardDataTreeSnapshot extends ShardDataTreeSnaps
     private void versionedSerialize(final ObjectOutput out, final PayloadVersion version) throws IOException {
         switch (version) {
             case BORON:
-                // Boron snapshots use Java Serialization
+            case SODIUM:
+                // Boron and Sodium snapshots use Java Serialization, but differ in stream format
                 out.writeObject(this);
                 return;
             case TEST_FUTURE_VERSION:
index bc1fca165544174c96043136a9f98e176c57e777..cb833c148329a713f371ec11cf9cf595b018fd38 100644 (file)
@@ -129,7 +129,6 @@ public final class DataTreeCandidateInputOutput {
         return DataTreeCandidates.newDataTreeCandidate(rootPath, rootNode);
     }
 
-
     private static void writeChildren(final NormalizedNodeDataOutput out,
             final Collection<DataTreeCandidateNode> children) throws IOException {
         out.writeInt(children.size());
@@ -172,8 +171,10 @@ public final class DataTreeCandidateInputOutput {
         }
     }
 
-    public static void writeDataTreeCandidate(final DataOutput out, DataTreeCandidate candidate) throws IOException {
-        try (NormalizedNodeDataOutput writer = NormalizedNodeInputOutput.newDataOutput(out)) {
+    public static void writeDataTreeCandidate(final DataOutput out, final DataTreeCandidate candidate)
+            throws IOException {
+        try (NormalizedNodeDataOutput writer = NormalizedNodeInputOutput.newDataOutput(out,
+                PayloadVersion.current().getStreamVersion())) {
             writer.writeYangInstanceIdentifier(candidate.getRootPath());
 
             final DataTreeCandidateNode node = candidate.getRootNode();
index 320ae61cd32f75e89f92514ceeb1bb575e037435..a5131c2ab21769dcbd163a8a0d20cf9add1c67ee 100644 (file)
@@ -7,9 +7,11 @@
  */
 package org.opendaylight.controller.cluster.datastore.persisted;
 
+import static com.google.common.base.Preconditions.checkArgument;
+import static java.util.Objects.requireNonNull;
+
 import com.google.common.annotations.Beta;
 import com.google.common.base.MoreObjects;
-import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableMap.Builder;
 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
@@ -64,7 +66,7 @@ public final class MetadataShardDataTreeSnapshot extends AbstractVersionedShardD
         @Override
         public void readExternal(final ObjectInput in) throws IOException, ClassNotFoundException {
             final int metaSize = in.readInt();
-            Preconditions.checkArgument(metaSize >= 0, "Invalid negative metadata map length %s", metaSize);
+            checkArgument(metaSize >= 0, "Invalid negative metadata map length %s", metaSize);
 
             // Default pre-allocate is 4, which should be fine
             final Builder<Class<? extends ShardDataTreeSnapshotMetadata<?>>, ShardDataTreeSnapshotMetadata<?>>
@@ -103,7 +105,7 @@ public final class MetadataShardDataTreeSnapshot extends AbstractVersionedShardD
 
     public MetadataShardDataTreeSnapshot(final NormalizedNode<?, ?> rootNode,
             final Map<Class<? extends ShardDataTreeSnapshotMetadata<?>>, ShardDataTreeSnapshotMetadata<?>> metadata) {
-        this.rootNode = Preconditions.checkNotNull(rootNode);
+        this.rootNode = requireNonNull(rootNode);
         this.metadata = ImmutableMap.copyOf(metadata);
     }
 
@@ -118,7 +120,7 @@ public final class MetadataShardDataTreeSnapshot extends AbstractVersionedShardD
 
     @Override
     PayloadVersion version() {
-        return PayloadVersion.BORON;
+        return PayloadVersion.SODIUM;
     }
 
     private Object writeReplace() {
index 1dbbcba2e5e741ed107b36d44e28cf338f1ddc4d..97b3ab5ed6f9c445a315f4211d801b773060c02f 100644 (file)
@@ -14,6 +14,7 @@ import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
 import javax.annotation.Nonnull;
+import org.opendaylight.controller.cluster.datastore.node.utils.stream.NormalizedNodeStreamVersion;
 import org.opendaylight.yangtools.concepts.WritableObject;
 
 /**
@@ -34,25 +35,51 @@ import org.opendaylight.yangtools.concepts.WritableObject;
  */
 @Beta
 public enum PayloadVersion implements WritableObject {
-    // NOTE: enumeration values need to be sorted in asceding order of their version to keep Comparable working
+    // NOTE: enumeration values need to be sorted in ascending order of their version to keep Comparable working
 
     /**
      * Version which is older than any other version. This version exists purely for testing purposes.
      */
     @VisibleForTesting
-    TEST_PAST_VERSION(0),
+    TEST_PAST_VERSION(0) {
+        @Override
+        public NormalizedNodeStreamVersion getStreamVersion() {
+            throw new UnsupportedOperationException();
+        }
+    },
 
     /**
      * Initial ABI version, as shipped with Boron Simultaneous release.
      */
-    // We seed the initial version to be the same as DataStoreVersions.BORON-VERSION for compatibility reasons.
-    BORON(5),
+    // We seed the initial version to be the same as DataStoreVersions.BORON_VERSION for compatibility reasons.
+    BORON(5) {
+        @Override
+        public NormalizedNodeStreamVersion getStreamVersion() {
+            return NormalizedNodeStreamVersion.LITHIUM;
+        }
+    },
+
+    /**
+     * Revised payload version. Payloads remain the same as {@link #BORON}, but messages bearing QNames in any shape
+     * are using {@link NormalizedNodeStreamVersion#SODIUM}, which improves encoding.
+     */
+    SODIUM(6) {
+        @Override
+        public NormalizedNodeStreamVersion getStreamVersion() {
+            return NormalizedNodeStreamVersion.SODIUM;
+        }
+    },
 
     /**
      * Version which is newer than any other version. This version exists purely for testing purposes.
      */
     @VisibleForTesting
-    TEST_FUTURE_VERSION(65535);
+    TEST_FUTURE_VERSION(65535) {
+        @Override
+        public NormalizedNodeStreamVersion getStreamVersion() {
+            throw new UnsupportedOperationException();
+        }
+    };
 
     private final short value;
 
@@ -70,6 +97,14 @@ public enum PayloadVersion implements WritableObject {
         return value;
     }
 
+    /**
+     * Return the NormalizedNode stream version corresponding to this particular ABI.
+     *
+     * @return Stream Version to use for this ABI version
+     */
+    @Nonnull
+    public abstract NormalizedNodeStreamVersion getStreamVersion();
+
     /**
      * Return the codebase-native persistence version. This version is the default version allocated to messages
      * at runtime. Conversion to previous versions may incur additional overhead (such as object allocation).
@@ -78,7 +113,7 @@ public enum PayloadVersion implements WritableObject {
      */
     @Nonnull
     public static PayloadVersion current() {
-        return BORON;
+        return SODIUM;
     }
 
     /**
@@ -101,8 +136,10 @@ public enum PayloadVersion implements WritableObject {
                 throw new PastVersionException(version, BORON);
             case 5:
                 return BORON;
+            case 6:
+                return SODIUM;
             default:
-                throw new FutureVersionException(version, BORON);
+                throw new FutureVersionException(version, SODIUM);
         }
     }
 
@@ -128,5 +165,4 @@ public enum PayloadVersion implements WritableObject {
             throw new IOException("Unsupported version", e);
         }
     }
-
 }
index 7a8bd4648bad5ff0b49bc0bed74d6b47c8d55653..9c1622bb1bd64ce6dee60520800315b29a9e2d85 100644 (file)
@@ -12,6 +12,7 @@ import java.io.IOException;
 import java.io.ObjectInput;
 import java.io.ObjectOutput;
 import java.util.Optional;
+import org.eclipse.jdt.annotation.NonNull;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 
 /**
@@ -25,8 +26,8 @@ public abstract class ShardDataTreeSnapshot {
         // Hidden to prevent subclassing from outside of this package
     }
 
-    public static ShardDataTreeSnapshot deserialize(final ObjectInput in) throws IOException {
-        final ShardDataTreeSnapshot ret = AbstractVersionedShardDataTreeSnapshot.versionedDeserialize(in);
+    public static @NonNull ShardSnapshotState deserialize(final ObjectInput in) throws IOException {
+        final ShardSnapshotState ret = AbstractVersionedShardDataTreeSnapshot.versionedDeserialize(in);
 
         // Make sure we consume all bytes, otherwise something went very wrong
         final int bytesLeft = in.available();
@@ -34,7 +35,6 @@ public abstract class ShardDataTreeSnapshot {
             throw new IOException("Deserialization left " + bytesLeft + " in the buffer");
         }
 
-
         return ret;
     }
 
index 8f22c0ec2dc2b2fe5bed0033b2bd18657263c27d..95953f920046e6d34e19e1530d5f4dfe7e1b2c5d 100644 (file)
@@ -7,7 +7,8 @@
  */
 package org.opendaylight.controller.cluster.datastore.persisted;
 
-import com.google.common.base.Preconditions;
+import static java.util.Objects.requireNonNull;
+
 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
 import java.io.Externalizable;
 import java.io.IOException;
@@ -47,7 +48,7 @@ public class ShardSnapshotState implements Snapshot.State {
 
         @Override
         public void readExternal(final ObjectInput in) throws IOException {
-            snapshotState = new ShardSnapshotState(ShardDataTreeSnapshot.deserialize(in));
+            snapshotState = ShardDataTreeSnapshot.deserialize(in);
         }
 
         private Object readResolve() {
@@ -59,9 +60,15 @@ public class ShardSnapshotState implements Snapshot.State {
             + "implements writeReplace to delegate serialization to a Proxy class and thus instances of this class "
             + "aren't serialized. FindBugs does not recognize this.")
     private final ShardDataTreeSnapshot snapshot;
+    private final boolean migrated;
+
+    ShardSnapshotState(@Nonnull final ShardDataTreeSnapshot snapshot, final boolean migrated) {
+        this.snapshot = requireNonNull(snapshot);
+        this.migrated = migrated;
+    }
 
     public ShardSnapshotState(@Nonnull final ShardDataTreeSnapshot snapshot) {
-        this.snapshot = Preconditions.checkNotNull(snapshot);
+        this(snapshot, false);
     }
 
     @Nonnull
@@ -69,6 +76,11 @@ public class ShardSnapshotState implements Snapshot.State {
         return snapshot;
     }
 
+    @Override
+    public boolean needsMigration() {
+        return migrated;
+    }
+
     private Object writeReplace() {
         return new Proxy(this);
     }
index 5878675fdf84fe1e122cd858d0469ad604ce1769..6a0def0901ac70d413af19e201793fb4311d75b8 100644 (file)
@@ -52,7 +52,7 @@ public class ShardSnapshotActorTest extends AbstractActorTest {
             final ShardDataTreeSnapshot deserialized;
             try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(
                 installSnapshotStream.toByteArray()))) {
-                deserialized = ShardDataTreeSnapshot.deserialize(in);
+                deserialized = ShardDataTreeSnapshot.deserialize(in).getSnapshot();
             }
 
             assertEquals("Deserialized snapshot type", snapshot.getClass(), deserialized.getClass());
index 09273b363cf37c9420ea7fa76715d277cead61c1..0886d94fee0286ab0234cd4269e85794a00e0e8a 100644 (file)
@@ -107,7 +107,7 @@ public class CommitTransactionPayloadTest extends AbstractTest {
     @Test
     public void testCandidateSerialization() throws IOException {
         final CommitTransactionPayload payload = CommitTransactionPayload.create(nextTransactionId(), candidate);
-        assertEquals("payload size", 181, payload.size());
+        assertEquals("payload size", 169, payload.size());
     }
 
     @Test
index 333f3e9b247528a5d79805359f44c1965d7e9288..9e05cf4860d5e3faaf7d16144e675f5cc207a3d8 100644 (file)
@@ -49,7 +49,7 @@ public class ShardDataTreeSnapshotTest {
 
         ShardDataTreeSnapshot deserialized;
         try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bos.toByteArray()))) {
-            deserialized = ShardDataTreeSnapshot.deserialize(in);
+            deserialized = ShardDataTreeSnapshot.deserialize(in).getSnapshot();
         }
 
         Optional<NormalizedNode<?, ?>> actualNode = deserialized.getRootNode();
@@ -75,7 +75,7 @@ public class ShardDataTreeSnapshotTest {
 
         ShardDataTreeSnapshot deserialized;
         try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bos.toByteArray()))) {
-            deserialized = ShardDataTreeSnapshot.deserialize(in);
+            deserialized = ShardDataTreeSnapshot.deserialize(in).getSnapshot();
         }
 
         Optional<NormalizedNode<?, ?>> actualNode = deserialized.getRootNode();
@@ -91,7 +91,7 @@ public class ShardDataTreeSnapshotTest {
 
         private final String data;
 
-        TestShardDataTreeSnapshotMetadata(String data) {
+        TestShardDataTreeSnapshotMetadata(final String data) {
             this.data = data;
         }
 
@@ -111,7 +111,7 @@ public class ShardDataTreeSnapshotTest {
         }
 
         @Override
-        public boolean equals(Object obj) {
+        public boolean equals(final Object obj) {
             return obj instanceof TestShardDataTreeSnapshotMetadata
                     && data.equals(((TestShardDataTreeSnapshotMetadata)obj).data);
         }
@@ -123,17 +123,17 @@ public class ShardDataTreeSnapshotTest {
             public Proxy() {
             }
 
-            Proxy(String data) {
+            Proxy(final String data) {
                 this.data = data;
             }
 
             @Override
-            public void writeExternal(ObjectOutput out) throws IOException {
+            public void writeExternal(final ObjectOutput out) throws IOException {
                 out.writeObject(data);
             }
 
             @Override
-            public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+            public void readExternal(final ObjectInput in) throws IOException, ClassNotFoundException {
                 data = (String) in.readObject();
             }