Bug 7521: Convert DatastoreSnapshot.ShardSnapshot to store Snapshot 24/50924/9
authorTom Pantelis <tpanteli@brocade.com>
Tue, 24 Jan 2017 09:19:45 +0000 (04:19 -0500)
committerTom Pantelis <tpanteli@brocade.com>
Fri, 17 Feb 2017 14:11:19 +0000 (09:11 -0500)
Converted the new DatastoreSnapshot.ShardSnapshot class to store a
Snapshot instance instead of a serialized byte[] so the Snapshot
can be serialized directly to the file when saved.

The prior DatastoreSnapshotList was deprecated and readResolves to
the new DatastoreSnapshotList.

Change-Id: I656eca93bfed07ae99055c67cc3f195b25ea2b11
Signed-off-by: Tom Pantelis <tpanteli@brocade.com>
23 files changed:
opendaylight/md-sal/sal-akka-raft-example/src/main/java/org/opendaylight/controller/cluster/example/ExampleActor.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/GetSnapshotReplyActor.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorRecoveryCohort.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorRecoverySupport.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorSnapshotMessageSupport.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/client/messages/GetSnapshotReply.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MockRaftActor.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardRecoveryCoordinator.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardSnapshotCohort.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/actors/ShardSnapshotActor.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/DatastoreSnapshotList.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/persisted/AbstractVersionedShardDataTreeSnapshot.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/persisted/DatastoreSnapshot.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/persisted/PreBoronShardDataTreeSnapshot.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/persisted/ShardDataTreeSnapshot.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/persisted/ShardSnapshotState.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DatastoreSnapshotRestoreTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreIntegrationTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/actors/ShardSnapshotActorTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/messages/DatastoreSnapshotListTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/persisted/ShardDataTreeSnapshotTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/shardmanager/ShardManagerGetSnapshotReplyActorTest.java

index 663d400..5cd4c14 100644 (file)
@@ -197,7 +197,7 @@ public class ExampleActor extends RaftActor implements RaftActorRecoveryCohort,
     }
 
     @Override
-    public byte[] getRestoreFromSnapshot() {
+    public Snapshot getRestoreFromSnapshot() {
         return null;
     }
 
index c94b780..203a612 100644 (file)
@@ -14,7 +14,6 @@ import akka.actor.ReceiveTimeout;
 import akka.actor.UntypedActor;
 import com.google.common.base.Preconditions;
 import java.util.concurrent.TimeoutException;
-import org.apache.commons.lang3.SerializationUtils;
 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
 import org.opendaylight.controller.cluster.raft.client.messages.GetSnapshotReply;
@@ -53,8 +52,7 @@ class GetSnapshotReplyActor extends UntypedActor {
 
             LOG.debug("{}: Received CaptureSnapshotReply, sending {}", params.id, snapshot);
 
-            params.replyToActor.tell(new GetSnapshotReply(params.id, SerializationUtils.serialize(snapshot)),
-                    getSelf());
+            params.replyToActor.tell(new GetSnapshotReply(params.id, snapshot), getSelf());
             getSelf().tell(PoisonPill.getInstance(), getSelf());
         } else if (message instanceof ReceiveTimeout) {
             LOG.warn("{}: Got ReceiveTimeout for inactivity - did not receive CaptureSnapshotReply within {} ms",
index 9803f1e..f948769 100644 (file)
@@ -49,12 +49,12 @@ public interface RaftActorRecoveryCohort {
     void applyCurrentLogRecoveryBatch();
 
     /**
-     * Returns the state snapshot to restore from on recovery.
+     * Returns the snapshot to restore from on recovery.
      *
-     * @return the snapshot bytes or null if there's no snapshot to restore
+     * @return the snapshot or null if there's no snapshot to restore
      */
     @Nullable
-    byte[] getRestoreFromSnapshot();
+    Snapshot getRestoreFromSnapshot();
 
     /**
      * This method is called during recovery to de-serialize a snapshot that was persisted in the pre-Carbon format.
index df20767..17e3343 100644 (file)
@@ -10,9 +10,6 @@ package org.opendaylight.controller.cluster.raft;
 import akka.persistence.RecoveryCompleted;
 import akka.persistence.SnapshotOffer;
 import com.google.common.base.Stopwatch;
-import java.io.ByteArrayInputStream;
-import java.io.IOException;
-import java.io.ObjectInputStream;
 import java.util.Collections;
 import org.opendaylight.controller.cluster.PersistentDataProvider;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
@@ -82,7 +79,7 @@ class RaftActorRecoverySupport {
 
     @SuppressWarnings("checkstyle:IllegalCatch")
     private void possiblyRestoreFromSnapshot() {
-        byte[] restoreFromSnapshot = cohort.getRestoreFromSnapshot();
+        Snapshot restoreFromSnapshot = cohort.getRestoreFromSnapshot();
         if (restoreFromSnapshot == null) {
             return;
         }
@@ -93,15 +90,9 @@ class RaftActorRecoverySupport {
             return;
         }
 
-        try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(restoreFromSnapshot))) {
-            Snapshot snapshot = (Snapshot) ois.readObject();
+        log.debug("{}: Restore snapshot: {}", context.getId(), restoreFromSnapshot);
 
-            log.debug("{}: Deserialized restore snapshot: {}", context.getId(), snapshot);
-
-            context.getSnapshotManager().apply(new ApplySnapshot(snapshot));
-        } catch (RuntimeException | ClassNotFoundException | IOException e) {
-            log.error("{}: Error deserializing snapshot restore", context.getId(), e);
-        }
+        context.getSnapshotManager().apply(new ApplySnapshot(restoreFromSnapshot));
     }
 
     private ReplicatedLog replicatedLog() {
index 4c723bf..4d68d8e 100644 (file)
@@ -14,7 +14,6 @@ import com.google.common.annotations.VisibleForTesting;
 import java.util.Collections;
 import java.util.Optional;
 import java.util.concurrent.TimeUnit;
-import org.apache.commons.lang3.SerializationUtils;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
@@ -125,8 +124,7 @@ class RaftActorSnapshotMessageSupport {
                     context.getTermInformation().getCurrentTerm(), context.getTermInformation().getVotedFor(),
                     context.getPeerServerInfo(true));
 
-            sender.tell(new GetSnapshotReply(context.getId(), SerializationUtils.serialize(snapshot)),
-                    context.getActor());
+            sender.tell(new GetSnapshotReply(context.getId(), snapshot), context.getActor());
         }
     }
 
index 2918c5e..b0c36ac 100644 (file)
@@ -10,6 +10,7 @@ package org.opendaylight.controller.cluster.raft.client.messages;
 import com.google.common.base.Preconditions;
 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
 import javax.annotation.Nonnull;
+import org.opendaylight.controller.cluster.raft.persisted.Snapshot;
 
 /**
  * Reply to GetSnapshot that returns a serialized Snapshot instance.
@@ -18,9 +19,9 @@ import javax.annotation.Nonnull;
  */
 public class GetSnapshotReply {
     private final String id;
-    private final byte[] snapshot;
+    private final Snapshot snapshot;
 
-    public GetSnapshotReply(@Nonnull String id, @Nonnull byte[] snapshot) {
+    public GetSnapshotReply(@Nonnull String id, @Nonnull Snapshot snapshot) {
         this.id = Preconditions.checkNotNull(id);
         this.snapshot = Preconditions.checkNotNull(snapshot);
     }
@@ -34,12 +35,12 @@ public class GetSnapshotReply {
             + "this is OK since this class is merely a DTO and does not process the byte[] internally. "
             + "Also it would be inefficient to create a return copy as the byte[] could be large.")
     @Nonnull
-    public byte[] getSnapshot() {
+    public Snapshot getSnapshot() {
         return snapshot;
     }
 
     @Override
     public String toString() {
-        return "GetSnapshotReply [id=" + id + ", snapshot.length=" + snapshot.length + "]";
+        return "GetSnapshotReply [id=" + id + ", snapshot=" + snapshot + "]";
     }
 }
index 20adaf1..ae26383 100644 (file)
@@ -46,7 +46,7 @@ public class MockRaftActor extends RaftActor implements RaftActorRecoveryCohort,
     protected final CountDownLatch initializeBehaviorComplete = new CountDownLatch(1);
     private RaftActorRecoverySupport raftActorRecoverySupport;
     private RaftActorSnapshotMessageSupport snapshotMessageSupport;
-    private final byte[] restoreFromSnapshot;
+    private final Snapshot restoreFromSnapshot;
     final CountDownLatch snapshotCommitted = new CountDownLatch(1);
     private final Function<Runnable, Void> pauseLeaderFunction;
 
@@ -263,7 +263,7 @@ public class MockRaftActor extends RaftActor implements RaftActorRecoveryCohort,
     }
 
     @Override
-    public byte[] getRestoreFromSnapshot() {
+    public Snapshot getRestoreFromSnapshot() {
         return restoreFromSnapshot;
     }
 
@@ -288,7 +288,7 @@ public class MockRaftActor extends RaftActor implements RaftActorRecoveryCohort,
         private DataPersistenceProvider dataPersistenceProvider;
         private ActorRef roleChangeNotifier;
         private RaftActorSnapshotMessageSupport snapshotMessageSupport;
-        private byte[] restoreFromSnapshot;
+        private Snapshot restoreFromSnapshot;
         private Optional<Boolean> persistent = Optional.absent();
         private final Class<A> actorClass;
         private Function<Runnable, Void> pauseLeaderFunction;
@@ -333,7 +333,7 @@ public class MockRaftActor extends RaftActor implements RaftActorRecoveryCohort,
             return self();
         }
 
-        public T restoreFromSnapshot(byte[] newRestoreFromSnapshot) {
+        public T restoreFromSnapshot(Snapshot newRestoreFromSnapshot) {
             this.restoreFromSnapshot = newRestoreFromSnapshot;
             return self();
         }
index 3def4f0..db6fe1a 100644 (file)
@@ -52,7 +52,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
-import org.apache.commons.lang3.SerializationUtils;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -1050,7 +1049,7 @@ public class RaftActorTest extends AbstractActorTest {
         GetSnapshotReply reply = kit.expectMsgClass(GetSnapshotReply.class);
 
         assertEquals("getId", persistenceId, reply.getId());
-        Snapshot replySnapshot = SerializationUtils.deserialize(reply.getSnapshot());
+        Snapshot replySnapshot = reply.getSnapshot();
         assertEquals("getElectionTerm", term, replySnapshot.getElectionTerm());
         assertEquals("getElectionVotedFor", "member-1", replySnapshot.getElectionVotedFor());
         assertEquals("getLastAppliedIndex", 1L, replySnapshot.getLastAppliedIndex());
@@ -1083,7 +1082,7 @@ public class RaftActorTest extends AbstractActorTest {
         verify(mockRaftActor.snapshotCohortDelegate, never()).createSnapshot(anyObject(), anyObject());
 
         assertEquals("getId", persistenceId, reply.getId());
-        replySnapshot = SerializationUtils.deserialize(reply.getSnapshot());
+        replySnapshot = reply.getSnapshot();
         assertEquals("getElectionTerm", term, replySnapshot.getElectionTerm());
         assertEquals("getElectionVotedFor", "member-1", replySnapshot.getElectionVotedFor());
         assertEquals("getLastAppliedIndex", -1L, replySnapshot.getLastAppliedIndex());
@@ -1122,7 +1121,7 @@ public class RaftActorTest extends AbstractActorTest {
         InMemorySnapshotStore.addSnapshotSavedLatch(persistenceId);
 
         TestActorRef<MockRaftActor> raftActorRef = factory.createTestActor(MockRaftActor.builder().id(persistenceId)
-                .config(config).restoreFromSnapshot(SerializationUtils.serialize(snapshot)).props()
+                .config(config).restoreFromSnapshot(snapshot).props()
                     .withDispatcher(Dispatchers.DefaultDispatcherId()), persistenceId);
         MockRaftActor mockRaftActor = raftActorRef.underlyingActor();
 
@@ -1157,7 +1156,7 @@ public class RaftActorTest extends AbstractActorTest {
         persistenceId = factory.generateActorId("test-actor-");
 
         raftActorRef = factory.createTestActor(MockRaftActor.builder().id(persistenceId)
-                .config(config).restoreFromSnapshot(SerializationUtils.serialize(snapshot))
+                .config(config).restoreFromSnapshot(snapshot)
                 .persistent(Optional.of(Boolean.FALSE)).props()
                     .withDispatcher(Dispatchers.DefaultDispatcherId()), persistenceId);
         mockRaftActor = raftActorRef.underlyingActor();
@@ -1189,7 +1188,7 @@ public class RaftActorTest extends AbstractActorTest {
                 new MockRaftActorContext.MockPayload("B")));
 
         TestActorRef<MockRaftActor> raftActorRef = factory.createTestActor(MockRaftActor.builder().id(persistenceId)
-                .config(config).restoreFromSnapshot(SerializationUtils.serialize(snapshot)).props()
+                .config(config).restoreFromSnapshot(snapshot).props()
                     .withDispatcher(Dispatchers.DefaultDispatcherId()), persistenceId);
         MockRaftActor mockRaftActor = raftActorRef.underlyingActor();
 
index e87f083..634b6f6 100644 (file)
@@ -34,11 +34,11 @@ class ShardRecoveryCoordinator implements RaftActorRecoveryCohort {
     private final ShardDataTree store;
     private final String shardName;
     private final Logger log;
-    private final byte[] restoreFromSnapshot;
+    private final Snapshot restoreFromSnapshot;
 
     private boolean open;
 
-    ShardRecoveryCoordinator(final ShardDataTree store,  final byte[] restoreFromSnapshot, final String shardName,
+    ShardRecoveryCoordinator(final ShardDataTree store,  final Snapshot restoreFromSnapshot, final String shardName,
             final Logger log) {
         this.store = Preconditions.checkNotNull(store);
         this.shardName = Preconditions.checkNotNull(shardName);
@@ -109,7 +109,7 @@ class ShardRecoveryCoordinator implements RaftActorRecoveryCohort {
     }
 
     @Override
-    public byte[] getRestoreFromSnapshot() {
+    public Snapshot getRestoreFromSnapshot() {
         return restoreFromSnapshot;
     }
 
@@ -117,7 +117,7 @@ class ShardRecoveryCoordinator implements RaftActorRecoveryCohort {
     @Deprecated
     public State deserializePreCarbonSnapshot(byte[] from) {
         try {
-            return new ShardSnapshotState(ShardDataTreeSnapshot.deserialize(from));
+            return new ShardSnapshotState(ShardDataTreeSnapshot.deserializePreCarbon(from));
         } catch (IOException e) {
             log.error("{}: failed to deserialize snapshot", shardName, e);
             throw Throwables.propagate(e);
index 0627c0a..93e1d87 100644 (file)
@@ -12,7 +12,7 @@ import akka.actor.ActorRef;
 import com.google.common.base.Preconditions;
 import com.google.common.io.ByteSource;
 import java.io.IOException;
-import java.io.InputStream;
+import java.io.ObjectInputStream;
 import java.io.OutputStream;
 import java.util.Optional;
 import org.opendaylight.controller.cluster.access.concepts.ClientIdentifier;
@@ -95,8 +95,8 @@ class ShardSnapshotCohort implements RaftActorSnapshotCohort {
 
     @Override
     public State deserializeSnapshot(ByteSource snapshotBytes) throws IOException {
-        try (final InputStream is = snapshotBytes.openStream()) {
-            return new ShardSnapshotState(ShardDataTreeSnapshot.deserialize(is));
+        try (final ObjectInputStream in = new ObjectInputStream(snapshotBytes.openStream())) {
+            return new ShardSnapshotState(ShardDataTreeSnapshot.deserialize(in));
         }
     }
 }
index 5eeed91..4796e1c 100644 (file)
@@ -11,14 +11,13 @@ import akka.actor.ActorRef;
 import akka.actor.Props;
 import com.google.common.base.Preconditions;
 import java.io.IOException;
+import java.io.ObjectOutputStream;
 import java.io.OutputStream;
 import java.util.Optional;
 import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActorWithMetering;
 import org.opendaylight.controller.cluster.datastore.persisted.ShardDataTreeSnapshot;
 import org.opendaylight.controller.cluster.datastore.persisted.ShardSnapshotState;
 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 /**
  * This is an offload actor, which is given an isolated snapshot of the data tree. It performs the potentially
@@ -27,8 +26,6 @@ import org.slf4j.LoggerFactory;
  * @author Robert Varga
  */
 public final class ShardSnapshotActor extends AbstractUntypedActorWithMetering {
-    private static final Logger LOG = LoggerFactory.getLogger(ShardSnapshotActor.class);
-
     // Internal message
     private static final class SerializeSnapshot {
         private final ShardDataTreeSnapshot snapshot;
@@ -71,11 +68,11 @@ public final class ShardSnapshotActor extends AbstractUntypedActorWithMetering {
         }
     }
 
-    private static void onSerializeSnapshot(final SerializeSnapshot request) {
+    private void onSerializeSnapshot(final SerializeSnapshot request) {
         Optional<OutputStream> installSnapshotStream = request.getInstallSnapshotStream();
         if (installSnapshotStream.isPresent()) {
-            try {
-                request.getSnapshot().serialize(installSnapshotStream.get());
+            try (ObjectOutputStream out = new ObjectOutputStream(installSnapshotStream.get())) {
+                request.getSnapshot().serialize(out);
             } catch (IOException e) {
                 // TODO - we should communicate the failure in the CaptureSnapshotReply.
                 LOG.error("Error serializing snapshot", e);
index ca137d7..8400e0c 100644 (file)
@@ -7,8 +7,15 @@
  */
 package org.opendaylight.controller.cluster.datastore.messages;
 
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
 import java.util.ArrayList;
 import java.util.List;
+import org.opendaylight.controller.cluster.datastore.persisted.ShardDataTreeSnapshot;
+import org.opendaylight.controller.cluster.datastore.persisted.ShardSnapshotState;
+import org.opendaylight.controller.cluster.raft.Snapshot;
+import org.opendaylight.controller.cluster.raft.persisted.EmptyState;
 
 /**
  * Stores a list of DatastoreSnapshot instances.
@@ -26,7 +33,7 @@ public class DatastoreSnapshotList extends ArrayList<DatastoreSnapshot> {
         super(snapshots);
     }
 
-    private Object readResolve() {
+    private Object readResolve() throws IOException, ClassNotFoundException {
         List<org.opendaylight.controller.cluster.datastore.persisted.DatastoreSnapshot> snapshots =
                 new ArrayList<>(size());
         for (DatastoreSnapshot legacy: this) {
@@ -37,15 +44,32 @@ public class DatastoreSnapshotList extends ArrayList<DatastoreSnapshot> {
         return new org.opendaylight.controller.cluster.datastore.persisted.DatastoreSnapshotList(snapshots);
     }
 
-    private List<org.opendaylight.controller.cluster.datastore.persisted.DatastoreSnapshot.ShardSnapshot> fromLegacy(
-            List<DatastoreSnapshot.ShardSnapshot> from) {
+    private static List<org.opendaylight.controller.cluster.datastore.persisted.DatastoreSnapshot.ShardSnapshot>
+            fromLegacy(List<DatastoreSnapshot.ShardSnapshot> from) throws IOException, ClassNotFoundException {
         List<org.opendaylight.controller.cluster.datastore.persisted.DatastoreSnapshot.ShardSnapshot> snapshots =
                 new ArrayList<>(from.size());
         for (DatastoreSnapshot.ShardSnapshot legacy: from) {
             snapshots.add(new org.opendaylight.controller.cluster.datastore.persisted.DatastoreSnapshot.ShardSnapshot(
-                    legacy.getName(), legacy.getSnapshot()));
+                    legacy.getName(), deserialize(legacy.getSnapshot())));
         }
 
         return snapshots;
     }
+
+    private static org.opendaylight.controller.cluster.raft.persisted.Snapshot deserialize(byte[] bytes)
+            throws IOException, ClassNotFoundException {
+        try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
+            Snapshot legacy = (Snapshot) ois.readObject();
+
+            org.opendaylight.controller.cluster.raft.persisted.Snapshot.State state = EmptyState.INSTANCE;
+            if (legacy.getState().length > 0) {
+                state = new ShardSnapshotState(ShardDataTreeSnapshot.deserializePreCarbon(legacy.getState()));
+            }
+
+            return org.opendaylight.controller.cluster.raft.persisted.Snapshot.create(
+                    state, legacy.getUnAppliedEntries(), legacy.getLastIndex(),
+                    legacy.getLastTerm(), legacy.getLastAppliedIndex(), legacy.getLastAppliedTerm(),
+                    legacy.getElectionTerm(), legacy.getElectionVotedFor(), legacy.getServerConfiguration());
+        }
+    }
 }
index 190c815..c12403f 100644 (file)
@@ -9,11 +9,10 @@ package org.opendaylight.controller.cluster.datastore.persisted;
 
 import com.google.common.base.Verify;
 import java.io.DataInputStream;
-import java.io.DataOutputStream;
 import java.io.IOException;
+import java.io.ObjectInput;
 import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
-import java.io.OutputStream;
+import java.io.ObjectOutput;
 import java.util.Optional;
 import javax.annotation.Nonnull;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
@@ -30,7 +29,8 @@ abstract class AbstractVersionedShardDataTreeSnapshot extends ShardDataTreeSnaps
     private static final Logger LOG = LoggerFactory.getLogger(AbstractVersionedShardDataTreeSnapshot.class);
 
     @SuppressWarnings("checkstyle:FallThrough")
-    static ShardDataTreeSnapshot deserialize(final DataInputStream is) throws IOException {
+    @Deprecated
+    static ShardDataTreeSnapshot deserializePreCarbon(final DataInputStream is) throws IOException {
         final PayloadVersion version = PayloadVersion.readFrom(is);
         switch (version) {
             case BORON:
@@ -53,6 +53,30 @@ abstract class AbstractVersionedShardDataTreeSnapshot extends ShardDataTreeSnaps
         throw new IOException("Encountered unhandled version" + version);
     }
 
+    @SuppressWarnings("checkstyle:FallThrough")
+    static ShardDataTreeSnapshot versionedDeserialize(final ObjectInput in) throws IOException {
+        final PayloadVersion version = PayloadVersion.readFrom(in);
+        switch (version) {
+            case BORON:
+                // Boron snapshots use Java Serialization
+                try {
+                    return (ShardDataTreeSnapshot) in.readObject();
+                } catch (ClassNotFoundException e) {
+                    LOG.error("Failed to serialize data tree snapshot", e);
+                    throw new IOException("Snapshot failed to deserialize", e);
+                }
+            case TEST_FUTURE_VERSION:
+            case TEST_PAST_VERSION:
+                // These versions are never returned and this code is effectively dead
+                break;
+            default:
+                throw new IOException("Invalid payload version in snapshot");
+        }
+
+        // Not included as default in above switch to ensure we get warnings when new versions are added
+        throw new IOException("Encountered unhandled version" + version);
+    }
+
     @Override
     public final Optional<NormalizedNode<?, ?>> getRootNode() {
         return Optional.of(Verify.verifyNotNull(rootNode(), "Snapshot %s returned non-present root node", getClass()));
@@ -74,13 +98,11 @@ abstract class AbstractVersionedShardDataTreeSnapshot extends ShardDataTreeSnaps
     @Nonnull
     abstract PayloadVersion version();
 
-    private void versionedSerialize(final DataOutputStream dos, final PayloadVersion version) throws IOException {
+    private void versionedSerialize(final ObjectOutput out, final PayloadVersion version) throws IOException {
         switch (version) {
             case BORON:
                 // Boron snapshots use Java Serialization
-                try (ObjectOutputStream oos = new ObjectOutputStream(dos)) {
-                    oos.writeObject(this);
-                }
+                out.writeObject(this);
                 return;
             case TEST_FUTURE_VERSION:
             case TEST_PAST_VERSION:
@@ -93,11 +115,9 @@ abstract class AbstractVersionedShardDataTreeSnapshot extends ShardDataTreeSnaps
     }
 
     @Override
-    public void serialize(final OutputStream os) throws IOException {
-        try (DataOutputStream dos = new DataOutputStream(os)) {
-            final PayloadVersion version = version();
-            version.writeTo(dos);
-            versionedSerialize(dos, version);
-        }
+    public void serialize(final ObjectOutput out) throws IOException {
+        final PayloadVersion version = version();
+        version.writeTo(out);
+        versionedSerialize(out, version);
     }
 }
index 3c5e86b..9eb8a06 100644 (file)
@@ -10,10 +10,16 @@ package org.opendaylight.controller.cluster.datastore.persisted;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableList;
 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
 import java.io.Serializable;
+import java.util.ArrayList;
 import java.util.List;
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
+import org.opendaylight.controller.cluster.raft.persisted.Snapshot;
 
 /**
  * Stores a snapshot of the internal state of a data store.
@@ -23,6 +29,52 @@ import javax.annotation.Nullable;
 public class DatastoreSnapshot implements Serializable {
     private static final long serialVersionUID = 1L;
 
+    private static final class Proxy implements Externalizable {
+        private static final long serialVersionUID = 1L;
+
+        private DatastoreSnapshot datastoreSnapshot;
+
+        // checkstyle flags the public modifier as redundant which really doesn't make sense since it clearly isn't
+        // redundant. It is explicitly needed for Java serialization to be able to create instances via reflection.
+        @SuppressWarnings("checkstyle:RedundantModifier")
+        public Proxy() {
+            // For Externalizable
+        }
+
+        Proxy(final DatastoreSnapshot datastoreSnapshot) {
+            this.datastoreSnapshot = datastoreSnapshot;
+        }
+
+        @Override
+        public void writeExternal(ObjectOutput out) throws IOException {
+            out.writeObject(datastoreSnapshot.type);
+            out.writeObject(datastoreSnapshot.shardManagerSnapshot);
+
+            out.writeInt(datastoreSnapshot.shardSnapshots.size());
+            for (ShardSnapshot shardSnapshot: datastoreSnapshot.shardSnapshots) {
+                out.writeObject(shardSnapshot);
+            }
+        }
+
+        @Override
+        public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+            String type = (String)in.readObject();
+            byte[] shardManagerSnapshot = (byte[]) in.readObject();
+
+            int size = in.readInt();
+            List<ShardSnapshot> shardSnapshots = new ArrayList<>(size);
+            for (int i = 0; i < size; i++) {
+                shardSnapshots.add((ShardSnapshot) in.readObject());
+            }
+
+            datastoreSnapshot = new DatastoreSnapshot(type, shardManagerSnapshot, shardSnapshots);
+        }
+
+        private Object readResolve() {
+            return datastoreSnapshot;
+        }
+    }
+
     private final String type;
     private final byte[] shardManagerSnapshot;
     private final List<ShardSnapshot> shardSnapshots;
@@ -55,13 +107,50 @@ public class DatastoreSnapshot implements Serializable {
         return shardSnapshots;
     }
 
+    @SuppressWarnings("static-method")
+    private Object writeReplace() {
+        return new Proxy(this);
+    }
+
     public static class ShardSnapshot implements Serializable {
         private static final long serialVersionUID = 1L;
 
+        private static final class Proxy implements Externalizable {
+            private static final long serialVersionUID = 1L;
+
+            private ShardSnapshot shardSnapshot;
+
+            // checkstyle flags the public modifier as redundant which really doesn't make sense since it clearly isn't
+            // redundant. It is explicitly needed for Java serialization to be able to create instances via reflection.
+            @SuppressWarnings("checkstyle:RedundantModifier")
+            public Proxy() {
+                // For Externalizable
+            }
+
+            Proxy(final ShardSnapshot shardSnapshot) {
+                this.shardSnapshot = shardSnapshot;
+            }
+
+            @Override
+            public void writeExternal(ObjectOutput out) throws IOException {
+                out.writeObject(shardSnapshot.name);
+                out.writeObject(shardSnapshot.snapshot);
+            }
+
+            @Override
+            public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+                shardSnapshot = new ShardSnapshot((String)in.readObject(), (Snapshot) in.readObject());
+            }
+
+            private Object readResolve() {
+                return shardSnapshot;
+            }
+        }
+
         private final String name;
-        private final byte[] snapshot;
+        private final Snapshot snapshot;
 
-        public ShardSnapshot(@Nonnull String name, @Nonnull byte[] snapshot) {
+        public ShardSnapshot(@Nonnull String name, @Nonnull Snapshot snapshot) {
             this.name = Preconditions.checkNotNull(name);
             this.snapshot = Preconditions.checkNotNull(snapshot);
         }
@@ -71,12 +160,14 @@ public class DatastoreSnapshot implements Serializable {
             return name;
         }
 
-        @SuppressFBWarnings(value = "EI_EXPOSE_REP", justification = "Exposes a mutable object stored in a field but "
-                + "this is OK since this class is merely a DTO and does not process byte[] internally. "
-                + "Also it would be inefficient to create a return copy as the byte[] could be large.")
         @Nonnull
-        public byte[] getSnapshot() {
+        public Snapshot getSnapshot() {
             return snapshot;
         }
+
+        @SuppressWarnings("static-method")
+        private Object writeReplace() {
+            return new Proxy(this);
+        }
     }
 }
index 7be0d85..2454249 100644 (file)
@@ -8,9 +8,8 @@
 package org.opendaylight.controller.cluster.datastore.persisted;
 
 import com.google.common.annotations.Beta;
-import java.io.DataOutputStream;
 import java.io.IOException;
-import java.io.OutputStream;
+import java.io.ObjectOutput;
 import java.util.Optional;
 import javax.annotation.Nullable;
 import org.opendaylight.controller.cluster.datastore.node.utils.stream.SerializationUtils;
@@ -36,9 +35,7 @@ public final class PreBoronShardDataTreeSnapshot extends ShardDataTreeSnapshot {
     }
 
     @Override
-    public void serialize(OutputStream os) throws IOException {
-        try (final DataOutputStream dos = new DataOutputStream(os)) {
-            SerializationUtils.serializeNormalizedNode(rootNode, dos);
-        }
+    public void serialize(ObjectOutput out) throws IOException {
+        SerializationUtils.serializeNormalizedNode(rootNode, out);
     }
 }
index e82ddb6..85d1143 100644 (file)
@@ -12,7 +12,8 @@ import java.io.ByteArrayInputStream;
 import java.io.DataInputStream;
 import java.io.IOException;
 import java.io.InputStream;
-import java.io.OutputStream;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
 import java.util.Optional;
 import org.opendaylight.controller.cluster.datastore.node.utils.stream.SerializationUtils;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
@@ -33,7 +34,7 @@ public abstract class ShardDataTreeSnapshot {
     }
 
     @Deprecated
-    public static ShardDataTreeSnapshot deserialize(final byte[] bytes) throws IOException {
+    public static ShardDataTreeSnapshot deserializePreCarbon(final byte[] bytes) throws IOException {
         /**
          * Unfortunately versions prior to Boron did not include any way to evolve the snapshot format and contained
          * only the raw data stored in the datastore. Furthermore utilities involved do not check if the array is
@@ -51,7 +52,18 @@ public abstract class ShardDataTreeSnapshot {
 
         try {
             try (InputStream is = new ByteArrayInputStream(bytes)) {
-                return deserialize(is);
+                try (DataInputStream dis = new DataInputStream(is)) {
+                    final ShardDataTreeSnapshot ret = AbstractVersionedShardDataTreeSnapshot.deserializePreCarbon(dis);
+
+                    // Make sure we consume all bytes, otherwise something went very wrong
+                    final int bytesLeft = dis.available();
+                    if (bytesLeft != 0) {
+                        throw new IOException("Deserialization left " + bytesLeft + " in the buffer");
+                    }
+
+
+                    return ret;
+                }
             }
         } catch (IOException e) {
             LOG.debug("Failed to deserialize versioned stream, attempting pre-Lithium ProtoBuf", e);
@@ -59,19 +71,17 @@ public abstract class ShardDataTreeSnapshot {
         }
     }
 
-    public static ShardDataTreeSnapshot deserialize(final InputStream is) throws IOException {
-        try (DataInputStream dis = new DataInputStream(is)) {
-            final ShardDataTreeSnapshot ret = AbstractVersionedShardDataTreeSnapshot.deserialize(dis);
+    public static ShardDataTreeSnapshot deserialize(final ObjectInput in) throws IOException {
+        final ShardDataTreeSnapshot ret = AbstractVersionedShardDataTreeSnapshot.versionedDeserialize(in);
 
-            // Make sure we consume all bytes, otherwise something went very wrong
-            final int bytesLeft = dis.available();
-            if (bytesLeft != 0) {
-                throw new IOException("Deserialization left " + bytesLeft + " in the buffer");
-            }
+        // Make sure we consume all bytes, otherwise something went very wrong
+        final int bytesLeft = in.available();
+        if (bytesLeft != 0) {
+            throw new IOException("Deserialization left " + bytesLeft + " in the buffer");
+        }
 
 
-            return ret;
-        }
+        return ret;
     }
 
     /**
@@ -81,7 +91,7 @@ public abstract class ShardDataTreeSnapshot {
      */
     public abstract Optional<NormalizedNode<?, ?>> getRootNode();
 
-    public abstract void serialize(OutputStream os) throws IOException;
+    public abstract void serialize(ObjectOutput out) throws IOException;
 
     @Deprecated
     private static boolean isLegacyStream(final byte[] bytes) {
index f56d6ce..5275582 100644 (file)
@@ -11,10 +11,8 @@ import com.google.common.base.Preconditions;
 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
 import java.io.Externalizable;
 import java.io.IOException;
-import java.io.InputStream;
 import java.io.ObjectInput;
 import java.io.ObjectOutput;
-import java.io.OutputStream;
 import javax.annotation.Nonnull;
 import org.opendaylight.controller.cluster.raft.persisted.Snapshot;
 
@@ -44,38 +42,12 @@ public class ShardSnapshotState implements Snapshot.State {
 
         @Override
         public void writeExternal(final ObjectOutput out) throws IOException {
-            snapshotState.snapshot.serialize(toOutputStream(out));
-        }
-
-        private static OutputStream toOutputStream(final ObjectOutput out) {
-            if (out instanceof OutputStream) {
-                return (OutputStream) out;
-            }
-
-            return new OutputStream() {
-                @Override
-                public void write(final int value) throws IOException {
-                    out.write(value);
-                }
-            };
+            snapshotState.snapshot.serialize(out);
         }
 
         @Override
         public void readExternal(final ObjectInput in) throws IOException, ClassNotFoundException {
-            snapshotState = new ShardSnapshotState(ShardDataTreeSnapshot.deserialize(toInputStream(in)));
-        }
-
-        private static InputStream toInputStream(final ObjectInput in) {
-            if (in instanceof InputStream) {
-                return (InputStream) in;
-            }
-
-            return new InputStream() {
-                @Override
-                public int read() throws IOException {
-                    return in.read();
-                }
-            };
+            snapshotState = new ShardSnapshotState(ShardDataTreeSnapshot.deserialize(in));
         }
 
         private Object readResolve() {
index 30957d9..bbbd95f 100644 (file)
@@ -7,7 +7,6 @@
  */
 package org.opendaylight.controller.cluster.datastore;
 
-import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
@@ -16,15 +15,31 @@ import static org.junit.Assert.assertTrue;
 
 import java.io.File;
 import java.io.FileOutputStream;
-import java.util.ArrayList;
-import java.util.List;
+import java.math.BigInteger;
+import java.util.Arrays;
+import java.util.Collections;
 import java.util.Objects;
 import org.apache.commons.lang3.SerializationUtils;
 import org.junit.After;
 import org.junit.Test;
 import org.opendaylight.controller.cluster.datastore.persisted.DatastoreSnapshot;
-import org.opendaylight.controller.cluster.datastore.persisted.DatastoreSnapshot.ShardSnapshot;
 import org.opendaylight.controller.cluster.datastore.persisted.DatastoreSnapshotList;
+import org.opendaylight.controller.cluster.datastore.persisted.MetadataShardDataTreeSnapshot;
+import org.opendaylight.controller.cluster.datastore.persisted.ShardSnapshotState;
+import org.opendaylight.controller.cluster.datastore.shardmanager.ShardManagerSnapshot;
+import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
+import org.opendaylight.controller.cluster.raft.persisted.Snapshot;
+import org.opendaylight.controller.md.cluster.datastore.model.CarsModel;
+import org.opendaylight.controller.md.cluster.datastore.model.PeopleModel;
+import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
+import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.TreeType;
+import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
+import org.opendaylight.yangtools.yang.data.impl.schema.tree.InMemoryDataTreeFactory;
+
 
 /**
  * Unit tests for DatastoreSnapshotRestore.
@@ -46,20 +61,20 @@ public class DatastoreSnapshotRestoreTest {
     public void test() throws Exception {
         assertTrue("Failed to mkdir " + restoreDirectoryPath, restoreDirectoryFile.mkdirs());
 
-        List<ShardSnapshot> shardSnapshots = new ArrayList<>();
-        shardSnapshots.add(new ShardSnapshot("cars", new byte[]{1,2}));
-        shardSnapshots.add(new ShardSnapshot("people", new byte[]{3,4}));
-        final DatastoreSnapshot configSnapshot = new DatastoreSnapshot("config", null, shardSnapshots);
+        final DatastoreSnapshot configSnapshot = new DatastoreSnapshot("config",
+                SerializationUtils.serialize(newShardManagerSnapshot("config-one", "config-two")),
+                Arrays.asList(new DatastoreSnapshot.ShardSnapshot("config-one", newSnapshot(CarsModel.BASE_PATH,
+                        CarsModel.newCarsNode(CarsModel.newCarsMapNode(CarsModel.newCarEntry("optima",
+                            BigInteger.valueOf(20000L)),CarsModel.newCarEntry("sportage",
+                                BigInteger.valueOf(30000L)))))),
+                        new DatastoreSnapshot.ShardSnapshot("config-two", newSnapshot(PeopleModel.BASE_PATH,
+                            PeopleModel.emptyContainer()))));
 
-        shardSnapshots = new ArrayList<>();
-        shardSnapshots.add(new ShardSnapshot("cars", new byte[]{5,6}));
-        shardSnapshots.add(new ShardSnapshot("people", new byte[]{7,8}));
-        shardSnapshots.add(new ShardSnapshot("bikes", new byte[]{9,0}));
-        DatastoreSnapshot operSnapshot = new DatastoreSnapshot("oper", null, shardSnapshots);
+        DatastoreSnapshot operSnapshot = new DatastoreSnapshot("oper",
+                null, Arrays.asList(new DatastoreSnapshot.ShardSnapshot("oper-one", newSnapshot(TestModel.TEST_PATH,
+                        ImmutableNodes.containerNode(TestModel.TEST_QNAME)))));
 
-        DatastoreSnapshotList snapshotList = new DatastoreSnapshotList();
-        snapshotList.add(configSnapshot);
-        snapshotList.add(operSnapshot);
+        DatastoreSnapshotList snapshotList = new DatastoreSnapshotList(Arrays.asList(configSnapshot, operSnapshot));
 
         try (FileOutputStream fos = new FileOutputStream(backupFile)) {
             SerializationUtils.serialize(snapshotList, fos);
@@ -67,8 +82,8 @@ public class DatastoreSnapshotRestoreTest {
 
         DatastoreSnapshotRestore instance = DatastoreSnapshotRestore.instance(restoreDirectoryPath);
 
-        verifySnapshot(configSnapshot, instance.getAndRemove("config"));
-        verifySnapshot(operSnapshot, instance.getAndRemove("oper"));
+        assertDatastoreSnapshotEquals(configSnapshot, instance.getAndRemove("config"));
+        assertDatastoreSnapshotEquals(operSnapshot, instance.getAndRemove("oper"));
 
         assertNull("DatastoreSnapshot was not removed", instance.getAndRemove("config"));
 
@@ -79,7 +94,7 @@ public class DatastoreSnapshotRestoreTest {
         assertNull("Expected null DatastoreSnapshot", instance.getAndRemove("oper"));
     }
 
-    private static void verifySnapshot(DatastoreSnapshot expected, DatastoreSnapshot actual) {
+    private static void assertDatastoreSnapshotEquals(DatastoreSnapshot expected, DatastoreSnapshot actual) {
         assertNotNull("DatastoreSnapshot is null", actual);
         assertEquals("getType", expected.getType(), actual.getType());
         assertTrue("ShardManager snapshots don't match", Objects.deepEquals(expected.getShardManagerSnapshot(),
@@ -88,8 +103,35 @@ public class DatastoreSnapshotRestoreTest {
         for (int i = 0; i < expected.getShardSnapshots().size(); i++) {
             assertEquals("ShardSnapshot " + (i + 1) + " name", expected.getShardSnapshots().get(i).getName(),
                     actual.getShardSnapshots().get(i).getName());
-            assertArrayEquals("ShardSnapshot " + (i + 1) + " snapshot",
+            assertSnapshotEquals("ShardSnapshot " + (i + 1) + " snapshot",
                     expected.getShardSnapshots().get(i).getSnapshot(), actual.getShardSnapshots().get(i).getSnapshot());
         }
     }
+
+    private static void assertSnapshotEquals(String prefix, Snapshot expected, Snapshot actual) {
+        assertEquals(prefix + " lastIndex", expected.getLastIndex(), actual.getLastIndex());
+        assertEquals(prefix + " lastTerm", expected.getLastTerm(), actual.getLastTerm());
+        assertEquals(prefix + " lastAppliedIndex", expected.getLastAppliedIndex(), actual.getLastAppliedIndex());
+        assertEquals(prefix + " lastAppliedTerm", expected.getLastAppliedTerm(), actual.getLastAppliedTerm());
+        assertEquals(prefix + " unAppliedEntries", expected.getUnAppliedEntries(), actual.getUnAppliedEntries());
+        assertEquals(prefix + " electionTerm", expected.getElectionTerm(), actual.getElectionTerm());
+        assertEquals(prefix + " electionVotedFor", expected.getElectionVotedFor(), actual.getElectionVotedFor());
+        assertEquals(prefix + " Root node", ((ShardSnapshotState)expected.getState()).getSnapshot().getRootNode(),
+                ((ShardSnapshotState)actual.getState()).getSnapshot().getRootNode());
+    }
+
+    private static ShardManagerSnapshot newShardManagerSnapshot(String... shards) {
+        return ShardManagerSnapshot.forShardList(Arrays.asList(shards));
+    }
+
+    private static Snapshot newSnapshot(YangInstanceIdentifier path, NormalizedNode<?, ?> node)
+            throws Exception {
+        DataTree dataTree = InMemoryDataTreeFactory.getInstance().create(TreeType.OPERATIONAL);
+        dataTree.setSchemaContext(SchemaContextHelper.full());
+        AbstractShardTest.writeToStore(dataTree, path, node);
+        NormalizedNode<?, ?> root = AbstractShardTest.readStore(dataTree, YangInstanceIdentifier.EMPTY);
+
+        return Snapshot.create(new ShardSnapshotState(new MetadataShardDataTreeSnapshot(root)),
+                Collections.<ReplicatedLogEntry>emptyList(), 2, 1, 2, 1, 1, "member-1", null);
+    }
 }
index 5bd67b4..e5b14b0 100644 (file)
@@ -31,7 +31,9 @@ import com.google.common.util.concurrent.MoreExecutors;
 import com.google.common.util.concurrent.Uninterruptibles;
 import com.typesafe.config.ConfigFactory;
 import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
 import java.io.IOException;
+import java.io.ObjectOutputStream;
 import java.math.BigInteger;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -53,6 +55,7 @@ import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
 import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
 import org.opendaylight.controller.cluster.datastore.persisted.DatastoreSnapshot;
 import org.opendaylight.controller.cluster.datastore.persisted.MetadataShardDataTreeSnapshot;
+import org.opendaylight.controller.cluster.datastore.persisted.PayloadVersion;
 import org.opendaylight.controller.cluster.datastore.persisted.ShardSnapshotState;
 import org.opendaylight.controller.cluster.datastore.utils.MockDataChangeListener;
 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
@@ -1271,12 +1274,9 @@ public class DistributedDataStoreIntegrationTest {
                         new ShardSnapshotState(new MetadataShardDataTreeSnapshot(root)),
                         Collections.<ReplicatedLogEntry>emptyList(), 2, 1, 2, 1, 1, "member-1", null);
 
-                restoreFromSnapshot = new DatastoreSnapshot(name, null,
-                        Arrays.asList(
-                                new DatastoreSnapshot.ShardSnapshot("cars",
-                                        org.apache.commons.lang3.SerializationUtils.serialize(carsSnapshot)),
-                                new DatastoreSnapshot.ShardSnapshot("people",
-                                        org.apache.commons.lang3.SerializationUtils.serialize(peopleSnapshot))));
+                restoreFromSnapshot = new DatastoreSnapshot(name, null, Arrays.asList(
+                        new DatastoreSnapshot.ShardSnapshot("cars", carsSnapshot),
+                        new DatastoreSnapshot.ShardSnapshot("people", peopleSnapshot)));
 
                 try (AbstractDataStore dataStore = setupDistributedDataStore(name, "module-shards-member1.conf",
                         true, "cars", "people")) {
@@ -1311,8 +1311,15 @@ public class DistributedDataStoreIntegrationTest {
                 AbstractShardTest.writeToStore(dataTree, CarsModel.BASE_PATH, carsNode);
                 NormalizedNode<?, ?> root = AbstractShardTest.readStore(dataTree, YangInstanceIdentifier.EMPTY);
 
+                MetadataShardDataTreeSnapshot shardSnapshot = new MetadataShardDataTreeSnapshot(root);
                 final ByteArrayOutputStream bos = new ByteArrayOutputStream();
-                new MetadataShardDataTreeSnapshot(root).serialize(bos);
+                try (final DataOutputStream dos = new DataOutputStream(bos)) {
+                    PayloadVersion.BORON.writeTo(dos);
+                    try (ObjectOutputStream oos = new ObjectOutputStream(dos)) {
+                        oos.writeObject(shardSnapshot);
+                    }
+                }
+
                 final org.opendaylight.controller.cluster.raft.Snapshot snapshot =
                     org.opendaylight.controller.cluster.raft.Snapshot.create(bos.toByteArray(),
                             Collections.<ReplicatedLogEntry>emptyList(), 2, 1, 2, 1, 1, "member-1", null);
index 5cf8183..ae94997 100644 (file)
@@ -14,6 +14,7 @@ import akka.actor.ActorRef;
 import akka.testkit.JavaTestKit;
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
+import java.io.ObjectInputStream;
 import java.util.Optional;
 import org.junit.Test;
 import org.opendaylight.controller.cluster.datastore.AbstractActorTest;
@@ -47,8 +48,12 @@ public class ShardSnapshotActorTest extends AbstractActorTest {
                 assertEquals("Snapshot", snapshot, ((ShardSnapshotState)reply.getSnapshotState()).getSnapshot());
 
                 if (installSnapshotStream != null) {
-                    final ShardDataTreeSnapshot deserialized = ShardDataTreeSnapshot.deserialize(
-                            new ByteArrayInputStream(installSnapshotStream.toByteArray()));
+                    final ShardDataTreeSnapshot deserialized;
+                    try (final ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(
+                            installSnapshotStream.toByteArray()))) {
+                        deserialized = ShardDataTreeSnapshot.deserialize(in);
+                    }
+
                     assertEquals("Deserialized snapshot type", snapshot.getClass(), deserialized.getClass());
 
                     final Optional<NormalizedNode<?, ?>> maybeNode = deserialized.getRootNode();
index 7200901..989ed46 100644 (file)
@@ -7,31 +7,38 @@
  */
 package org.opendaylight.controller.cluster.datastore.messages;
 
-import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
 
 import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.ObjectOutputStream;
 import java.math.BigInteger;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.Optional;
 import org.apache.commons.lang.SerializationUtils;
 import org.junit.Test;
 import org.opendaylight.controller.cluster.datastore.AbstractShardTest;
 import org.opendaylight.controller.cluster.datastore.messages.DatastoreSnapshot.ShardSnapshot;
 import org.opendaylight.controller.cluster.datastore.persisted.MetadataShardDataTreeSnapshot;
+import org.opendaylight.controller.cluster.datastore.persisted.PayloadVersion;
+import org.opendaylight.controller.cluster.datastore.persisted.ShardDataTreeSnapshot;
+import org.opendaylight.controller.cluster.datastore.persisted.ShardSnapshotState;
 import org.opendaylight.controller.cluster.datastore.shardmanager.ShardManagerSnapshot;
 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
 import org.opendaylight.controller.cluster.raft.Snapshot;
+import org.opendaylight.controller.cluster.raft.persisted.EmptyState;
 import org.opendaylight.controller.md.cluster.datastore.model.CarsModel;
 import org.opendaylight.controller.md.cluster.datastore.model.PeopleModel;
 import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
-import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataValidationFailedException;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.TreeType;
-import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
 import org.opendaylight.yangtools.yang.data.impl.schema.tree.InMemoryDataTreeFactory;
 
 /**
@@ -43,18 +50,20 @@ import org.opendaylight.yangtools.yang.data.impl.schema.tree.InMemoryDataTreeFac
 public class DatastoreSnapshotListTest {
     @Test
     public void testSerialization() throws Exception {
+        NormalizedNode<?, ?> legacyConfigRoot1 = toRootNode(CarsModel.BASE_PATH,
+                CarsModel.newCarsNode(CarsModel.newCarsMapNode(CarsModel.newCarEntry("optima",
+                        BigInteger.valueOf(20000L)),CarsModel.newCarEntry("sportage",
+                            BigInteger.valueOf(30000L)))));
+
+        NormalizedNode<?, ?> legacyConfigRoot2 = toRootNode(PeopleModel.BASE_PATH, PeopleModel.emptyContainer());
+
         DatastoreSnapshot legacyConfigSnapshot = new DatastoreSnapshot("config",
                 SerializationUtils.serialize(newLegacyShardManagerSnapshot("config-one", "config-two")),
-                Arrays.asList(newLegacyShardSnapshot("config-one", newLegacySnapshot(CarsModel.BASE_PATH,
-                        CarsModel.newCarsNode(CarsModel.newCarsMapNode(CarsModel.newCarEntry("optima",
-                            BigInteger.valueOf(20000L)),CarsModel.newCarEntry("sportage",
-                                BigInteger.valueOf(30000L)))))),
-                    newLegacyShardSnapshot("config-two", newLegacySnapshot(PeopleModel.BASE_PATH,
-                            PeopleModel.emptyContainer()))));
+                Arrays.asList(newLegacyShardSnapshot("config-one", newLegacySnapshot(legacyConfigRoot1)),
+                    newLegacyShardSnapshot("config-two", newLegacySnapshot(legacyConfigRoot2))));
 
         DatastoreSnapshot legacyOperSnapshot = new DatastoreSnapshot("oper",
-                null, Arrays.asList(newLegacyShardSnapshot("oper-one", newLegacySnapshot(TestModel.TEST_PATH,
-                        ImmutableNodes.containerNode(TestModel.TEST_QNAME)))));
+                null, Arrays.asList(newLegacyShardSnapshot("oper-one", newLegacySnapshot(null))));
 
         DatastoreSnapshotList legacy = new DatastoreSnapshotList(Arrays.asList(legacyConfigSnapshot,
                 legacyOperSnapshot));
@@ -64,12 +73,14 @@ public class DatastoreSnapshotListTest {
                 SerializationUtils.clone(legacy);
 
         assertEquals("DatastoreSnapshotList size", 2, cloned.size());
-        assertDatastoreSnapshotEquals(legacyConfigSnapshot, cloned.get(0));
-        assertDatastoreSnapshotEquals(legacyOperSnapshot, cloned.get(1));
+        assertDatastoreSnapshotEquals(legacyConfigSnapshot, cloned.get(0), Optional.of(legacyConfigRoot1),
+                Optional.of(legacyConfigRoot2));
+        assertDatastoreSnapshotEquals(legacyOperSnapshot, cloned.get(1), Optional.empty());
     }
 
     private void assertDatastoreSnapshotEquals(DatastoreSnapshot legacy,
-            org.opendaylight.controller.cluster.datastore.persisted.DatastoreSnapshot actual) {
+            org.opendaylight.controller.cluster.datastore.persisted.DatastoreSnapshot actual,
+            Optional<NormalizedNode<?, ?>>... shardRoots) throws IOException {
         assertEquals("Type", legacy.getType(), actual.getType());
 
         if (legacy.getShardManagerSnapshot() == null) {
@@ -91,11 +102,12 @@ public class DatastoreSnapshotListTest {
                 actualShardSnapshot = actual.getShardSnapshots().get(i);
             assertEquals("Shard name", legacyShardSnapshot.getName(), actualShardSnapshot.getName());
             assertSnapshotEquals((Snapshot) SerializationUtils.deserialize(legacyShardSnapshot.getSnapshot()),
-                    (Snapshot) SerializationUtils.deserialize(actualShardSnapshot.getSnapshot()));
+                    shardRoots[i], actualShardSnapshot.getSnapshot());
         }
     }
 
-    private static void assertSnapshotEquals(Snapshot expected, Snapshot actual) {
+    private static void assertSnapshotEquals(Snapshot expected, Optional<NormalizedNode<?, ?>> expRoot,
+            org.opendaylight.controller.cluster.raft.persisted.Snapshot actual) throws IOException {
         assertEquals("lastIndex", expected.getLastIndex(), actual.getLastIndex());
         assertEquals("lastTerm", expected.getLastTerm(), actual.getLastTerm());
         assertEquals("lastAppliedIndex", expected.getLastAppliedIndex(), actual.getLastAppliedIndex());
@@ -103,7 +115,15 @@ public class DatastoreSnapshotListTest {
         assertEquals("unAppliedEntries", expected.getUnAppliedEntries(), actual.getUnAppliedEntries());
         assertEquals("electionTerm", expected.getElectionTerm(), actual.getElectionTerm());
         assertEquals("electionVotedFor", expected.getElectionVotedFor(), actual.getElectionVotedFor());
-        assertArrayEquals("state", expected.getState(), actual.getState());
+
+        if (expRoot.isPresent()) {
+            ShardDataTreeSnapshot actualSnapshot = ((ShardSnapshotState)actual.getState()).getSnapshot();
+            assertEquals("ShardDataTreeSnapshot type", MetadataShardDataTreeSnapshot.class, actualSnapshot.getClass());
+            assertTrue("Expected root node present", actualSnapshot.getRootNode().isPresent());
+            assertEquals("Root node", expRoot.get(), actualSnapshot.getRootNode().get());
+        } else {
+            assertEquals("State type", EmptyState.class, actual.getState().getClass());
+        }
     }
 
     private static ShardManagerSnapshot newLegacyShardManagerSnapshot(String... shards) {
@@ -115,16 +135,28 @@ public class DatastoreSnapshotListTest {
         return new DatastoreSnapshot.ShardSnapshot(name, SerializationUtils.serialize(snapshot));
     }
 
-    private static Snapshot newLegacySnapshot(YangInstanceIdentifier path, NormalizedNode<?, ?> node)
+    private static Snapshot newLegacySnapshot(NormalizedNode<?, ?> root)
             throws Exception {
-        DataTree dataTree = InMemoryDataTreeFactory.getInstance().create(TreeType.OPERATIONAL);
-        dataTree.setSchemaContext(SchemaContextHelper.full());
-        AbstractShardTest.writeToStore(dataTree, path, node);
-        NormalizedNode<?, ?> root = AbstractShardTest.readStore(dataTree, YangInstanceIdentifier.EMPTY);
-
         final ByteArrayOutputStream bos = new ByteArrayOutputStream();
-        new MetadataShardDataTreeSnapshot(root).serialize(bos);
+        if (root != null) {
+            MetadataShardDataTreeSnapshot snapshot = new MetadataShardDataTreeSnapshot(root);
+            try (final DataOutputStream dos = new DataOutputStream(bos)) {
+                PayloadVersion.BORON.writeTo(dos);
+                try (ObjectOutputStream oos = new ObjectOutputStream(dos)) {
+                    oos.writeObject(snapshot);
+                }
+            }
+        }
+
         return Snapshot.create(bos.toByteArray(), Collections.<ReplicatedLogEntry>emptyList(), 2, 1, 2, 1, 1,
                 "member-1", null);
     }
+
+    private static NormalizedNode<?, ?> toRootNode(YangInstanceIdentifier path, NormalizedNode<?, ?> node)
+            throws DataValidationFailedException {
+        DataTree dataTree = InMemoryDataTreeFactory.getInstance().create(TreeType.OPERATIONAL);
+        dataTree.setSchemaContext(SchemaContextHelper.full());
+        AbstractShardTest.writeToStore(dataTree, path, node);
+        return AbstractShardTest.readStore(dataTree, YangInstanceIdentifier.EMPTY);
+    }
 }
index e909e79..d999620 100644 (file)
@@ -15,10 +15,13 @@ import java.io.ByteArrayOutputStream;
 import java.io.Externalizable;
 import java.io.IOException;
 import java.io.ObjectInput;
+import java.io.ObjectInputStream;
 import java.io.ObjectOutput;
+import java.io.ObjectOutputStream;
 import java.util.Map;
 import java.util.Optional;
 import org.junit.Test;
+import org.opendaylight.controller.cluster.datastore.node.utils.stream.SerializationUtils;
 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
@@ -40,10 +43,14 @@ public class ShardDataTreeSnapshotTest {
 
         MetadataShardDataTreeSnapshot snapshot = new MetadataShardDataTreeSnapshot(expectedNode);
         ByteArrayOutputStream bos = new ByteArrayOutputStream();
-        snapshot.serialize(bos);
+        try (final ObjectOutputStream out = new ObjectOutputStream(bos)) {
+            snapshot.serialize(out);
+        }
 
-        ShardDataTreeSnapshot deserialized = ShardDataTreeSnapshot.deserialize(
-                new ByteArrayInputStream(bos.toByteArray()));
+        ShardDataTreeSnapshot deserialized;
+        try (final ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bos.toByteArray()))) {
+            deserialized = ShardDataTreeSnapshot.deserialize(in);
+        }
 
         Optional<NormalizedNode<?, ?>> actualNode = deserialized.getRootNode();
         assertEquals("rootNode present", true, actualNode.isPresent());
@@ -62,10 +69,14 @@ public class ShardDataTreeSnapshotTest {
                 ImmutableMap.of(TestShardDataTreeSnapshotMetadata.class, new TestShardDataTreeSnapshotMetadata("test"));
         MetadataShardDataTreeSnapshot snapshot = new MetadataShardDataTreeSnapshot(expectedNode, expMetadata);
         ByteArrayOutputStream bos = new ByteArrayOutputStream();
-        snapshot.serialize(bos);
+        try (final ObjectOutputStream out = new ObjectOutputStream(bos)) {
+            snapshot.serialize(out);
+        }
 
-        ShardDataTreeSnapshot deserialized = ShardDataTreeSnapshot.deserialize(
-                new ByteArrayInputStream(bos.toByteArray()));
+        ShardDataTreeSnapshot deserialized;
+        try (final ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bos.toByteArray()))) {
+            deserialized = ShardDataTreeSnapshot.deserialize(in);
+        }
 
         Optional<NormalizedNode<?, ?>> actualNode = deserialized.getRootNode();
         assertEquals("rootNode present", true, actualNode.isPresent());
@@ -81,11 +92,9 @@ public class ShardDataTreeSnapshotTest {
                 .withNodeIdentifier(new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME))
                 .withChild(ImmutableNodes.leafNode(TestModel.DESC_QNAME, "foo")).build();
 
-        PreBoronShardDataTreeSnapshot snapshot = new PreBoronShardDataTreeSnapshot(expectedNode);
-        ByteArrayOutputStream bos = new ByteArrayOutputStream();
-        snapshot.serialize(bos);
+        byte[] serialized = SerializationUtils.serializeNormalizedNode(expectedNode);
 
-        ShardDataTreeSnapshot deserialized = ShardDataTreeSnapshot.deserialize(bos.toByteArray());
+        ShardDataTreeSnapshot deserialized = ShardDataTreeSnapshot.deserializePreCarbon(serialized);
 
         Optional<NormalizedNode<?, ?>> actualNode = deserialized.getRootNode();
         assertEquals("rootNode present", true, actualNode.isPresent());
index f2b11b0..5085cdd 100644 (file)
@@ -15,6 +15,7 @@ import akka.actor.Status.Failure;
 import akka.actor.Terminated;
 import akka.testkit.JavaTestKit;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
@@ -24,7 +25,10 @@ import org.opendaylight.controller.cluster.datastore.AbstractActorTest;
 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
 import org.opendaylight.controller.cluster.datastore.persisted.DatastoreSnapshot;
 import org.opendaylight.controller.cluster.datastore.persisted.DatastoreSnapshot.ShardSnapshot;
+import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
 import org.opendaylight.controller.cluster.raft.client.messages.GetSnapshotReply;
+import org.opendaylight.controller.cluster.raft.persisted.ByteState;
+import org.opendaylight.controller.cluster.raft.persisted.Snapshot;
 import scala.concurrent.duration.Duration;
 import scala.concurrent.duration.FiniteDuration;
 
@@ -47,19 +51,22 @@ public class ShardManagerGetSnapshotReplyActorTest extends AbstractActorTest {
 
         kit.watch(replyActor);
 
-        byte[] shard1Snapshot = new byte[]{1,2,3};
+        ByteState shard1SnapshotState = ByteState.of(new byte[]{1,2,3});
         replyActor.tell(new GetSnapshotReply(ShardIdentifier.create("shard1", MEMBER_1, "config").toString(),
-            shard1Snapshot), ActorRef.noSender());
+                Snapshot.create(shard1SnapshotState, Collections.<ReplicatedLogEntry>emptyList(),
+                        2, 1, 2, 1, 1, "member-1", null)), ActorRef.noSender());
 
-        byte[] shard2Snapshot = new byte[]{4,5,6};
+        ByteState shard2SnapshotState = ByteState.of(new byte[]{4,5,6});
         replyActor.tell(new GetSnapshotReply(ShardIdentifier.create("shard2", MEMBER_1, "config").toString(),
-            shard2Snapshot), ActorRef.noSender());
+                Snapshot.create(shard2SnapshotState, Collections.<ReplicatedLogEntry>emptyList(),
+                        2, 1, 2, 1, 1, "member-1", null)), ActorRef.noSender());
 
         kit.expectNoMsg(FiniteDuration.create(500, TimeUnit.MILLISECONDS));
 
-        byte[] shard3Snapshot = new byte[]{7,8,9};
+        ByteState shard3SnapshotState = ByteState.of(new byte[]{7,8,9});
         replyActor.tell(new GetSnapshotReply(ShardIdentifier.create("shard3", MEMBER_1, "config").toString(),
-            shard3Snapshot), ActorRef.noSender());
+                Snapshot.create(shard3SnapshotState, Collections.<ReplicatedLogEntry>emptyList(),
+                        2, 1, 2, 1, 1, "member-1", null)), ActorRef.noSender());
 
         DatastoreSnapshot datastoreSnapshot = kit.expectMsgClass(DatastoreSnapshot.class);
 
@@ -68,11 +75,14 @@ public class ShardManagerGetSnapshotReplyActorTest extends AbstractActorTest {
         List<ShardSnapshot> shardSnapshots = datastoreSnapshot.getShardSnapshots();
         assertEquals("ShardSnapshot size", 3, shardSnapshots.size());
         assertEquals("ShardSnapshot 1 getName", "shard1", shardSnapshots.get(0).getName());
-        assertArrayEquals("ShardSnapshot 1 getSnapshot", shard1Snapshot, shardSnapshots.get(0).getSnapshot());
+        assertEquals("ShardSnapshot 1 getSnapshot", shard1SnapshotState,
+                shardSnapshots.get(0).getSnapshot().getState());
         assertEquals("ShardSnapshot 2 getName", "shard2", shardSnapshots.get(1).getName());
-        assertArrayEquals("ShardSnapshot 2 getSnapshot", shard2Snapshot, shardSnapshots.get(1).getSnapshot());
+        assertEquals("ShardSnapshot 2 getSnapshot", shard2SnapshotState,
+                shardSnapshots.get(1).getSnapshot().getState());
         assertEquals("ShardSnapshot 3 getName", "shard3", shardSnapshots.get(2).getName());
-        assertArrayEquals("ShardSnapshot 3 getSnapshot", shard3Snapshot, shardSnapshots.get(2).getSnapshot());
+        assertEquals("ShardSnapshot 3 getSnapshot", shard3SnapshotState,
+                shardSnapshots.get(2).getSnapshot().getState());
 
         kit.expectMsgClass(Terminated.class);
     }
@@ -89,7 +99,8 @@ public class ShardManagerGetSnapshotReplyActorTest extends AbstractActorTest {
         kit.watch(replyActor);
 
         replyActor.tell(new GetSnapshotReply(ShardIdentifier.create("shard1", MEMBER_1, "config").toString(),
-            new byte[]{1,2,3}), ActorRef.noSender());
+                Snapshot.create(ByteState.of(new byte[]{1,2,3}), Collections.<ReplicatedLogEntry>emptyList(),
+                        2, 1, 2, 1, 1, "member-1", null)), ActorRef.noSender());
 
         replyActor.tell(new Failure(new RuntimeException()), ActorRef.noSender());
 

©2013 OpenDaylight, A Linux Foundation Collaborative Project. All Rights Reserved.
OpenDaylight is a registered trademark of The OpenDaylight Project, Inc.
Linux Foundation and OpenDaylight are registered trademarks of the Linux Foundation.
Linux is a registered trademark of Linus Torvalds.