BUG-5280: centralize ShardSnapshot operations 85/42785/11
authorRobert Varga <rovarga@cisco.com>
Fri, 29 Jul 2016 13:58:49 +0000 (15:58 +0200)
committerTom Pantelis <tpanteli@brocade.com>
Mon, 1 Aug 2016 20:08:31 +0000 (20:08 +0000)
Current shard snapshotting mechanism does not allow for evolution
of the snapshot contents and contains only the root node. Also
the serialization and deserialization operations are scattered
in multiple places, making coordinated changes a bit troublesome.

This patch introduces a versioned snapshot abstraction and moves
serdes operations into a single place. A new serialization format
is introduced, which holds the root node and some additional
metadata. No concrete metadata is defined in this patch, but this
will be used to transfer frontend protocol state from shard leader
to shard follower.

It also moves the act of creating a snapshot into ShardDataTree
and creates a dedicated actor to handle the snapshotting task,
which is used for all snapshot requests for a particular Shard.
Also makes the actor message internal to the ShardSnapshotActor,
providing a convenience method to create and dispatch it.

Change-Id: I6d9680b6ef08672c363092a649255013980c0bd6
Signed-off-by: Robert Varga <rovarga@cisco.com>
19 files changed:
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTree.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardReadTransaction.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardRecoveryCoordinator.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardSnapshotCohort.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/actors/ShardSnapshotActor.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/CreateSnapshot.java [deleted file]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/persisted/AbstractVersionedShardDataTreeSnapshot.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/persisted/MetadataShardDataTreeSnapshot.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/persisted/PreBoronShardDataTreeSnapshot.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/persisted/ShardDataTreeSnapshot.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/persisted/ShardDataTreeSnapshotMetadata.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractShardTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreIntegrationTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardRecoveryCoordinatorTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/actors/ShardSnapshotActorTest.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/persisted/ShardDataTreeSnapshotTest.java [new file with mode: 0644]

index 0c65bda..bd44f0b 100644 (file)
@@ -174,8 +174,8 @@ public class Shard extends RaftActor {
                 new Dispatchers(context().system().dispatchers()).getDispatcherPath(
                         Dispatchers.DispatcherType.Transaction), self(), getContext(), shardMBean);
 
-        snapshotCohort = new ShardSnapshotCohort(builder.getId().getMemberName(), transactionActorFactory, store,
-            LOG, this.name);
+        snapshotCohort = ShardSnapshotCohort.create(getContext(), builder.getId().getMemberName(), store, LOG,
+            this.name);
 
         messageRetrySupport = new ShardTransactionMessageRetrySupport(this);
     }
index edf05f2..e4be06e 100644 (file)
@@ -16,6 +16,8 @@ import java.util.Map.Entry;
 import javax.annotation.concurrent.NotThreadSafe;
 import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
+import org.opendaylight.controller.cluster.datastore.persisted.ShardDataTreeSnapshot;
+import org.opendaylight.controller.cluster.datastore.persisted.MetadataShardDataTreeSnapshot;
 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
 import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeListener;
@@ -84,6 +86,18 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
         this.schemaContext = Preconditions.checkNotNull(schemaContext);
     }
 
+    ShardDataTreeSnapshot takeRecoverySnapshot() {
+        return new MetadataShardDataTreeSnapshot(dataTree.takeSnapshot().readNode(YangInstanceIdentifier.EMPTY).get());
+    }
+
+    void applyRecoveryTransaction(final ReadWriteShardDataTreeTransaction transaction) throws DataValidationFailedException {
+        final DataTreeModification snapshot = transaction.getSnapshot();
+        snapshot.ready();
+
+        dataTree.validate(snapshot);
+        dataTree.commit(dataTree.prepare(snapshot));
+    }
+
     private ShardDataTreeTransactionChain ensureTransactionChain(final LocalHistoryIdentifier localHistoryIdentifier) {
         ShardDataTreeTransactionChain chain = transactionChains.get(localHistoryIdentifier);
         if (chain == null) {
index ec526d4..7986886 100644 (file)
@@ -9,17 +9,10 @@
 package org.opendaylight.controller.cluster.datastore;
 
 import akka.actor.ActorRef;
-import akka.actor.PoisonPill;
-import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
-import org.opendaylight.controller.cluster.datastore.messages.CreateSnapshot;
 import org.opendaylight.controller.cluster.datastore.messages.DataExists;
 import org.opendaylight.controller.cluster.datastore.messages.ReadData;
-import org.opendaylight.controller.cluster.datastore.utils.SerializationUtils;
-import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
-import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
-import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 
 /**
  * @author: syedbahm
@@ -36,33 +29,15 @@ public class ShardReadTransaction extends ShardTransaction {
 
     @Override
     public void handleReceive(Object message) {
-        if (message instanceof CreateSnapshot) {
-            createSnapshot();
-        } else if(ReadData.isSerializedType(message)) {
+        if (ReadData.isSerializedType(message)) {
             readData(transaction, ReadData.fromSerializable(message));
-        } else if(DataExists.isSerializedType(message)) {
+        } else if (DataExists.isSerializedType(message)) {
             dataExists(transaction, DataExists.fromSerializable(message));
         } else {
             super.handleReceive(message);
         }
     }
 
-    private void createSnapshot() {
-
-        // This is a special message sent by the shard to send back a serialized snapshot of the whole
-        // data store tree. This transaction was created for that purpose only so we can
-        // self-destruct after sending the reply.
-
-        final ActorRef sender = getSender();
-        final ActorRef self = getSelf();
-        final Optional<NormalizedNode<?, ?>> result = transaction.getSnapshot().readNode(YangInstanceIdentifier.EMPTY);
-
-        byte[] serialized = SerializationUtils.serializeNormalizedNode(result.get());
-        sender.tell(new CaptureSnapshotReply(serialized), self);
-
-        self.tell(PoisonPill.getInstance(), self);
-    }
-
     @Override
     protected AbstractShardDataTreeTransaction<?> getDOMStoreTransaction() {
         return transaction;
index 8aacec8..c533759 100644 (file)
@@ -8,16 +8,17 @@
 package org.opendaylight.controller.cluster.datastore;
 
 import com.google.common.base.Preconditions;
+import com.google.common.base.Throwables;
 import java.io.File;
 import java.io.IOException;
 import java.util.Map.Entry;
 import java.util.Optional;
-import org.opendaylight.controller.cluster.datastore.persisted.DataTreeCandidateSupplier;
 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
+import org.opendaylight.controller.cluster.datastore.persisted.ShardDataTreeSnapshot;
+import org.opendaylight.controller.cluster.datastore.persisted.DataTreeCandidateSupplier;
 import org.opendaylight.controller.cluster.datastore.utils.DataTreeModificationOutput;
 import org.opendaylight.controller.cluster.datastore.utils.NormalizedNodeXMLOutput;
 import org.opendaylight.controller.cluster.datastore.utils.PruningDataTreeModification;
-import org.opendaylight.controller.cluster.datastore.utils.SerializationUtils;
 import org.opendaylight.controller.cluster.raft.RaftActorRecoveryCohort;
 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
@@ -116,10 +117,20 @@ class ShardRecoveryCoordinator implements RaftActorRecoveryCohort {
     public void applyRecoverySnapshot(final byte[] snapshotBytes) {
         log.debug("{}: Applying recovered snapshot", shardName);
 
-        final NormalizedNode<?, ?> node = SerializationUtils.deserializeNormalizedNode(snapshotBytes);
+        final ShardDataTreeSnapshot snapshot;
+        try {
+            snapshot = ShardDataTreeSnapshot.deserialize(snapshotBytes);
+        } catch (IOException e) {
+            log.error("{}: failed to deserialize snapshot", e);
+            throw Throwables.propagate(e);
+        }
+
         final PruningDataTreeModification tx = new PruningDataTreeModification(store.newModification(),
                 store.getDataTree(), store.getSchemaContext());
+
+        final NormalizedNode<?, ?> node = snapshot.getRootNode().orElse(null);
         tx.write(YangInstanceIdentifier.EMPTY, node);
+
         try {
             commitTransaction(tx);
         } catch (Exception e) {
index d3d8409..7812d70 100644 (file)
@@ -7,8 +7,11 @@
  */
 package org.opendaylight.controller.cluster.datastore;
 
+import akka.actor.ActorContext;
 import akka.actor.ActorRef;
 import com.google.common.base.Preconditions;
+import java.io.IOException;
+import java.util.Optional;
 import java.util.concurrent.ExecutionException;
 import org.opendaylight.controller.cluster.access.concepts.ClientIdentifier;
 import org.opendaylight.controller.cluster.access.concepts.FrontendIdentifier;
@@ -16,8 +19,8 @@ import org.opendaylight.controller.cluster.access.concepts.FrontendType;
 import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
 import org.opendaylight.controller.cluster.access.concepts.MemberName;
 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
-import org.opendaylight.controller.cluster.datastore.messages.CreateSnapshot;
-import org.opendaylight.controller.cluster.datastore.utils.SerializationUtils;
+import org.opendaylight.controller.cluster.datastore.actors.ShardSnapshotActor;
+import org.opendaylight.controller.cluster.datastore.persisted.ShardDataTreeSnapshot;
 import org.opendaylight.controller.cluster.raft.RaftActorSnapshotCohort;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
@@ -30,67 +33,68 @@ import org.slf4j.Logger;
  */
 class ShardSnapshotCohort implements RaftActorSnapshotCohort {
     private static final FrontendType SNAPSHOT_APPLY = FrontendType.forName("snapshot-apply");
-    private static final FrontendType SNAPSHOT_READ = FrontendType.forName("snapshot-read");
 
-    private final ShardTransactionActorFactory transactionActorFactory;
     private final LocalHistoryIdentifier applyHistoryId;
-    private final LocalHistoryIdentifier readHistoryId;
+    private final ActorRef snapshotActor;
     private final ShardDataTree store;
     private final String logId;
     private final Logger log;
 
     private long applyCounter;
-    private long readCounter;
 
-    ShardSnapshotCohort(MemberName memberName, ShardTransactionActorFactory transactionActorFactory, ShardDataTree store,
-            Logger log, String logId) {
-        this.transactionActorFactory = Preconditions.checkNotNull(transactionActorFactory);
+    private ShardSnapshotCohort(final LocalHistoryIdentifier applyHistoryId, final ActorRef snapshotActor,
+            final ShardDataTree store, final Logger log, final String logId) {
+        this.applyHistoryId = Preconditions.checkNotNull(applyHistoryId);
+        this.snapshotActor = Preconditions.checkNotNull(snapshotActor);
         this.store = Preconditions.checkNotNull(store);
         this.log = log;
         this.logId = logId;
-
-        this.applyHistoryId = new LocalHistoryIdentifier(ClientIdentifier.create(
-            FrontendIdentifier.create(memberName, SNAPSHOT_APPLY), 0), 0);
-        this.readHistoryId = new LocalHistoryIdentifier(ClientIdentifier.create(
-            FrontendIdentifier.create(memberName, SNAPSHOT_READ), 0), 0);
     }
 
-    @Override
-    public void createSnapshot(ActorRef actorRef) {
-        // Create a transaction actor. We are really going to treat the transaction as a worker
-        // so that this actor does not get block building the snapshot. THe transaction actor will
-        // after processing the CreateSnapshot message.
+    static ShardSnapshotCohort create(final ActorContext actorContext, final MemberName memberName,
+            final ShardDataTree store, final Logger log, final String logId) {
+        final LocalHistoryIdentifier applyHistoryId = new LocalHistoryIdentifier(ClientIdentifier.create(
+            FrontendIdentifier.create(memberName, SNAPSHOT_APPLY), 0), 0);
+        final String snapshotActorName = "shard-" + memberName.getName() + ':' + "snapshot-read";
 
-        ActorRef createSnapshotTransaction = transactionActorFactory.newShardTransaction(
-                TransactionType.READ_ONLY, new TransactionIdentifier(readHistoryId, readCounter++));
+        // Create a snapshot actor. This actor will act as a worker to offload snapshot serialization for all
+        // requests.
+        final ActorRef snapshotActor = actorContext.actorOf(ShardSnapshotActor.props(), snapshotActorName);
 
-        createSnapshotTransaction.tell(CreateSnapshot.INSTANCE, actorRef);
+        return new ShardSnapshotCohort(applyHistoryId, snapshotActor, store, log, logId);
     }
 
     @Override
-    public void applySnapshot(byte[] snapshotBytes) {
-        // Since this will be done only on Recovery or when this actor is a Follower
-        // we can safely commit everything in here. We not need to worry about event notifications
-        // as they would have already been disabled on the follower
+    public void createSnapshot(final ActorRef actorRef) {
+        // Forward the request to the snapshot actor
+        ShardSnapshotActor.requestSnapshot(snapshotActor, store.takeRecoverySnapshot(), actorRef);
+    }
 
-        log.info("{}: Applying snapshot", logId);
+    private void deserializeAndApplySnapshot(final byte[] snapshotBytes) {
+        final ShardDataTreeSnapshot snapshot;
+        try {
+            snapshot = ShardDataTreeSnapshot.deserialize(snapshotBytes);
+        } catch (IOException e) {
+            log.error("{}: Failed to deserialize snapshot", logId, e);
+            return;
+        }
 
         try {
-            ReadWriteShardDataTreeTransaction transaction = store.newReadWriteTransaction(
+            final ReadWriteShardDataTreeTransaction transaction = store.newReadWriteTransaction(
                 new TransactionIdentifier(applyHistoryId, applyCounter++));
 
-            NormalizedNode<?, ?> node = SerializationUtils.deserializeNormalizedNode(snapshotBytes);
-
             // delete everything first
             transaction.getSnapshot().delete(YangInstanceIdentifier.EMPTY);
 
-            // Add everything from the remote node back
-            transaction.getSnapshot().write(YangInstanceIdentifier.EMPTY, node);
-            syncCommitTransaction(transaction);
-        } catch (InterruptedException | ExecutionException e) {
+            final Optional<NormalizedNode<?, ?>> maybeNode = snapshot.getRootNode();
+            if (maybeNode.isPresent()) {
+                // Add everything from the remote node back
+                transaction.getSnapshot().write(YangInstanceIdentifier.EMPTY, maybeNode.get());
+            }
+
+            store.applyRecoveryTransaction(transaction);
+        } catch (Exception e) {
             log.error("{}: An exception occurred when applying snapshot", logId, e);
-        } finally {
-            log.info("{}: Done applying snapshot", logId);
         }
 
     }
@@ -101,4 +105,15 @@ class ShardSnapshotCohort implements RaftActorSnapshotCohort {
         commitCohort.preCommit().get();
         commitCohort.commit().get();
     }
+
+    @Override
+    public void applySnapshot(final byte[] snapshotBytes) {
+        // Since this will be done only on Recovery or when this actor is a Follower
+        // we can safely commit everything in here. We not need to worry about event notifications
+        // as they would have already been disabled on the follower
+
+        log.info("{}: Applying snapshot", logId);
+        deserializeAndApplySnapshot(snapshotBytes);
+        log.info("{}: Done applying snapshot", logId);
+    }
 }
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/actors/ShardSnapshotActor.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/actors/ShardSnapshotActor.java
new file mode 100644 (file)
index 0000000..00bb424
--- /dev/null
@@ -0,0 +1,68 @@
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore.actors;
+
+import akka.actor.ActorRef;
+import akka.actor.Props;
+import com.google.common.base.Preconditions;
+import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActorWithMetering;
+import org.opendaylight.controller.cluster.datastore.persisted.ShardDataTreeSnapshot;
+import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
+
+/**
+ * This is an offload actor, which is given an isolated snapshot of the data tree. It performs the potentially
+ * time-consuming operation of serializing the snapshot.
+ *
+ * @author Robert Varga
+ */
+public final class ShardSnapshotActor extends AbstractUntypedActorWithMetering {
+    // Internal message
+    private static final class SerializeSnapshot {
+        private final ShardDataTreeSnapshot snapshot;
+        private final ActorRef replyTo;
+
+        SerializeSnapshot(final ShardDataTreeSnapshot snapshot, final ActorRef replyTo) {
+            this.snapshot = Preconditions.checkNotNull(snapshot);
+            this.replyTo = Preconditions.checkNotNull(replyTo);
+        }
+
+        ShardDataTreeSnapshot getSnapshot() {
+            return snapshot;
+        }
+
+        ActorRef getReplyTo() {
+            return replyTo;
+        }
+    }
+
+    //actor name override used for metering. This does not change the "real" actor name
+    private static final String ACTOR_NAME_FOR_METERING = "shard-snapshot";
+
+    private ShardSnapshotActor() {
+        super(ACTOR_NAME_FOR_METERING);
+    }
+
+    @Override
+    protected void handleReceive(final Object message) throws Exception {
+        if (message instanceof SerializeSnapshot) {
+            final SerializeSnapshot request = (SerializeSnapshot) message;
+            request.getReplyTo().tell(new CaptureSnapshotReply(request.getSnapshot().serialize()), ActorRef.noSender());
+        } else {
+            unknownMessage(message);
+        }
+    }
+
+    public static void requestSnapshot(final ActorRef snapshotActor, ShardDataTreeSnapshot snapshot,
+            final ActorRef replyTo) {
+        snapshotActor.tell(new SerializeSnapshot(snapshot, replyTo), ActorRef.noSender());
+    }
+
+    public static Props props() {
+        return Props.create(ShardSnapshotActor.class);
+    }
+}
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/CreateSnapshot.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/CreateSnapshot.java
deleted file mode 100644 (file)
index c0d19af..0000000
+++ /dev/null
@@ -1,19 +0,0 @@
-/*
- * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.controller.cluster.datastore.messages;
-
-/**
- * Message sent to a transaction actor to create a snapshot of the data store.
- *
- * @author Thomas Pantelis
- */
-public class CreateSnapshot {
-    // Note: This class does not need to Serializable as it's only sent locally.
-
-    public static final CreateSnapshot INSTANCE = new CreateSnapshot();
-}
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/persisted/AbstractVersionedShardDataTreeSnapshot.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/persisted/AbstractVersionedShardDataTreeSnapshot.java
new file mode 100644 (file)
index 0000000..0d92eac
--- /dev/null
@@ -0,0 +1,101 @@
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore.persisted;
+
+import com.google.common.base.Verify;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.util.Optional;
+import javax.annotation.Nonnull;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * An internally-versioned {@link ShardDataTreeSnapshot}. This class is an intermediate implementation-private
+ * class.
+ *
+ * @author Robert Varga
+ */
+abstract class AbstractVersionedShardDataTreeSnapshot extends ShardDataTreeSnapshot {
+    private static final Logger LOG = LoggerFactory.getLogger(AbstractVersionedShardDataTreeSnapshot.class);
+
+    static ShardDataTreeSnapshot deserialize(final DataInputStream is) throws IOException {
+        final PayloadVersion version = PayloadVersion.readFrom(is);
+        switch (version) {
+            case BORON:
+                // Boron snapshots use Java Serialization
+                try (final ObjectInputStream ois = new ObjectInputStream(is)) {
+                    return (ShardDataTreeSnapshot) ois.readObject();
+                } catch (ClassNotFoundException e) {
+                    LOG.error("Failed to serialize data tree snapshot", e);
+                    throw new IOException("Snapshot failed to deserialize", e);
+                }
+            case TEST_FUTURE_VERSION:
+            case TEST_PAST_VERSION:
+                // These versions are never returned and this code is effectively dead
+                break;
+        }
+
+        // Not included as default in above switch to ensure we get warnings when new versions are added
+        throw new IOException("Encountered unhandled version" + version);
+    }
+
+    @Override
+    public final Optional<NormalizedNode<?, ?>> getRootNode() {
+        return Optional.of(Verify.verifyNotNull(rootNode(), "Snapshot %s returned non-present root node", getClass()));
+    }
+
+    /**
+     * Return the root node.
+     *
+     * @return The root node.
+     */
+    abstract @Nonnull NormalizedNode<?, ?> rootNode();
+
+    /**
+     * Return the snapshot payload version. Implementations of this method should return a constant.
+     *
+     * @return Snapshot payload version
+     */
+    abstract @Nonnull PayloadVersion version();
+
+    private void versionedSerialize(final DataOutputStream dos, final PayloadVersion version) throws IOException {
+        switch (version) {
+            case BORON:
+                // Boron snapshots use Java Serialization
+                try (ObjectOutputStream oos = new ObjectOutputStream(dos)) {
+                    oos.writeObject(this);
+                }
+                return;
+            case TEST_FUTURE_VERSION:
+            case TEST_PAST_VERSION:
+                break;
+
+        }
+
+        throw new IOException("Encountered unhandled version" + version);
+    }
+
+    @Override
+    public final byte[] serialize() throws IOException {
+        try (final ByteArrayOutputStream bos = new ByteArrayOutputStream()) {
+            try (final DataOutputStream dos = new DataOutputStream(bos)) {
+                final PayloadVersion version = version();
+                version.writeTo(dos);
+                versionedSerialize(dos, version);
+            }
+
+            return bos.toByteArray();
+        }
+    }
+}
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/persisted/MetadataShardDataTreeSnapshot.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/persisted/MetadataShardDataTreeSnapshot.java
new file mode 100644 (file)
index 0000000..682c0b7
--- /dev/null
@@ -0,0 +1,111 @@
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore.persisted;
+
+import com.google.common.annotations.Beta;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Verify;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableMap.Builder;
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.io.Serializable;
+import java.util.Map;
+import org.opendaylight.controller.cluster.datastore.utils.SerializationUtils;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+
+/**
+ * An {@link AbstractVersionedShardDataTreeSnapshot} which contains additional metadata.
+ *
+ * @author Robert Varga
+ */
+@Beta
+public final class MetadataShardDataTreeSnapshot extends AbstractVersionedShardDataTreeSnapshot implements Serializable {
+    private static final class Proxy implements Externalizable {
+        private static final long serialVersionUID = 1L;
+
+        private Map<Class<? extends ShardDataTreeSnapshotMetadata>, ShardDataTreeSnapshotMetadata> metadata;
+        private NormalizedNode<?, ?> rootNode;
+
+        public Proxy() {
+            // For Externalizable
+        }
+
+        Proxy(final MetadataShardDataTreeSnapshot snapshot) {
+            this.rootNode = snapshot.getRootNode().get();
+            this.metadata = snapshot.getMetadata();
+        }
+
+        @Override
+        public void writeExternal(final ObjectOutput out) throws IOException {
+            out.writeInt(metadata.size());
+            for (ShardDataTreeSnapshotMetadata m : metadata.values()) {
+                out.writeObject(m);
+            }
+
+            SerializationUtils.serializeNormalizedNode(rootNode, out);
+        }
+
+        @Override
+        public void readExternal(final ObjectInput in) throws IOException, ClassNotFoundException {
+            final int metaSize = in.readInt();
+            Preconditions.checkArgument(metaSize >= 0, "Invalid negative metadata map length %s", metaSize);
+
+            // Default pre-allocate is 4, which should be fine
+            final Builder<Class<? extends ShardDataTreeSnapshotMetadata>, ShardDataTreeSnapshotMetadata> metaBuilder =
+                    ImmutableMap.builder();
+            for (int i = 0; i < metaSize; ++i) {
+                final ShardDataTreeSnapshotMetadata m = (ShardDataTreeSnapshotMetadata) in.readObject();
+                metaBuilder.put(m.getClass(), m);
+            }
+
+            metadata = metaBuilder.build();
+            rootNode = Verify.verifyNotNull(SerializationUtils.deserializeNormalizedNode(in));
+        }
+
+        private Object readResolve() {
+            return new MetadataShardDataTreeSnapshot(rootNode, metadata);
+        }
+    }
+
+    private static final long serialVersionUID = 1L;
+
+    private final Map<Class<? extends ShardDataTreeSnapshotMetadata>, ShardDataTreeSnapshotMetadata> metadata;
+    private final NormalizedNode<?, ?> rootNode;
+
+    public MetadataShardDataTreeSnapshot(final NormalizedNode<?, ?> rootNode) {
+        this(rootNode, ImmutableMap.of());
+    }
+
+    public MetadataShardDataTreeSnapshot(final NormalizedNode<?, ?> rootNode,
+            final Map<Class<? extends ShardDataTreeSnapshotMetadata>, ShardDataTreeSnapshotMetadata> metadata) {
+        this.rootNode = Preconditions.checkNotNull(rootNode);
+        this.metadata = ImmutableMap.copyOf(metadata);
+    }
+
+    public Map<Class<? extends ShardDataTreeSnapshotMetadata>, ShardDataTreeSnapshotMetadata> getMetadata() {
+        return metadata;
+    }
+
+    @Override
+    NormalizedNode<?, ?> rootNode() {
+        return rootNode;
+    }
+
+    @Override
+    PayloadVersion version() {
+        return PayloadVersion.BORON;
+    }
+
+    private Object writeReplace() {
+        return new Proxy(this);
+    }
+
+}
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/persisted/PreBoronShardDataTreeSnapshot.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/persisted/PreBoronShardDataTreeSnapshot.java
new file mode 100644 (file)
index 0000000..cd0bd65
--- /dev/null
@@ -0,0 +1,39 @@
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore.persisted;
+
+import com.google.common.annotations.Beta;
+import java.util.Optional;
+import javax.annotation.Nullable;
+import org.opendaylight.controller.cluster.datastore.utils.SerializationUtils;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+
+/**
+ * Legacy data tree snapshot used in versions prior to Boron, which contains only the data.
+ *
+ * @author Robert Varga
+ */
+@Beta
+public final class PreBoronShardDataTreeSnapshot extends ShardDataTreeSnapshot {
+    private final NormalizedNode<?, ?> rootNode;
+
+    @Deprecated
+    public PreBoronShardDataTreeSnapshot(final @Nullable NormalizedNode<?, ?> rootNode) {
+        this.rootNode = rootNode;
+    }
+
+    @Override
+    public Optional<NormalizedNode<?, ?>> getRootNode() {
+        return Optional.ofNullable(rootNode);
+    }
+
+    @Override
+    public byte[] serialize() {
+        return SerializationUtils.serializeNormalizedNode(rootNode);
+    }
+}
\ No newline at end of file
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/persisted/ShardDataTreeSnapshot.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/persisted/ShardDataTreeSnapshot.java
new file mode 100644 (file)
index 0000000..ef90101
--- /dev/null
@@ -0,0 +1,110 @@
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore.persisted;
+
+import com.google.common.annotations.Beta;
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Optional;
+import javax.annotation.Nonnull;
+import org.opendaylight.controller.cluster.datastore.utils.SerializationUtils;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Abstract base class for snapshots of the ShardDataTree.
+ *
+ * @author Robert Varga
+ */
+@Beta
+public abstract class ShardDataTreeSnapshot {
+    private static final Logger LOG = LoggerFactory.getLogger(ShardDataTreeSnapshot.class);
+
+    ShardDataTreeSnapshot() {
+        // Hidden to prevent subclassing from outside of this package
+    }
+
+    public static ShardDataTreeSnapshot deserialize(final byte[] bytes) throws IOException {
+        /**
+         * Unfortunately versions prior to Boron did not include any way to evolve the snapshot format and contained
+         * only the raw data stored in the datastore. Furthermore utilities involved do not check if the array is
+         * completely consumed, which has a nasty side-effect when coupled with the fact that PayloadVersion writes
+         * a short value.
+         *
+         * Since our versions fit into a single byte, we end up writing the 0 as the first byte, which would be
+         * interpreted as 'not present' by the old snapshot format, which uses writeBoolean/readBoolean. A further
+         * complication is that readBoolean interprets any non-zero value as true, hence we cannot use a wild value
+         * to cause it to fail.
+         */
+        if (isLegacyStream(bytes)) {
+            return deserializeLegacy(bytes);
+        }
+
+        try {
+            try (final InputStream is = new ByteArrayInputStream(bytes)) {
+                try (final DataInputStream dis = new DataInputStream(is)) {
+                    final ShardDataTreeSnapshot ret = AbstractVersionedShardDataTreeSnapshot.deserialize(dis);
+
+                    // Make sure we consume all bytes, otherwise something went very wrong
+                    final int bytesLeft = dis.available();
+                    if (bytesLeft != 0) {
+                        throw new IOException("Deserialization left " + bytesLeft + " in the buffer");
+                    }
+
+
+                    return ret;
+                }
+            }
+        } catch (IOException e) {
+            LOG.debug("Failed to deserialize versioned stream, attempting pre-Lithium ProtoBuf", e);
+            return deserializeLegacy(bytes);
+        }
+    }
+
+    /**
+     * Get the root data node contained in this snapshot.
+     *
+     * @return An optional root node.
+     */
+    public abstract Optional<NormalizedNode<?, ?>> getRootNode();
+
+    /**
+     * Serialize this snapshot into a byte array for persistence.
+     *
+     * @return Serialized snapshot
+     * @throws IOException when a serialization problem occurs
+     */
+    public abstract @Nonnull byte[] serialize() throws IOException;
+
+    private static boolean isLegacyStream(final byte[] bytes) {
+        if (bytes.length < 2) {
+            // Versioned streams have at least two bytes
+            return true;
+        }
+
+        /*
+         * The stream could potentially be a versioned stream. Here we rely on the signature marker available
+         * in org.opendaylight.controller.cluster.datastore.node.utils.stream.TokenTypes.
+         *
+         * For an old stream to be this long, the first byte has to be non-zero and the second byte has to be 0xAB.
+         *
+         * For a versioned stream, that translates to at least version 427 -- giving us at least 421 further versions
+         * before this check breaks.
+         */
+        return bytes[0] != 0 && bytes[1] == (byte)0xAB;
+    }
+
+    @Deprecated
+    private static ShardDataTreeSnapshot deserializeLegacy(final byte[] bytes) {
+        return new PreBoronShardDataTreeSnapshot(SerializationUtils.deserializeNormalizedNode(bytes));
+    }
+}
+
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/persisted/ShardDataTreeSnapshotMetadata.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/persisted/ShardDataTreeSnapshotMetadata.java
new file mode 100644 (file)
index 0000000..a20ec4e
--- /dev/null
@@ -0,0 +1,46 @@
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore.persisted;
+
+import com.google.common.base.Verify;
+import java.io.Externalizable;
+import java.io.Serializable;
+import javax.annotation.Nonnull;
+
+/**
+ * Base class for various bits of metadata attached to a {@link MetadataShardDataTreeSnapshot}. This class is not
+ * an interface because we want to make sure all subclasses implement the externalizable proxy pattern, for which
+ * we need to force {@link #readResolve()} to be abstract.
+ *
+ * All concrete subclasses of this class should be final so as to form a distinct set of possible metadata. Since
+ * metadata is serialized along with {@link MetadataShardDataTreeSnapshot}, this set is part of the serialization format
+ * guarded by {@link PayloadVersion}.
+ *
+ * If a new metadata type is introduced or a type is removed, {@link PayloadVersion} needs to be bumped to ensure
+ * compatibility.
+ *
+ * @author Robert Varga
+ */
+public abstract class ShardDataTreeSnapshotMetadata implements Serializable {
+    private static final long serialVersionUID = 1L;
+
+    ShardDataTreeSnapshotMetadata() {
+        // Prevent subclassing from outside of this package
+    }
+
+    final Object writeReplace() {
+        return Verify.verifyNotNull(externalizableProxy(), "Null externalizable proxy from %s", getClass());
+    }
+
+    /**
+     * Return an Externalizable proxy
+     *
+     * @return Externalizable proxy, may not be null
+     */
+    protected abstract @Nonnull Externalizable externalizableProxy();
+}
index b6f464a..a42a628 100644 (file)
@@ -52,7 +52,7 @@ import org.opendaylight.controller.cluster.datastore.messages.ForwardedReadyTran
 import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification;
 import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
 import org.opendaylight.controller.cluster.datastore.persisted.CommitTransactionPayload;
-import org.opendaylight.controller.cluster.datastore.utils.SerializationUtils;
+import org.opendaylight.controller.cluster.datastore.persisted.PreBoronShardDataTreeSnapshot;
 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
 import org.opendaylight.controller.cluster.raft.Snapshot;
 import org.opendaylight.controller.cluster.raft.TestActorFactory;
@@ -400,7 +400,7 @@ public abstract class AbstractShardTest extends AbstractActorTest{
         final NormalizedNode<?, ?> root = readStore(testStore, YangInstanceIdentifier.EMPTY);
 
         InMemorySnapshotStore.addSnapshot(shardID.toString(), Snapshot.create(
-                SerializationUtils.serializeNormalizedNode(root),
+                new PreBoronShardDataTreeSnapshot(root).serialize(),
                 Collections.<ReplicatedLogEntry>emptyList(), 0, 1, -1, -1));
         return testStore;
     }
index f00472c..1b2657e 100644 (file)
@@ -49,8 +49,8 @@ import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedEx
 import org.opendaylight.controller.cluster.datastore.messages.DatastoreSnapshot;
 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
 import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
+import org.opendaylight.controller.cluster.datastore.persisted.PreBoronShardDataTreeSnapshot;
 import org.opendaylight.controller.cluster.datastore.utils.MockDataChangeListener;
-import org.opendaylight.controller.cluster.datastore.utils.SerializationUtils;
 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
 import org.opendaylight.controller.cluster.raft.Snapshot;
 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
@@ -1171,7 +1171,7 @@ public class DistributedDataStoreIntegrationTest {
             NormalizedNode<?, ?> root = AbstractShardTest.readStore(dataTree.getDataTree(),
                     YangInstanceIdentifier.EMPTY);
 
-            Snapshot carsSnapshot = Snapshot.create(SerializationUtils.serializeNormalizedNode(root),
+            Snapshot carsSnapshot = Snapshot.create(new PreBoronShardDataTreeSnapshot(root).serialize(),
                     Collections.<ReplicatedLogEntry>emptyList(), 2, 1, 2, 1, 1, "member-1");
 
             NormalizedNode<?, ?> peopleNode = PeopleModel.create();
@@ -1179,7 +1179,7 @@ public class DistributedDataStoreIntegrationTest {
             AbstractShardTest.writeToStore(dataTree, PeopleModel.BASE_PATH, peopleNode);
             root = AbstractShardTest.readStore(dataTree.getDataTree(), YangInstanceIdentifier.EMPTY);
 
-            Snapshot peopleSnapshot = Snapshot.create(SerializationUtils.serializeNormalizedNode(root),
+            Snapshot peopleSnapshot = Snapshot.create(new PreBoronShardDataTreeSnapshot(root).serialize(),
                     Collections.<ReplicatedLogEntry>emptyList(), 2, 1, 2, 1, 1, "member-1");
 
             restoreFromSnapshot = new DatastoreSnapshot(name, null, Arrays.asList(
index 79ea5d4..1ceb2c7 100644 (file)
@@ -15,7 +15,7 @@ import java.io.IOException;
 import org.junit.Before;
 import org.junit.Test;
 import org.opendaylight.controller.cluster.datastore.persisted.CommitTransactionPayload;
-import org.opendaylight.controller.cluster.datastore.utils.SerializationUtils;
+import org.opendaylight.controller.cluster.datastore.persisted.PreBoronShardDataTreeSnapshot;
 import org.opendaylight.controller.md.cluster.datastore.model.CarsModel;
 import org.opendaylight.controller.md.cluster.datastore.model.PeopleModel;
 import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
@@ -140,18 +140,9 @@ public class ShardRecoveryCoordinatorTest extends AbstractTest {
         modification.merge(CarsModel.BASE_PATH, CarsModel.create());
         modification.merge(PeopleModel.BASE_PATH, PeopleModel.create());
         modification.ready();
-        final DataTreeCandidateTip prepare = dataTree.prepare(modification);
+        dataTree.commit(dataTree.prepare(modification));
 
-        dataTree.commit(prepare);
-
-        snapshot = dataTree.takeSnapshot();
-
-        modification = snapshot.newModification();
-
-        final Optional<NormalizedNode<?, ?>> optional = modification.readNode(YangInstanceIdentifier.EMPTY);
-
-        final byte[] bytes = SerializationUtils.serializeNormalizedNode(optional.get());
-
-        return bytes;
+        return new PreBoronShardDataTreeSnapshot(dataTree.takeSnapshot().readNode(YangInstanceIdentifier.EMPTY).get())
+                .serialize();
     }
 }
\ No newline at end of file
index 87cfae8..9c6eb25 100644 (file)
@@ -36,6 +36,7 @@ import com.google.common.base.Stopwatch;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.Uninterruptibles;
+import java.io.IOException;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.Set;
@@ -78,9 +79,10 @@ import org.opendaylight.controller.cluster.datastore.modification.DeleteModifica
 import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
 import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification;
 import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
+import org.opendaylight.controller.cluster.datastore.persisted.PreBoronShardDataTreeSnapshot;
+import org.opendaylight.controller.cluster.datastore.persisted.ShardDataTreeSnapshot;
 import org.opendaylight.controller.cluster.datastore.utils.MockDataChangeListener;
 import org.opendaylight.controller.cluster.datastore.utils.MockDataTreeChangeListener;
-import org.opendaylight.controller.cluster.datastore.utils.SerializationUtils;
 import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener;
 import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListenerReply;
 import org.opendaylight.controller.cluster.raft.RaftActorContext;
@@ -425,7 +427,7 @@ public class ShardTest extends AbstractShardTest {
         final YangInstanceIdentifier root = YangInstanceIdentifier.EMPTY;
         final NormalizedNode<?,?> expected = readStore(store, root);
 
-        final Snapshot snapshot = Snapshot.create(SerializationUtils.serializeNormalizedNode(expected),
+        final Snapshot snapshot = Snapshot.create(new PreBoronShardDataTreeSnapshot(expected).serialize(),
                 Collections.<ReplicatedLogEntry>emptyList(), 1, 2, 3, 4);
 
         shard.underlyingActor().getRaftActorSnapshotCohort().applySnapshot(snapshot.getState());
@@ -2122,7 +2124,7 @@ public class ShardTest extends AbstractShardTest {
             awaitAndValidateSnapshot(expectedRoot);
         }
 
-        private void awaitAndValidateSnapshot(NormalizedNode<?,?> expectedRoot) throws InterruptedException {
+        private void awaitAndValidateSnapshot(NormalizedNode<?,?> expectedRoot) throws InterruptedException, IOException {
             assertEquals("Snapshot saved", true, latch.get().await(5, TimeUnit.SECONDS));
 
             assertTrue("Invalid saved snapshot " + savedSnapshot.get(),
@@ -2134,11 +2136,9 @@ public class ShardTest extends AbstractShardTest {
             savedSnapshot.set(null);
         }
 
-        private void verifySnapshot(final Snapshot snapshot, final NormalizedNode<?,?> expectedRoot) {
-
-            final NormalizedNode<?, ?> actual = SerializationUtils.deserializeNormalizedNode(snapshot.getState());
+        private void verifySnapshot(final Snapshot snapshot, final NormalizedNode<?,?> expectedRoot) throws IOException {
+            final NormalizedNode<?, ?> actual = ShardDataTreeSnapshot.deserialize(snapshot.getState()).getRootNode().get();
             assertEquals("Root node", expectedRoot, actual);
-
         }};
     }
 
index 0cb9046..55060d1 100644 (file)
@@ -33,7 +33,6 @@ import org.opendaylight.controller.cluster.datastore.messages.BatchedModificatio
 import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.CloseTransactionReply;
 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
-import org.opendaylight.controller.cluster.datastore.messages.CreateSnapshot;
 import org.opendaylight.controller.cluster.datastore.messages.DataExists;
 import org.opendaylight.controller.cluster.datastore.messages.DataExistsReply;
 import org.opendaylight.controller.cluster.datastore.messages.ReadData;
@@ -42,8 +41,6 @@ import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionRe
 import org.opendaylight.controller.cluster.datastore.modification.DeleteModification;
 import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
 import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
-import org.opendaylight.controller.cluster.datastore.utils.SerializationUtils;
-import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
@@ -343,35 +340,6 @@ public class ShardTransactionTest extends AbstractActorTest {
         }};
     }
 
-    @Test
-    public void testOnReceiveCreateSnapshot() throws Exception {
-        new JavaTestKit(getSystem()) {{
-            ShardTest.writeToStore(store.getDataTree(), TestModel.TEST_PATH,
-                    ImmutableNodes.containerNode(TestModel.TEST_QNAME));
-
-            NormalizedNode<?,?> expectedRoot = ShardTest.readStore(store.getDataTree(),
-                    YangInstanceIdentifier.EMPTY);
-
-            final ActorRef transaction = newTransactionActor(TransactionType.READ_ONLY, readOnlyTransaction(),
-                    "testOnReceiveCreateSnapshot");
-
-            watch(transaction);
-
-            transaction.tell(CreateSnapshot.INSTANCE, getRef());
-
-            CaptureSnapshotReply reply = expectMsgClass(duration("3 seconds"), CaptureSnapshotReply.class);
-
-            assertNotNull("getSnapshot is null", reply.getSnapshot());
-
-            NormalizedNode<?,?> actualRoot = SerializationUtils.deserializeNormalizedNode(
-                    reply.getSnapshot());
-
-            assertEquals("Root node", expectedRoot, actualRoot);
-
-            expectTerminated(duration("3 seconds"), transaction);
-        }};
-    }
-
     @Test
     public void testReadWriteTxOnReceiveCloseTransaction() throws Exception {
         new JavaTestKit(getSystem()) {{
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/actors/ShardSnapshotActorTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/actors/ShardSnapshotActorTest.java
new file mode 100644 (file)
index 0000000..47128f0
--- /dev/null
@@ -0,0 +1,64 @@
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore.actors;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import akka.actor.ActorRef;
+import akka.testkit.JavaTestKit;
+import java.util.Optional;
+import org.junit.Test;
+import org.opendaylight.controller.cluster.datastore.AbstractActorTest;
+import org.opendaylight.controller.cluster.datastore.persisted.ShardDataTreeSnapshot;
+import org.opendaylight.controller.cluster.datastore.persisted.MetadataShardDataTreeSnapshot;
+import org.opendaylight.controller.cluster.datastore.persisted.PreBoronShardDataTreeSnapshot;
+import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
+import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
+
+public class ShardSnapshotActorTest extends AbstractActorTest {
+    private static final NormalizedNode<?, ?> DATA = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
+
+    private static void testSerializeSnapshot(final String testName, final ShardDataTreeSnapshot snapshot)
+            throws Exception {
+        new JavaTestKit(getSystem()) {{
+
+            final ActorRef snapshotActor = getSystem().actorOf(ShardSnapshotActor.props(), testName);
+            watch(snapshotActor);
+
+            final NormalizedNode<?, ?> expectedRoot = snapshot.getRootNode().get();
+
+            ShardSnapshotActor.requestSnapshot(snapshotActor, snapshot, getRef());
+
+            final CaptureSnapshotReply reply = expectMsgClass(duration("3 seconds"), CaptureSnapshotReply.class);
+            assertNotNull("getSnapshot is null", reply.getSnapshot());
+
+            final ShardDataTreeSnapshot actual = ShardDataTreeSnapshot.deserialize(reply.getSnapshot());
+            assertNotNull(actual);
+            assertEquals(snapshot.getClass(), actual.getClass());
+
+            final Optional<NormalizedNode<?, ?>> maybeNode = actual.getRootNode();
+            assertTrue(maybeNode.isPresent());
+
+            assertEquals("Root node", expectedRoot, maybeNode.get());
+        }};
+    }
+
+    @Test
+    public void testSerializeBoronSnapshot() throws Exception {
+        testSerializeSnapshot("testSerializeBoronSnapshot", new MetadataShardDataTreeSnapshot(DATA));
+    }
+
+    @Deprecated
+    @Test
+    public void testSerializeLegacySnapshot() throws Exception {
+        testSerializeSnapshot("testSerializeLegacySnapshot", new PreBoronShardDataTreeSnapshot(DATA));
+    }
+}
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/persisted/ShardDataTreeSnapshotTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/persisted/ShardDataTreeSnapshotTest.java
new file mode 100644 (file)
index 0000000..f25153f
--- /dev/null
@@ -0,0 +1,137 @@
+/*
+ * Copyright (c) 2016 Brocade Communications Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore.persisted;
+
+import static org.junit.Assert.assertEquals;
+import com.google.common.collect.ImmutableMap;
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.util.Map;
+import java.util.Optional;
+import org.junit.Test;
+import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
+import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder;
+
+/**
+ * Unit tests for ShardDataTreeSnapshot.
+ *
+ * @author Thomas Pantelis
+ */
+public class ShardDataTreeSnapshotTest {
+
+    @Test
+    public void testShardDataTreeSnapshotWithNoMetadata() throws Exception {
+        NormalizedNode<?, ?> expectedNode = ImmutableContainerNodeBuilder.create().withNodeIdentifier(
+                new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME)).
+                withChild(ImmutableNodes.leafNode(TestModel.DESC_QNAME, "foo")).build();
+
+        MetadataShardDataTreeSnapshot snapshot = new MetadataShardDataTreeSnapshot(expectedNode);
+        byte[] serialized = snapshot.serialize();
+
+        ShardDataTreeSnapshot deserialized = ShardDataTreeSnapshot.deserialize(serialized);
+
+        Optional<NormalizedNode<?, ?>> actualNode = deserialized.getRootNode();
+        assertEquals("rootNode present", true, actualNode.isPresent());
+        assertEquals("rootNode", expectedNode, actualNode.get());
+        assertEquals("Deserialized type", MetadataShardDataTreeSnapshot.class, deserialized.getClass());
+        assertEquals("Metadata size", 0, ((MetadataShardDataTreeSnapshot)deserialized).getMetadata().size());
+    }
+
+    @Test
+    public void testShardDataTreeSnapshotWithMetadata() throws Exception {
+        NormalizedNode<?, ?> expectedNode = ImmutableContainerNodeBuilder.create().withNodeIdentifier(
+                new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME)).
+                withChild(ImmutableNodes.leafNode(TestModel.DESC_QNAME, "foo")).build();
+
+        Map<Class<? extends ShardDataTreeSnapshotMetadata>, ShardDataTreeSnapshotMetadata> expMetadata =
+                ImmutableMap.of(TestShardDataTreeSnapshotMetadata.class, new TestShardDataTreeSnapshotMetadata("test"));
+        MetadataShardDataTreeSnapshot snapshot = new MetadataShardDataTreeSnapshot(expectedNode, expMetadata);
+        byte[] serialized = snapshot.serialize();
+
+        ShardDataTreeSnapshot deserialized = ShardDataTreeSnapshot.deserialize(serialized);
+
+        Optional<NormalizedNode<?, ?>> actualNode = deserialized.getRootNode();
+        assertEquals("rootNode present", true, actualNode.isPresent());
+        assertEquals("rootNode", expectedNode, actualNode.get());
+        assertEquals("Deserialized type", MetadataShardDataTreeSnapshot.class, deserialized.getClass());
+        assertEquals("Metadata", expMetadata, ((MetadataShardDataTreeSnapshot)deserialized).getMetadata());
+    }
+
+    @Test
+    public void testPreBoronShardDataTreeSnapshot() throws Exception {
+        NormalizedNode<?, ?> expectedNode = ImmutableContainerNodeBuilder.create().withNodeIdentifier(
+                new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME)).
+                withChild(ImmutableNodes.leafNode(TestModel.DESC_QNAME, "foo")).build();
+
+        PreBoronShardDataTreeSnapshot snapshot = new PreBoronShardDataTreeSnapshot(expectedNode);
+        byte[] serialized = snapshot.serialize();
+
+        ShardDataTreeSnapshot deserialized = ShardDataTreeSnapshot.deserialize(serialized);
+
+        Optional<NormalizedNode<?, ?>> actualNode = deserialized.getRootNode();
+        assertEquals("rootNode present", true, actualNode.isPresent());
+        assertEquals("rootNode", expectedNode, actualNode.get());
+        assertEquals("Deserialized type", PreBoronShardDataTreeSnapshot.class, deserialized.getClass());
+    }
+
+    static class TestShardDataTreeSnapshotMetadata extends ShardDataTreeSnapshotMetadata {
+        private static final long serialVersionUID = 1L;
+
+        private final String data;
+
+        TestShardDataTreeSnapshotMetadata(String data) {
+            this.data = data;
+        }
+
+        @Override
+        protected Externalizable externalizableProxy() {
+            return new Proxy(data);
+        }
+
+        @Override
+        public int hashCode() {
+            return data.hashCode();
+        }
+
+        @Override
+        public boolean equals(Object obj) {
+            return data.equals(((TestShardDataTreeSnapshotMetadata)obj).data);
+        }
+
+
+        private static class Proxy implements Externalizable {
+            private String data;
+
+            public Proxy() {
+            }
+
+            Proxy(String data) {
+                this.data = data;
+            }
+
+            @Override
+            public void writeExternal(ObjectOutput out) throws IOException {
+                out.writeObject(data);
+            }
+
+            @Override
+            public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+                data = (String) in.readObject();
+            }
+
+            Object readResolve() {
+                return new TestShardDataTreeSnapshotMetadata(data);
+            }
+        }
+    }
+}