Bug 7521: Convert Snapshot to store a State instance 72/50572/9
authorTom Pantelis <tpanteli@brocade.com>
Tue, 17 Jan 2017 18:24:28 +0000 (13:24 -0500)
committerTom Pantelis <tpanteli@brocade.com>
Fri, 10 Feb 2017 12:16:14 +0000 (12:16 +0000)
Created a new Snapshot class that stores the state as a Serializable
State instance instead of a byte[] to lay the groundwork for handling
snapshots > 2G. In this manner, State implementations can serialize
their data directly to the ObjectOutputStream.

The previous Snapshot class was deprecated but, unfortunately, it
can't simply "read resolve" to the new class on de-serialization
as the deserialization code doesn't know how to convert the data
from byte[] to a State instance. Therefore a new method,
deserializePreCarbonSnapshot, was added to RaftActorRecoveryCohort.

The introduction of the State interface necessitated changes throughout
the snapshot capture and snapshot recovery code paths. The Shard
implementations were also changed accordingly.

For follower snapshot install, an optional OutputStream is also passed
to createSnapshot. The new API contract is: if the OutputStream is present,
the implementation must serialize its state to the OutputStream and return
the stream instance in the CaptureSnapshotReply along with the snapshot
State instance. The snapshot State is serialized directly to the snapshot
store while the OutputStream is used to send the state data to follower(s)
in chunks. The deserializeSnapshot method is used to convert the serialized
data back to a State instance on the follower end. The serialization for
snapshot install is passed off so the cost of serialization is not charged
to the raft actor's thread.

The OutputStream is converted to a ByteSource when passed to AbstractLeader.
AbstractLeader still converts it to a byte[] but will be changed to use
the ByteSource's InputStream to chunk the data in a subsequent patch.

Also the SnapshotManager currently passes a ByteArrayOutputStream to
createSnapshot but this will be changed to a FileBackedOutputStream in a
subsequent patch.

Change-Id: I2ea30870b54478d7ef5669335ca4b444539f8d56
Signed-off-by: Tom Pantelis <tpanteli@brocade.com>
54 files changed:
opendaylight/md-sal/sal-akka-raft-example/pom.xml
opendaylight/md-sal/sal-akka-raft-example/src/main/java/org/opendaylight/controller/cluster/example/ExampleActor.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/GetSnapshotReplyActor.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/NoopRaftActorSnapshotCohort.java [new file with mode: 0644]
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorRecoveryCohort.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorRecoverySupport.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorSnapshotCohort.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorSnapshotMessageSupport.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ReplicatedLogImpl.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/Snapshot.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/SnapshotManager.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/SnapshotState.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/base/messages/ApplySnapshot.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/base/messages/CaptureSnapshot.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/base/messages/CaptureSnapshotReply.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/base/messages/SendInstallSnapshot.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/persisted/EmptyState.java [new file with mode: 0644]
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/persisted/Snapshot.java [new file with mode: 0644]
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/AbstractRaftActorIntegrationTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MigratedMessagesTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MockRaftActor.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MockRaftActorContext.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorRecoverySupportTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorServerConfigurationSupportTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorSnapshotMessageSupportTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RecoveryIntegrationSingleNodeTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/ReplicationAndSnapshotsIntegrationTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/ReplicationAndSnapshotsWithLaggingFollowerIntegrationTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/SnapshotManagerTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/SnapshotTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/FollowerTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/persisted/ByteState.java [new file with mode: 0644]
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/persisted/EmptyStateTest.java [new file with mode: 0644]
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/persisted/SnapshotTest.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardRecoveryCoordinator.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardSnapshotCohort.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/actors/ShardSnapshotActor.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/persisted/AbstractVersionedShardDataTreeSnapshot.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/persisted/PreBoronShardDataTreeSnapshot.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/persisted/ShardDataTreeSnapshot.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/persisted/ShardSnapshotState.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractShardTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreIntegrationTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreRemotingIntegrationTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardRecoveryCoordinatorTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/actors/ShardSnapshotActorTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/persisted/ShardDataTreeSnapshotTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/persisted/ShardSnapshotStateTest.java [new file with mode: 0644]

index 315349a..8667253 100644 (file)
       <groupId>org.slf4j</groupId>
       <artifactId>slf4j-simple</artifactId>
     </dependency>
+
+    <dependency>
+      <groupId>com.google.guava</groupId>
+      <artifactId>guava</artifactId>
+    </dependency>
   </dependencies>
 
   <build>
index 6e8051f..663d400 100644 (file)
@@ -11,15 +11,15 @@ package org.opendaylight.controller.cluster.example;
 import akka.actor.ActorRef;
 import akka.actor.Props;
 import com.google.common.base.Optional;
-import com.google.protobuf.ByteString;
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
+import com.google.common.base.Throwables;
+import com.google.common.io.ByteSource;
 import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
+import java.io.OutputStream;
+import java.io.Serializable;
 import java.util.HashMap;
 import java.util.Map;
 import javax.annotation.Nonnull;
+import org.apache.commons.lang3.SerializationUtils;
 import org.opendaylight.controller.cluster.example.messages.KeyValue;
 import org.opendaylight.controller.cluster.example.messages.KeyValueSaved;
 import org.opendaylight.controller.cluster.example.messages.PrintRole;
@@ -32,6 +32,7 @@ import org.opendaylight.controller.cluster.raft.RaftActorSnapshotCohort;
 import org.opendaylight.controller.cluster.raft.RaftState;
 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
 import org.opendaylight.controller.cluster.raft.behaviors.Leader;
+import org.opendaylight.controller.cluster.raft.persisted.Snapshot;
 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
 import org.opendaylight.yangtools.concepts.Identifier;
 import org.opendaylight.yangtools.util.AbstractStringIdentifier;
@@ -131,21 +132,23 @@ public class ExampleActor extends RaftActor implements RaftActorRecoveryCohort,
     }
 
     @Override
-    public void createSnapshot(ActorRef actorRef) {
-        ByteString bs = null;
+    public void createSnapshot(ActorRef actorRef, java.util.Optional<OutputStream> installSnapshotStream) {
         try {
-            bs = fromObject(state);
+            if (installSnapshotStream.isPresent()) {
+                SerializationUtils.serialize((Serializable) state, installSnapshotStream.get());
+            }
         } catch (Exception e) {
             LOG.error("Exception in creating snapshot", e);
         }
-        getSelf().tell(new CaptureSnapshotReply(bs.toByteArray()), null);
+
+        getSelf().tell(new CaptureSnapshotReply(new MapState(state), installSnapshotStream), null);
     }
 
     @Override
-    public void applySnapshot(byte [] snapshot) {
+    public void applySnapshot(Snapshot.State snapshotState) {
         state.clear();
         try {
-            state.putAll((HashMap<String, String>) toObject(snapshot));
+            state.putAll(((MapState)snapshotState).state);
         } catch (Exception e) {
            LOG.error("Exception in applying snapshot", e);
         }
@@ -154,45 +157,6 @@ public class ExampleActor extends RaftActor implements RaftActorRecoveryCohort,
         }
     }
 
-    private static ByteString fromObject(Object snapshot) throws Exception {
-        ByteArrayOutputStream b = null;
-        ObjectOutputStream o = null;
-        try {
-            b = new ByteArrayOutputStream();
-            o = new ObjectOutputStream(b);
-            o.writeObject(snapshot);
-            byte[] snapshotBytes = b.toByteArray();
-            return ByteString.copyFrom(snapshotBytes);
-        } finally {
-            if (o != null) {
-                o.flush();
-                o.close();
-            }
-            if (b != null) {
-                b.close();
-            }
-        }
-    }
-
-    private static Object toObject(byte [] bs) throws ClassNotFoundException, IOException {
-        Object obj = null;
-        ByteArrayInputStream bis = null;
-        ObjectInputStream ois = null;
-        try {
-            bis = new ByteArrayInputStream(bs);
-            ois = new ObjectInputStream(bis);
-            obj = ois.readObject();
-        } finally {
-            if (bis != null) {
-                bis.close();
-            }
-            if (ois != null) {
-                ois.close();
-            }
-        }
-        return obj;
-    }
-
     @Override protected void onStateChanged() {
 
     }
@@ -224,7 +188,7 @@ public class ExampleActor extends RaftActor implements RaftActorRecoveryCohort,
     }
 
     @Override
-    public void applyRecoverySnapshot(byte[] snapshot) {
+    public void applyRecoverySnapshot(Snapshot.State snapshotState) {
     }
 
     @Override
@@ -236,4 +200,29 @@ public class ExampleActor extends RaftActor implements RaftActorRecoveryCohort,
     public byte[] getRestoreFromSnapshot() {
         return null;
     }
+
+    @Override
+    public Snapshot.State deserializeSnapshot(ByteSource snapshotBytes) {
+        try {
+            return deserializePreCarbonSnapshot(snapshotBytes.read());
+        } catch (IOException e) {
+            throw Throwables.propagate(e);
+        }
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public Snapshot.State deserializePreCarbonSnapshot(byte[] from) {
+        return new MapState((Map<String, String>) SerializationUtils.deserialize(from));
+    }
+
+    private static class MapState implements Snapshot.State {
+        private static final long serialVersionUID = 1L;
+
+        Map<String, String> state;
+
+        MapState(Map<String, String> state) {
+            this.state = state;
+        }
+    }
 }
index 3aac07f..c94b780 100644 (file)
@@ -19,6 +19,7 @@ import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
 import org.opendaylight.controller.cluster.raft.client.messages.GetSnapshotReply;
 import org.opendaylight.controller.cluster.raft.persisted.ServerConfigurationPayload;
+import org.opendaylight.controller.cluster.raft.persisted.Snapshot;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import scala.concurrent.duration.Duration;
@@ -42,7 +43,8 @@ class GetSnapshotReplyActor extends UntypedActor {
     @Override
     public void onReceive(Object message) {
         if (message instanceof CaptureSnapshotReply) {
-            Snapshot snapshot = Snapshot.create(((CaptureSnapshotReply)message).getSnapshot(),
+            Snapshot snapshot = Snapshot.create(
+                    ((CaptureSnapshotReply)message).getSnapshotState(),
                     params.captureSnapshot.getUnAppliedEntries(),
                     params.captureSnapshot.getLastIndex(), params.captureSnapshot.getLastTerm(),
                     params.captureSnapshot.getLastAppliedIndex(), params.captureSnapshot.getLastAppliedTerm(),
diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/NoopRaftActorSnapshotCohort.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/NoopRaftActorSnapshotCohort.java
new file mode 100644 (file)
index 0000000..42b226f
--- /dev/null
@@ -0,0 +1,41 @@
+/*
+ * Copyright (c) 2017 Brocade Communications Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.raft;
+
+import akka.actor.ActorRef;
+import com.google.common.io.ByteSource;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.Optional;
+import org.opendaylight.controller.cluster.raft.persisted.EmptyState;
+import org.opendaylight.controller.cluster.raft.persisted.Snapshot.State;
+
+/**
+ * RaftActorSnapshotCohort implementation that does nothing.
+ *
+ * @author Thomas Pantelis
+ */
+public final class NoopRaftActorSnapshotCohort implements RaftActorSnapshotCohort {
+    public static final NoopRaftActorSnapshotCohort INSTANCE = new NoopRaftActorSnapshotCohort();
+
+    private NoopRaftActorSnapshotCohort() {
+    }
+
+    @Override
+    public void createSnapshot(ActorRef actorRef, Optional<OutputStream> installSnapshotStream) {
+    }
+
+    @Override
+    public void applySnapshot(State snapshotState) {
+    }
+
+    @Override
+    public State deserializeSnapshot(ByteSource snapshotBytes) throws IOException {
+        return EmptyState.INSTANCE;
+    }
+}
index 2d2fce2..b007e2d 100644 (file)
@@ -762,7 +762,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
     protected abstract void onRecoveryComplete();
 
     /**
-     * Returns the RaftActorSnapshotCohort to participate in persistence recovery.
+     * Returns the RaftActorSnapshotCohort to participate in snapshot captures.
      */
     @Nonnull
     protected abstract RaftActorSnapshotCohort getRaftActorSnapshotCohort();
index c376047..9803f1e 100644 (file)
@@ -7,7 +7,9 @@
  */
 package org.opendaylight.controller.cluster.raft;
 
+import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
+import org.opendaylight.controller.cluster.raft.persisted.Snapshot;
 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
 
 /**
@@ -36,9 +38,9 @@ public interface RaftActorRecoveryCohort {
     /**
      * This method is called during recovery to reconstruct the state of the actor.
      *
-     * @param snapshotBytes A snapshot of the state of the actor
+     * @param snapshotState A snapshot of the state of the actor
      */
-    void applyRecoverySnapshot(byte[] snapshotBytes);
+    void applyRecoverySnapshot(Snapshot.State snapshotState);
 
     /**
      * This method is called during recovery at the end of a batch to apply the current batched
@@ -53,4 +55,14 @@ public interface RaftActorRecoveryCohort {
      */
     @Nullable
     byte[] getRestoreFromSnapshot();
+
+    /**
+     * This method is called during recovery to de-serialize a snapshot that was persisted in the pre-Carbon format.
+     *
+     * @param from the snaphot bytes
+     * @return a Snapshot.State instance
+     */
+    @Deprecated
+    @Nonnull
+    Snapshot.State deserializePreCarbonSnapshot(byte [] from);
 }
index 5e4e657..df20767 100644 (file)
@@ -18,8 +18,10 @@ import org.opendaylight.controller.cluster.PersistentDataProvider;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
 import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries;
 import org.opendaylight.controller.cluster.raft.persisted.DeleteEntries;
+import org.opendaylight.controller.cluster.raft.persisted.EmptyState;
 import org.opendaylight.controller.cluster.raft.persisted.MigratedSerializable;
 import org.opendaylight.controller.cluster.raft.persisted.ServerConfigurationPayload;
+import org.opendaylight.controller.cluster.raft.persisted.Snapshot;
 import org.opendaylight.controller.cluster.raft.persisted.UpdateElectionTerm;
 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.PersistentPayload;
 import org.slf4j.Logger;
@@ -113,11 +115,23 @@ class RaftActorRecoverySupport {
     }
 
     private void onRecoveredSnapshot(SnapshotOffer offer) {
-        log.debug("{}: SnapshotOffer called..", context.getId());
+        log.debug("{}: SnapshotOffer called.", context.getId());
 
         initRecoveryTimer();
 
-        Snapshot snapshot = (Snapshot) offer.snapshot();
+        Object snapshotObj = offer.snapshot();
+        Snapshot snapshot;
+        if (snapshotObj instanceof org.opendaylight.controller.cluster.raft.Snapshot) {
+            org.opendaylight.controller.cluster.raft.Snapshot legacy =
+                    (org.opendaylight.controller.cluster.raft.Snapshot)snapshotObj;
+            snapshot = Snapshot.create(cohort.deserializePreCarbonSnapshot(legacy.getState()),
+                    legacy.getUnAppliedEntries(), legacy.getLastIndex(), legacy.getLastTerm(),
+                    legacy.getLastAppliedIndex(), legacy.getLastAppliedTerm(),
+                    legacy.getElectionTerm(), legacy.getElectionVotedFor(), legacy.getServerConfiguration());
+            hasMigratedDataRecovered = true;
+        } else {
+            snapshot = (Snapshot) offer.snapshot();
+        }
 
         for (ReplicatedLogEntry entry: snapshot.getUnAppliedEntries()) {
             if (isMigratedPayload(entry)) {
@@ -129,7 +143,8 @@ class RaftActorRecoverySupport {
             // We may have just transitioned to disabled and have a snapshot containing state data and/or log
             // entries - we don't want to preserve these, only the server config and election term info.
 
-            snapshot = Snapshot.create(new byte[0], Collections.emptyList(), -1, -1, -1, -1,
+            snapshot = Snapshot.create(
+                    EmptyState.INSTANCE, Collections.emptyList(), -1, -1, -1, -1,
                     snapshot.getElectionTerm(), snapshot.getElectionVotedFor(), snapshot.getServerConfiguration());
         }
 
@@ -145,7 +160,9 @@ class RaftActorRecoverySupport {
         Stopwatch timer = Stopwatch.createStarted();
 
         // Apply the snapshot to the actors state
-        cohort.applyRecoverySnapshot(snapshot.getState());
+        if (!(snapshot.getState() instanceof EmptyState)) {
+            cohort.applyRecoverySnapshot(snapshot.getState());
+        }
 
         if (snapshot.getServerConfiguration() != null) {
             context.updatePeerIds(snapshot.getServerConfiguration());
@@ -274,7 +291,8 @@ class RaftActorRecoverySupport {
             // messages. Either way, we persist a snapshot and delete all the messages from the akka journal
             // to clean out unwanted messages.
 
-            Snapshot snapshot = Snapshot.create(new byte[0], Collections.<ReplicatedLogEntry>emptyList(),
+            Snapshot snapshot = Snapshot.create(
+                    EmptyState.INSTANCE, Collections.<ReplicatedLogEntry>emptyList(),
                     -1, -1, -1, -1,
                     context.getTermInformation().getCurrentTerm(), context.getTermInformation().getVotedFor(),
                     context.getPeerServerInfo(true));
index ad68726..a02b295 100644 (file)
@@ -8,6 +8,12 @@
 package org.opendaylight.controller.cluster.raft;
 
 import akka.actor.ActorRef;
+import com.google.common.io.ByteSource;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.Optional;
+import javax.annotation.Nonnull;
+import org.opendaylight.controller.cluster.raft.persisted.Snapshot;
 
 /**
  * Interface for a class that participates in raft actor snapshotting.
@@ -21,13 +27,31 @@ public interface RaftActorSnapshotCohort {
      * created. The implementation should send a CaptureSnapshotReply to the given actor.
      *
      * @param actorRef the actor to which to respond
+     * @param installSnapshotStream Optional OutputStream that is present if the snapshot is to also be installed
+     *        on a follower. The implementation must serialize its state to the OutputStream and return the
+     *        installSnapshotStream instance in the CaptureSnapshotReply along with the snapshot State instance.
+     *        The snapshot State is serialized directly to the snapshot store while the OutputStream is used to send
+     *        the state data to follower(s) in chunks. The {@link #deserializeSnapshot} method is used to convert the
+     *        serialized data back to a State instance on the follower end. The serialization for snapshot install is
+     *        passed off so the cost of serialization is not charged to the raft actor's thread.
      */
-    void createSnapshot(ActorRef actorRef);
+    void createSnapshot(@Nonnull ActorRef actorRef, @Nonnull Optional<OutputStream> installSnapshotStream);
 
     /**
      * This method is called to apply a snapshot installed by the leader.
      *
-     * @param snapshotBytes a snapshot of the state of the actor
+     * @param snapshotState a snapshot of the state of the actor
      */
-    void applySnapshot(byte[] snapshotBytes);
+    void applySnapshot(@Nonnull Snapshot.State snapshotState);
+
+    /**
+     * This method is called to de-serialize snapshot data that was previously serialized via {@link #createSnapshot}
+     * to a State instance.
+     *
+     * @param snapshotBytes the ByteSource containing the serialized data
+     * @return the converted snapshot State
+     * @throws IOException if an error occurs accessing the ByteSource or de-serializing
+     */
+    @Nonnull
+    Snapshot.State deserializeSnapshot(@Nonnull ByteSource snapshotBytes) throws IOException;
 }
index 4a17bec..f119a1e 100644 (file)
@@ -12,6 +12,7 @@ import akka.persistence.SaveSnapshotFailure;
 import akka.persistence.SaveSnapshotSuccess;
 import com.google.common.annotations.VisibleForTesting;
 import java.util.Collections;
+import java.util.Optional;
 import java.util.concurrent.TimeUnit;
 import org.apache.commons.lang3.SerializationUtils;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
@@ -19,6 +20,8 @@ import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
 import org.opendaylight.controller.cluster.raft.client.messages.GetSnapshot;
 import org.opendaylight.controller.cluster.raft.client.messages.GetSnapshotReply;
+import org.opendaylight.controller.cluster.raft.persisted.EmptyState;
+import org.opendaylight.controller.cluster.raft.persisted.Snapshot;
 import org.slf4j.Logger;
 import scala.concurrent.duration.Duration;
 
@@ -46,8 +49,13 @@ class RaftActorSnapshotMessageSupport {
         this.cohort = cohort;
         this.log = context.getLogger();
 
-        context.getSnapshotManager().setCreateSnapshotRunnable(() -> cohort.createSnapshot(context.getActor()));
-        context.getSnapshotManager().setApplySnapshotConsumer(cohort::applySnapshot);
+        context.getSnapshotManager().setCreateSnapshotConsumer(
+            outputStream -> cohort.createSnapshot(context.getActor(), outputStream));
+        context.getSnapshotManager().setSnapshotCohort(cohort);
+    }
+
+    RaftActorSnapshotCohort getSnapshotCohort() {
+        return cohort;
     }
 
     boolean handleSnapshotMessage(Object message, ActorRef sender) {
@@ -58,7 +66,7 @@ class RaftActorSnapshotMessageSupport {
         } else if (message instanceof SaveSnapshotFailure) {
             onSaveSnapshotFailure((SaveSnapshotFailure) message);
         } else if (message instanceof CaptureSnapshotReply) {
-            onCaptureSnapshotReply(((CaptureSnapshotReply) message).getSnapshot());
+            onCaptureSnapshotReply((CaptureSnapshotReply) message);
         } else if (COMMIT_SNAPSHOT.equals(message)) {
             context.getSnapshotManager().commit(-1, -1);
         } else if (message instanceof GetSnapshot) {
@@ -70,11 +78,11 @@ class RaftActorSnapshotMessageSupport {
         return true;
     }
 
-    private void onCaptureSnapshotReply(byte[] snapshotBytes) {
-        log.debug("{}: CaptureSnapshotReply received by actor: snapshot size {}", context.getId(),
-                snapshotBytes.length);
+    private void onCaptureSnapshotReply(CaptureSnapshotReply reply) {
+        log.debug("{}: CaptureSnapshotReply received by actor", context.getId());
 
-        context.getSnapshotManager().persist(snapshotBytes, context.getTotalMemory());
+        context.getSnapshotManager().persist(reply.getSnapshotState(), reply.getInstallSnapshotStream(),
+                context.getTotalMemory());
     }
 
     private void onSaveSnapshotFailure(SaveSnapshotFailure saveSnapshotFailure) {
@@ -103,15 +111,16 @@ class RaftActorSnapshotMessageSupport {
 
         if (context.getPersistenceProvider().isRecoveryApplicable()) {
             CaptureSnapshot captureSnapshot = context.getSnapshotManager().newCaptureSnapshot(
-                    context.getReplicatedLog().last(), -1, false);
+                    context.getReplicatedLog().last(), -1);
 
             ActorRef snapshotReplyActor = context.actorOf(GetSnapshotReplyActor.props(captureSnapshot,
                     ImmutableElectionTerm.copyOf(context.getTermInformation()), sender,
                     snapshotReplyActorTimeout, context.getId(), context.getPeerServerInfo(true)));
 
-            cohort.createSnapshot(snapshotReplyActor);
+            cohort.createSnapshot(snapshotReplyActor, Optional.empty());
         } else {
-            Snapshot snapshot = Snapshot.create(new byte[0], Collections.<ReplicatedLogEntry>emptyList(),
+            Snapshot snapshot = Snapshot.create(
+                    EmptyState.INSTANCE, Collections.<ReplicatedLogEntry>emptyList(),
                     -1, -1, -1, -1,
                     context.getTermInformation().getCurrentTerm(), context.getTermInformation().getVotedFor(),
                     context.getPeerServerInfo(true));
index fe87334..7196fc5 100644 (file)
@@ -14,6 +14,7 @@ import java.util.List;
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 import org.opendaylight.controller.cluster.raft.persisted.DeleteEntries;
+import org.opendaylight.controller.cluster.raft.persisted.Snapshot;
 
 /**
  * Implementation of ReplicatedLog used by the RaftActor.
index 2677baf..93226cc 100644 (file)
@@ -17,7 +17,10 @@ import org.opendaylight.controller.cluster.raft.persisted.ServerConfigurationPay
  *
  * @author Moiz Raja
  * @author Thomas Pantelis
+ *
+ * @deprecated Use {@link org.opendaylight.controller.cluster.raft.persisted.Snapshot} instead.
  */
+@Deprecated
 public class Snapshot implements Serializable {
     private static final long serialVersionUID = -8298574936724056236L;
 
index 79f2ce9..12508ae 100644 (file)
@@ -10,13 +10,21 @@ package org.opendaylight.controller.cluster.raft;
 
 import akka.persistence.SnapshotSelectionCriteria;
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.io.ByteSource;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
 import java.util.List;
+import java.util.Optional;
 import java.util.function.Consumer;
+import javax.annotation.Nonnull;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
 import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot;
 import org.opendaylight.controller.cluster.raft.base.messages.SnapshotComplete;
 import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
+import org.opendaylight.controller.cluster.raft.persisted.EmptyState;
+import org.opendaylight.controller.cluster.raft.persisted.Snapshot;
 import org.slf4j.Logger;
 
 /**
@@ -48,10 +56,10 @@ public class SnapshotManager implements SnapshotState {
     private CaptureSnapshot captureSnapshot;
     private long lastSequenceNumber = -1;
 
-    private Runnable createSnapshotProcedure;
+    private Consumer<Optional<OutputStream>> createSnapshotProcedure;
 
     private ApplySnapshot applySnapshot;
-    private Consumer<byte[]> applySnapshotProcedure;
+    private RaftActorSnapshotCohort snapshotCohort = NoopRaftActorSnapshotCohort.INSTANCE;
 
     /**
      * Constructs an instance.
@@ -89,8 +97,9 @@ public class SnapshotManager implements SnapshotState {
     }
 
     @Override
-    public void persist(final byte[] snapshotBytes, final long totalMemory) {
-        currentState.persist(snapshotBytes, totalMemory);
+    public void persist(final Snapshot.State state, final Optional<OutputStream> installSnapshotStream,
+            final long totalMemory) {
+        currentState.persist(state, installSnapshotStream, totalMemory);
     }
 
     @Override
@@ -108,12 +117,17 @@ public class SnapshotManager implements SnapshotState {
         return currentState.trimLog(desiredTrimIndex);
     }
 
-    public void setCreateSnapshotRunnable(Runnable createSnapshotProcedure) {
+    void setCreateSnapshotConsumer(Consumer<Optional<OutputStream>> createSnapshotProcedure) {
         this.createSnapshotProcedure = createSnapshotProcedure;
     }
 
-    public void setApplySnapshotConsumer(Consumer<byte[]> applySnapshotProcedure) {
-        this.applySnapshotProcedure = applySnapshotProcedure;
+    void setSnapshotCohort(final RaftActorSnapshotCohort snapshotCohort) {
+        this.snapshotCohort = snapshotCohort;
+    }
+
+    @Nonnull
+    public Snapshot.State convertSnapshot(ByteSource snapshotBytes) throws IOException {
+        return snapshotCohort.deserializeSnapshot(snapshotBytes);
     }
 
     public long getLastSequenceNumber() {
@@ -138,11 +152,9 @@ public class SnapshotManager implements SnapshotState {
      *
      * @param lastLogEntry the last log entry for the snapshot.
      * @param replicatedToAllIndex the index of the last entry replicated to all followers.
-     * @param installSnapshotInitiated true if snapshot is initiated to install on a follower.
      * @return a new CaptureSnapshot instance.
      */
-    public CaptureSnapshot newCaptureSnapshot(ReplicatedLogEntry lastLogEntry, long replicatedToAllIndex,
-            boolean installSnapshotInitiated) {
+    public CaptureSnapshot newCaptureSnapshot(ReplicatedLogEntry lastLogEntry, long replicatedToAllIndex) {
         TermInformationReader lastAppliedTermInfoReader =
                 lastAppliedTermInformationReader.init(context.getReplicatedLog(), context.getLastApplied(),
                         lastLogEntry, hasFollowers());
@@ -169,7 +181,7 @@ public class SnapshotManager implements SnapshotState {
         }
 
         return new CaptureSnapshot(lastLogEntryIndex, lastLogEntryTerm, lastAppliedIndex, lastAppliedTerm,
-                newReplicatedToAllIndex, newReplicatedToAllTerm, unAppliedEntries, installSnapshotInitiated);
+                newReplicatedToAllIndex, newReplicatedToAllTerm, unAppliedEntries);
     }
 
     private class AbstractSnapshotState implements SnapshotState {
@@ -198,7 +210,8 @@ public class SnapshotManager implements SnapshotState {
         }
 
         @Override
-        public void persist(final byte[] snapshotBytes, final long totalMemory) {
+        public void persist(final Snapshot.State state, final Optional<OutputStream> installSnapshotStream,
+                final long totalMemory) {
             log.debug("persist should not be called in state {}", this);
         }
 
@@ -261,9 +274,11 @@ public class SnapshotManager implements SnapshotState {
 
         @SuppressWarnings("checkstyle:IllegalCatch")
         private boolean capture(ReplicatedLogEntry lastLogEntry, long replicatedToAllIndex, String targetFollower) {
-            captureSnapshot = newCaptureSnapshot(lastLogEntry, replicatedToAllIndex, targetFollower != null);
+            captureSnapshot = newCaptureSnapshot(lastLogEntry, replicatedToAllIndex);
 
-            if (captureSnapshot.isInstallSnapshotInitiated()) {
+            OutputStream installSnapshotStream = null;
+            if (targetFollower != null) {
+                installSnapshotStream = new ByteArrayOutputStream();
                 log.info("{}: Initiating snapshot capture {} to install on {}",
                         persistenceId(), captureSnapshot, targetFollower);
             } else {
@@ -277,7 +292,7 @@ public class SnapshotManager implements SnapshotState {
             SnapshotManager.this.currentState = CREATING;
 
             try {
-                createSnapshotProcedure.run();
+                createSnapshotProcedure.accept(Optional.ofNullable(installSnapshotStream));
             } catch (Exception e) {
                 SnapshotManager.this.currentState = IDLE;
                 log.error("Error creating snapshot", e);
@@ -325,11 +340,12 @@ public class SnapshotManager implements SnapshotState {
     private class Creating extends AbstractSnapshotState {
 
         @Override
-        public void persist(final byte[] snapshotBytes, final long totalMemory) {
+        public void persist(final Snapshot.State snapshotState, final Optional<OutputStream> installSnapshotStream,
+                final long totalMemory) {
             // create a snapshot object from the state provided and save it
             // when snapshot is saved async, SaveSnapshotSuccess is raised.
 
-            Snapshot snapshot = Snapshot.create(snapshotBytes,
+            Snapshot snapshot = Snapshot.create(snapshotState,
                     captureSnapshot.getUnAppliedEntries(),
                     captureSnapshot.getLastIndex(), captureSnapshot.getLastTerm(),
                     captureSnapshot.getLastAppliedIndex(), captureSnapshot.getLastAppliedTerm(),
@@ -393,10 +409,20 @@ public class SnapshotManager implements SnapshotState {
                     context.getId(), context.getReplicatedLog().getSnapshotIndex(),
                     context.getReplicatedLog().getSnapshotTerm());
 
-            if (context.getId().equals(currentBehavior.getLeaderId())
-                    && captureSnapshot.isInstallSnapshotInitiated()) {
-                // this would be call straight to the leader and won't initiate in serialization
-                currentBehavior.handleMessage(context.getActor(), new SendInstallSnapshot(snapshot));
+            if (installSnapshotStream.isPresent()) {
+                try {
+                    installSnapshotStream.get().close();
+                } catch (IOException e) {
+                    log.warn("Error closing install snapshot OutputStream", e);
+                }
+
+                if (context.getId().equals(currentBehavior.getLeaderId())) {
+                    ByteSource snapshotBytes = ByteSource.wrap(((ByteArrayOutputStream)installSnapshotStream.get())
+                            .toByteArray());
+
+                    // this would be call straight to the leader and won't initiate in serialization
+                    currentBehavior.handleMessage(context.getActor(), new SendInstallSnapshot(snapshot, snapshotBytes));
+                }
             }
 
             captureSnapshot = null;
@@ -431,8 +457,8 @@ public class SnapshotManager implements SnapshotState {
                         context.updatePeerIds(snapshot.getServerConfiguration());
                     }
 
-                    if (snapshot.getState().length > 0 ) {
-                        applySnapshotProcedure.accept(snapshot.getState());
+                    if (!(snapshot.getState() instanceof EmptyState)) {
+                        snapshotCohort.applySnapshot(snapshot.getState());
                     }
 
                     applySnapshot.getCallback().onSuccess();
index e423737..0a70274 100644 (file)
@@ -8,7 +8,10 @@
 
 package org.opendaylight.controller.cluster.raft;
 
+import java.io.OutputStream;
+import java.util.Optional;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
+import org.opendaylight.controller.cluster.raft.persisted.Snapshot;
 
 /**
  * Interface for a snapshot phase state.
@@ -53,10 +56,12 @@ public interface SnapshotState {
     /**
      * Persists a snapshot.
      *
-     * @param snapshotBytes the snapshot bytes
+     * @param snapshotState the snapshot State
+     * @param installSnapshotStream Optional OutputStream that is present if the snapshot is to also be installed
+     *        on a follower.
      * @param totalMemory the total memory threshold
      */
-    void persist(byte[] snapshotBytes, long totalMemory);
+    void persist(Snapshot.State snapshotState, Optional<OutputStream> installSnapshotStream, long totalMemory);
 
     /**
      * Commit the snapshot by trimming the log.
index 9fb5554..9cf2a3f 100644 (file)
@@ -10,7 +10,7 @@ package org.opendaylight.controller.cluster.raft.base.messages;
 
 import com.google.common.base.Preconditions;
 import javax.annotation.Nonnull;
-import org.opendaylight.controller.cluster.raft.Snapshot;
+import org.opendaylight.controller.cluster.raft.persisted.Snapshot;
 
 /**
  * Internal message, issued by follower to its actor.
index 14bd3a0..2173534 100644 (file)
@@ -17,25 +17,17 @@ public class CaptureSnapshot {
     private final long lastAppliedTerm;
     private final long lastIndex;
     private final long lastTerm;
-    private final boolean installSnapshotInitiated;
     private final long replicatedToAllIndex;
     private final long replicatedToAllTerm;
     private final List<ReplicatedLogEntry> unAppliedEntries;
 
-    public CaptureSnapshot(long lastIndex, long lastTerm, long lastAppliedIndex, long lastAppliedTerm,
-            long replicatedToAllIndex, long replicatedToAllTerm, List<ReplicatedLogEntry> unAppliedEntries) {
-        this(lastIndex, lastTerm, lastAppliedIndex, lastAppliedTerm, replicatedToAllIndex, replicatedToAllTerm,
-                unAppliedEntries, false);
-    }
-
     public CaptureSnapshot(long lastIndex, long lastTerm, long lastAppliedIndex,
             long lastAppliedTerm, long replicatedToAllIndex, long replicatedToAllTerm,
-            List<ReplicatedLogEntry> unAppliedEntries, boolean installSnapshotInitiated) {
+            List<ReplicatedLogEntry> unAppliedEntries) {
         this.lastIndex = lastIndex;
         this.lastTerm = lastTerm;
         this.lastAppliedIndex = lastAppliedIndex;
         this.lastAppliedTerm = lastAppliedTerm;
-        this.installSnapshotInitiated = installSnapshotInitiated;
         this.replicatedToAllIndex = replicatedToAllIndex;
         this.replicatedToAllTerm = replicatedToAllTerm;
         this.unAppliedEntries = unAppliedEntries != null ? unAppliedEntries :
@@ -58,10 +50,6 @@ public class CaptureSnapshot {
         return lastTerm;
     }
 
-    public boolean isInstallSnapshotInitiated() {
-        return installSnapshotInitiated;
-    }
-
     public long getReplicatedToAllIndex() {
         return replicatedToAllIndex;
     }
@@ -79,7 +67,7 @@ public class CaptureSnapshot {
         StringBuilder builder = new StringBuilder();
         builder.append("CaptureSnapshot [lastAppliedIndex=").append(lastAppliedIndex).append(", lastAppliedTerm=")
                 .append(lastAppliedTerm).append(", lastIndex=").append(lastIndex).append(", lastTerm=")
-                .append(lastTerm).append(", installSnapshotInitiated=").append(installSnapshotInitiated)
+                .append(lastTerm).append(", installSnapshotInitiated=")
                 .append(", replicatedToAllIndex=").append(replicatedToAllIndex).append(", replicatedToAllTerm=")
                 .append(replicatedToAllTerm).append(", unAppliedEntries size=")
                 .append(unAppliedEntries.size()).append("]");
index 1e57301..cc981e5 100644 (file)
@@ -7,22 +7,29 @@
  */
 package org.opendaylight.controller.cluster.raft.base.messages;
 
-import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
+import com.google.common.base.Preconditions;
+import java.io.OutputStream;
+import java.util.Optional;
+import javax.annotation.Nonnull;
+import org.opendaylight.controller.cluster.raft.persisted.Snapshot;
 
 public class CaptureSnapshotReply {
-    private final byte [] snapshot;
+    private final Snapshot.State snapshotState;
+    private final Optional<OutputStream> installSnapshotStream;
 
-    @SuppressFBWarnings(value = "EI_EXPOSE_REP2", justification = "Stores a reference to an externally mutable byte[] "
-            + "object but this is OK since this class is merely a DTO and does not process byte[] internally. "
-            + "Also it would be inefficient to create a copy as the byte[] could be large.")
-    public CaptureSnapshotReply(byte [] snapshot) {
-        this.snapshot = snapshot;
+    public CaptureSnapshotReply(@Nonnull final Snapshot.State snapshotState,
+            @Nonnull final Optional<OutputStream> installSnapshotStream) {
+        this.snapshotState = Preconditions.checkNotNull(snapshotState);
+        this.installSnapshotStream = Preconditions.checkNotNull(installSnapshotStream);
     }
 
-    @SuppressFBWarnings(value = "EI_EXPOSE_REP", justification = "Exposes a mutable object stored in a field but "
-            + "this is OK since this class is merely a DTO and does not process the byte[] internally. "
-            + "Also it would be inefficient to create a return copy as the byte[] could be large.")
-    public byte [] getSnapshot() {
-        return snapshot;
+    @Nonnull
+    public Snapshot.State getSnapshotState() {
+        return snapshotState;
+    }
+
+    @Nonnull
+    public Optional<OutputStream> getInstallSnapshotStream() {
+        return installSnapshotStream;
     }
 }
index de33b8c..e8d58f2 100644 (file)
@@ -9,8 +9,9 @@
 package org.opendaylight.controller.cluster.raft.base.messages;
 
 import com.google.common.base.Preconditions;
+import com.google.common.io.ByteSource;
 import javax.annotation.Nonnull;
-import org.opendaylight.controller.cluster.raft.Snapshot;
+import org.opendaylight.controller.cluster.raft.persisted.Snapshot;
 
 /**
  * Internal message sent from the SnapshotManager to its associated leader when a snapshot capture is complete to
@@ -18,13 +19,19 @@ import org.opendaylight.controller.cluster.raft.Snapshot;
  */
 public final class SendInstallSnapshot {
     private final Snapshot snapshot;
+    private final ByteSource snapshotBytes;
 
-    public SendInstallSnapshot(@Nonnull Snapshot snapshot) {
+    public SendInstallSnapshot(@Nonnull Snapshot snapshot, @Nonnull ByteSource snapshotBytes) {
         this.snapshot = Preconditions.checkNotNull(snapshot);
+        this.snapshotBytes = Preconditions.checkNotNull(snapshotBytes);
     }
 
     @Nonnull
     public Snapshot getSnapshot() {
         return snapshot;
     }
+
+    public ByteSource getSnapshotBytes() {
+        return snapshotBytes;
+    }
 }
index 10c1a15..548b920 100644 (file)
@@ -14,7 +14,9 @@ import akka.actor.Cancellable;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
+import com.google.common.io.ByteSource;
 import com.google.protobuf.ByteString;
+import java.io.IOException;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
@@ -33,7 +35,6 @@ import org.opendaylight.controller.cluster.raft.PeerInfo;
 import org.opendaylight.controller.cluster.raft.RaftActorContext;
 import org.opendaylight.controller.cluster.raft.RaftState;
 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
-import org.opendaylight.controller.cluster.raft.Snapshot;
 import org.opendaylight.controller.cluster.raft.VotingState;
 import org.opendaylight.controller.cluster.raft.base.messages.CheckConsensusReached;
 import org.opendaylight.controller.cluster.raft.base.messages.Replicate;
@@ -47,6 +48,7 @@ import org.opendaylight.controller.cluster.raft.messages.RaftRPC;
 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
 import org.opendaylight.controller.cluster.raft.messages.UnInitializedFollowerSnapshotReply;
 import org.opendaylight.controller.cluster.raft.persisted.ServerConfigurationPayload;
+import org.opendaylight.controller.cluster.raft.persisted.Snapshot;
 import scala.concurrent.duration.FiniteDuration;
 
 /**
@@ -83,7 +85,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
     private final Queue<ClientRequestTracker> trackers = new LinkedList<>();
 
     private Cancellable heartbeatSchedule = null;
-    private Optional<SnapshotHolder> snapshot = Optional.absent();
+    private Optional<SnapshotHolder> snapshotHolder = Optional.absent();
     private int minReplicationCount;
 
     protected AbstractLeader(RaftActorContext context, RaftState state,
@@ -92,7 +94,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
 
         if (initializeFromLeader != null) {
             followerToLog.putAll(initializeFromLeader.followerToLog);
-            snapshot = initializeFromLeader.snapshot;
+            snapshotHolder = initializeFromLeader.snapshotHolder;
             trackers.addAll(initializeFromLeader.trackers);
         } else {
             for (PeerInfo peerInfo: context.getPeers()) {
@@ -165,17 +167,13 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
     }
 
     @VisibleForTesting
-    void setSnapshot(@Nullable Snapshot snapshot) {
-        if (snapshot != null) {
-            this.snapshot = Optional.of(new SnapshotHolder(snapshot));
-        } else {
-            this.snapshot = Optional.absent();
-        }
+    void setSnapshot(@Nullable SnapshotHolder snapshotHolder) {
+        this.snapshotHolder = Optional.fromNullable(snapshotHolder);
     }
 
     @VisibleForTesting
     boolean hasSnapshot() {
-        return snapshot.isPresent();
+        return snapshotHolder.isPresent();
     }
 
     @Override
@@ -451,8 +449,8 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
             sendHeartBeat();
             scheduleHeartBeat(context.getConfigParams().getHeartBeatInterval());
         } else if (message instanceof SendInstallSnapshot) {
-            // received from RaftActor
-            setSnapshot(((SendInstallSnapshot) message).getSnapshot());
+            SendInstallSnapshot sendInstallSnapshot = (SendInstallSnapshot) message;
+            setSnapshot(new SnapshotHolder(sendInstallSnapshot.getSnapshot(), sendInstallSnapshot.getSnapshotBytes()));
             sendInstallSnapshot();
         } else if (message instanceof Replicate) {
             replicate((Replicate) message);
@@ -497,7 +495,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
                             + " Setting nextIndex: {}", logName(), reply.getChunkIndex(), followerId,
                             context.getReplicatedLog().getSnapshotIndex() + 1);
 
-                    long followerMatchIndex = snapshot.get().getLastIncludedIndex();
+                    long followerMatchIndex = snapshotHolder.get().getLastIncludedIndex();
                     followerLogInformation.setMatchIndex(followerMatchIndex);
                     followerLogInformation.setNextIndex(followerMatchIndex + 1);
                     followerLogInformation.clearLeaderInstallSnapshotState();
@@ -730,7 +728,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
      */
     public boolean initiateCaptureSnapshot(String followerId) {
         FollowerLogInformation followerLogInfo = followerToLog.get(followerId);
-        if (snapshot.isPresent()) {
+        if (snapshotHolder.isPresent()) {
             // If a snapshot is present in the memory, most likely another install is in progress no need to capture
             // snapshot. This could happen if another follower needs an install when one is going on.
             final ActorSelection followerActor = context.getPeerActorSelection(followerId);
@@ -783,7 +781,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
      *  InstallSnapshot should qualify as a heartbeat too.
      */
     private void sendSnapshotChunk(ActorSelection followerActor, FollowerLogInformation followerLogInfo) {
-        if (snapshot.isPresent()) {
+        if (snapshotHolder.isPresent()) {
             LeaderInstallSnapshotState installSnapshotState = followerLogInfo.getInstallSnapshotState();
             if (installSnapshotState == null) {
                 installSnapshotState = new LeaderInstallSnapshotState(context.getConfigParams().getSnapshotChunkSize(),
@@ -792,7 +790,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
             }
 
             // Ensure the snapshot bytes are set - this is a no-op.
-            installSnapshotState.setSnapshotBytes(snapshot.get().getSnapshotBytes());
+            installSnapshotState.setSnapshotBytes(snapshotHolder.get().getSnapshotBytes());
 
             byte[] nextSnapshotChunk = installSnapshotState.getNextChunk();
 
@@ -807,8 +805,8 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
 
             followerActor.tell(
                 new InstallSnapshot(currentTerm(), context.getId(),
-                    snapshot.get().getLastIncludedIndex(),
-                    snapshot.get().getLastIncludedTerm(),
+                    snapshotHolder.get().getLastIncludedIndex(),
+                    snapshotHolder.get().getLastIncludedTerm(),
                     nextSnapshotChunk,
                     nextChunkIndex,
                     installSnapshotState.getTotalChunks(),
@@ -911,15 +909,19 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
         return followerToLog.size();
     }
 
-    private static class SnapshotHolder {
+    static class SnapshotHolder {
         private final long lastIncludedTerm;
         private final long lastIncludedIndex;
         private final ByteString snapshotBytes;
 
-        SnapshotHolder(Snapshot snapshot) {
+        SnapshotHolder(Snapshot snapshot, ByteSource snapshotBytes) {
             this.lastIncludedTerm = snapshot.getLastAppliedTerm();
             this.lastIncludedIndex = snapshot.getLastAppliedIndex();
-            this.snapshotBytes = ByteString.copyFrom(snapshot.getState());
+            try {
+                this.snapshotBytes = ByteString.copyFrom(snapshotBytes.read());
+            } catch (IOException e) {
+                throw new RuntimeException("Error reading state", e);
+            }
         }
 
         long getLastIncludedTerm() {
index 6f107e9..727d6a3 100644 (file)
@@ -18,6 +18,8 @@ import akka.cluster.MemberStatus;
 import akka.japi.Procedure;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Stopwatch;
+import com.google.common.io.ByteSource;
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Optional;
 import java.util.Set;
@@ -27,7 +29,6 @@ import javax.annotation.Nullable;
 import org.opendaylight.controller.cluster.raft.RaftActorContext;
 import org.opendaylight.controller.cluster.raft.RaftState;
 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
-import org.opendaylight.controller.cluster.raft.Snapshot;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
 import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
 import org.opendaylight.controller.cluster.raft.base.messages.TimeoutNow;
@@ -39,6 +40,7 @@ import org.opendaylight.controller.cluster.raft.messages.RaftRPC;
 import org.opendaylight.controller.cluster.raft.messages.RequestVote;
 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
 import org.opendaylight.controller.cluster.raft.persisted.ServerConfigurationPayload;
+import org.opendaylight.controller.cluster.raft.persisted.Snapshot;
 
 /**
  * The behavior of a RaftActor in the Follower raft state.
@@ -528,7 +530,9 @@ public class Follower extends AbstractRaftActorBehavior {
 
             if (snapshotTracker.addChunk(installSnapshot.getChunkIndex(), installSnapshot.getData(),
                     installSnapshot.getLastChunkHashCode())) {
-                Snapshot snapshot = Snapshot.create(snapshotTracker.getSnapshot(),
+                ByteSource snapshotBytes = ByteSource.wrap(snapshotTracker.getSnapshot());
+                Snapshot snapshot = Snapshot.create(
+                        context.getSnapshotManager().convertSnapshot(snapshotBytes),
                         new ArrayList<>(),
                         installSnapshot.getLastIncludedIndex(),
                         installSnapshot.getLastIncludedTerm(),
@@ -560,7 +564,7 @@ public class Follower extends AbstractRaftActorBehavior {
 
                 sender.tell(reply, actor());
             }
-        } catch (SnapshotTracker.InvalidChunkException e) {
+        } catch (SnapshotTracker.InvalidChunkException | IOException e) {
             log.debug("{}: Exception in InstallSnapshot of follower", logName(), e);
 
             sender.tell(new InstallSnapshotReply(currentTerm(), context.getId(),
diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/persisted/EmptyState.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/persisted/EmptyState.java
new file mode 100644 (file)
index 0000000..40c90fb
--- /dev/null
@@ -0,0 +1,27 @@
+/*
+ * Copyright (c) 2017 Brocade Communications Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.raft.persisted;
+
+/**
+ * Empty Snapshot State implementation.
+ *
+ * @author Thomas Pantelis
+ */
+public class EmptyState implements Snapshot.State {
+    private static final long serialVersionUID = 1L;
+
+    public static final EmptyState INSTANCE = new EmptyState();
+
+    private EmptyState() {
+    }
+
+    @SuppressWarnings("static-method")
+    private Object readResolve() {
+        return INSTANCE;
+    }
+}
diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/persisted/Snapshot.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/persisted/Snapshot.java
new file mode 100644 (file)
index 0000000..1763ad3
--- /dev/null
@@ -0,0 +1,180 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.raft.persisted;
+
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
+import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
+
+/**
+ * Represents a snapshot of the raft data.
+ *
+ * @author Thomas Pantelis
+ */
+public class Snapshot implements Serializable {
+
+    /**
+     * Implementations of this interface are used as the state payload for a snapshot.
+     *
+     * @author Thomas Pantelis
+     */
+    public interface State extends Serializable {
+    }
+
+    private static final class Proxy implements Externalizable {
+        private static final long serialVersionUID = 1L;
+
+        private Snapshot snapshot;
+
+        // checkstyle flags the public modifier as redundant which really doesn't make sense since it clearly isn't
+        // redundant. It is explicitly needed for Java serialization to be able to create instances via reflection.
+        @SuppressWarnings("checkstyle:RedundantModifier")
+        public Proxy() {
+            // For Externalizable
+        }
+
+        Proxy(final Snapshot snapshot) {
+            this.snapshot = snapshot;
+        }
+
+        @Override
+        public void writeExternal(final ObjectOutput out) throws IOException {
+            out.writeLong(snapshot.lastIndex);
+            out.writeLong(snapshot.lastTerm);
+            out.writeLong(snapshot.lastAppliedIndex);
+            out.writeLong(snapshot.lastAppliedTerm);
+            out.writeLong(snapshot.electionTerm);
+            out.writeObject(snapshot.electionVotedFor);
+            out.writeObject(snapshot.serverConfig);
+
+            out.writeInt(snapshot.unAppliedEntries.size());
+            for (ReplicatedLogEntry e: snapshot.unAppliedEntries) {
+                out.writeLong(e.getIndex());
+                out.writeLong(e.getTerm());
+                out.writeObject(e.getData());
+            }
+
+            out.writeObject(snapshot.state);
+        }
+
+        @Override
+        public void readExternal(final ObjectInput in) throws IOException, ClassNotFoundException {
+            long lastIndex = in.readLong();
+            long lastTerm = in.readLong();
+            long lastAppliedIndex = in.readLong();
+            long lastAppliedTerm = in.readLong();
+            long electionTerm = in.readLong();
+            String electionVotedFor = (String) in.readObject();
+            ServerConfigurationPayload serverConfig = (ServerConfigurationPayload) in.readObject();
+
+            int size = in.readInt();
+            List<ReplicatedLogEntry> unAppliedEntries = new ArrayList<>(size);
+            for (int i = 0; i < size; i++) {
+                unAppliedEntries.add(new SimpleReplicatedLogEntry(in.readLong(), in.readLong(),
+                        (Payload) in.readObject()));
+            }
+
+            State state = (State) in.readObject();
+
+            snapshot = Snapshot.create(state, unAppliedEntries, lastIndex, lastTerm, lastAppliedIndex, lastAppliedTerm,
+                    electionTerm, electionVotedFor, serverConfig);
+        }
+
+        private Object readResolve() {
+            return snapshot;
+        }
+    }
+
+    private static final long serialVersionUID = 1L;
+
+    private final State state;
+    private final List<ReplicatedLogEntry> unAppliedEntries;
+    private final long lastIndex;
+    private final long lastTerm;
+    private final long lastAppliedIndex;
+    private final long lastAppliedTerm;
+    private final long electionTerm;
+    private final String electionVotedFor;
+    private final ServerConfigurationPayload serverConfig;
+
+    private Snapshot(State state, List<ReplicatedLogEntry> unAppliedEntries, long lastIndex, long lastTerm,
+            long lastAppliedIndex, long lastAppliedTerm, long electionTerm, String electionVotedFor,
+            ServerConfigurationPayload serverConfig) {
+        this.state = state;
+        this.unAppliedEntries = unAppliedEntries;
+        this.lastIndex = lastIndex;
+        this.lastTerm = lastTerm;
+        this.lastAppliedIndex = lastAppliedIndex;
+        this.lastAppliedTerm = lastAppliedTerm;
+        this.electionTerm = electionTerm;
+        this.electionVotedFor = electionVotedFor;
+        this.serverConfig = serverConfig;
+    }
+
+    public static Snapshot create(State state, List<ReplicatedLogEntry> entries, long lastIndex, long lastTerm,
+            long lastAppliedIndex, long lastAppliedTerm, long electionTerm, String electionVotedFor,
+            ServerConfigurationPayload serverConfig) {
+        return new Snapshot(state, entries, lastIndex, lastTerm, lastAppliedIndex, lastAppliedTerm,
+                electionTerm, electionVotedFor, serverConfig);
+    }
+
+    public State getState() {
+        return state;
+    }
+
+    public List<ReplicatedLogEntry> getUnAppliedEntries() {
+        return unAppliedEntries;
+    }
+
+    public long getLastTerm() {
+        return lastTerm;
+    }
+
+    public long getLastAppliedIndex() {
+        return lastAppliedIndex;
+    }
+
+    public long getLastAppliedTerm() {
+        return lastAppliedTerm;
+    }
+
+    public long getLastIndex() {
+        return this.lastIndex;
+    }
+
+    public long getElectionTerm() {
+        return electionTerm;
+    }
+
+    public String getElectionVotedFor() {
+        return electionVotedFor;
+    }
+
+    public ServerConfigurationPayload getServerConfiguration() {
+        return serverConfig;
+    }
+
+    @SuppressWarnings("static-method")
+    private Object writeReplace() {
+        return new Proxy(this);
+    }
+
+    @Override
+    public String toString() {
+        return "Snapshot [lastIndex=" + lastIndex + ", lastTerm=" + lastTerm + ", lastAppliedIndex=" + lastAppliedIndex
+                + ", lastAppliedTerm=" + lastAppliedTerm + ", unAppliedEntries size=" + unAppliedEntries.size()
+                + ", state=" + state + ", electionTerm=" + electionTerm + ", electionVotedFor="
+                + electionVotedFor + ", ServerConfigPayload="  + serverConfig + "]";
+    }
+}
index 4a05313..eb5c9e5 100644 (file)
@@ -20,18 +20,21 @@ import akka.testkit.JavaTestKit;
 import akka.testkit.TestActorRef;
 import akka.util.Timeout;
 import com.google.common.base.Stopwatch;
-import com.google.common.base.Throwables;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.util.concurrent.Uninterruptibles;
+import java.io.OutputStream;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Consumer;
 import java.util.function.Predicate;
+import org.apache.commons.lang3.SerializationUtils;
 import org.junit.After;
+import org.opendaylight.controller.cluster.raft.MockRaftActor.MockSnapshotState;
 import org.opendaylight.controller.cluster.raft.MockRaftActorContext.MockPayload;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
@@ -41,6 +44,7 @@ import org.opendaylight.controller.cluster.raft.client.messages.GetOnDemandRaftS
 import org.opendaylight.controller.cluster.raft.client.messages.OnDemandRaftState;
 import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries;
 import org.opendaylight.controller.cluster.raft.persisted.ServerConfigurationPayload;
+import org.opendaylight.controller.cluster.raft.persisted.Snapshot;
 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
 import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
@@ -149,12 +153,13 @@ public abstract class AbstractRaftActorIntegrationTest extends AbstractActorTest
 
         @Override
         @SuppressWarnings("checkstyle:IllegalCatch")
-        public void createSnapshot(ActorRef actorRef) {
-            try {
-                actorRef.tell(new CaptureSnapshotReply(RaftActorTest.fromObject(getState()).toByteArray()), actorRef);
-            } catch (Exception e) {
-                Throwables.propagate(e);
+        public void createSnapshot(ActorRef actorRef, Optional<OutputStream> installSnapshotStream) {
+            MockSnapshotState snapshotState = new MockSnapshotState(new ArrayList<>(getState()));
+            if (installSnapshotStream.isPresent()) {
+                SerializationUtils.serialize(snapshotState, installSnapshotStream.get());
             }
+
+            actorRef.tell(new CaptureSnapshotReply(snapshotState, installSnapshotStream), actorRef);
         }
 
         public ActorRef collectorActor() {
@@ -291,7 +296,7 @@ public abstract class AbstractRaftActorIntegrationTest extends AbstractActorTest
         assertEquals(prefix + " Snapshot getLastTerm", lastTerm, snapshot.getLastTerm());
         assertEquals(prefix + " Snapshot getLastIndex", lastIndex, snapshot.getLastIndex());
 
-        List<Object> actualState = (List<Object>)MockRaftActor.toObject(snapshot.getState());
+        List<Object> actualState = ((MockSnapshotState)snapshot.getState()).getState();
         assertEquals(String.format("%s Snapshot getState size. Expected %s: . Actual: %s", prefix, expSnapshotState,
                 actualState), expSnapshotState.size(), actualState.size());
         for (int i = 0; i < expSnapshotState.size(); i++) {
index 9ce8909..460dd4a 100644 (file)
@@ -15,20 +15,30 @@ import akka.testkit.JavaTestKit;
 import akka.testkit.TestActorRef;
 import com.google.common.base.Optional;
 import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Sets;
+import com.google.common.io.ByteSource;
 import com.google.common.util.concurrent.Uninterruptibles;
+import java.io.OutputStream;
+import java.io.Serializable;
 import java.util.Arrays;
 import java.util.HashSet;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Consumer;
+import org.apache.commons.lang3.SerializationUtils;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
+import org.opendaylight.controller.cluster.raft.MockRaftActor.MockSnapshotState;
+import org.opendaylight.controller.cluster.raft.MockRaftActorContext.MockPayload;
 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
 import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries;
+import org.opendaylight.controller.cluster.raft.persisted.ByteState;
 import org.opendaylight.controller.cluster.raft.persisted.ServerConfigurationPayload;
 import org.opendaylight.controller.cluster.raft.persisted.ServerInfo;
 import org.opendaylight.controller.cluster.raft.persisted.SimpleReplicatedLogEntry;
+import org.opendaylight.controller.cluster.raft.persisted.Snapshot;
+import org.opendaylight.controller.cluster.raft.persisted.Snapshot.State;
 import org.opendaylight.controller.cluster.raft.persisted.UpdateElectionTerm;
 import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy;
 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
@@ -103,7 +113,7 @@ public class MigratedMessagesTest extends AbstractActorTest {
         doTestSnapshotAfterStartupWithMigratedMessage(persistenceId, true, snapshot -> {
             assertEquals("getElectionVotedFor", persistenceId, snapshot.getElectionVotedFor());
             assertEquals("getElectionTerm", 5, snapshot.getElectionTerm());
-        });
+        }, ByteState.empty());
 
         TEST_LOG.info("testSnapshotAfterStartupWithMigratedUpdateElectionTermAndPersistenceEnabled ending");
     }
@@ -122,7 +132,7 @@ public class MigratedMessagesTest extends AbstractActorTest {
         doTestSnapshotAfterStartupWithMigratedMessage(persistenceId, false, snapshot -> {
             assertEquals("getElectionVotedFor", persistenceId, snapshot.getElectionVotedFor());
             assertEquals("getElectionTerm", 5, snapshot.getElectionTerm());
-        });
+        }, ByteState.empty());
 
         TEST_LOG.info("testSnapshotAfterStartupWithMigratedUpdateElectionTermAndPersistenceDisabled ending");
     }
@@ -145,7 +155,7 @@ public class MigratedMessagesTest extends AbstractActorTest {
             assertEquals("getLastAppliedTerm", 1, snapshot.getLastAppliedTerm());
             assertEquals("getLastIndex", 0, snapshot.getLastIndex());
             assertEquals("getLastTerm", 1, snapshot.getLastTerm());
-        });
+        }, ByteState.empty());
 
         TEST_LOG.info("testSnapshotAfterStartupWithMigratedApplyJournalEntries ending");
     }
@@ -164,12 +174,17 @@ public class MigratedMessagesTest extends AbstractActorTest {
 
         RaftActorSnapshotCohort snapshotCohort = new RaftActorSnapshotCohort() {
             @Override
-            public void createSnapshot(ActorRef actorRef) {
-                actorRef.tell(new CaptureSnapshotReply(new byte[0]), actorRef);
+            public void createSnapshot(ActorRef actorRef, java.util.Optional<OutputStream> installSnapshotStream) {
+                actorRef.tell(new CaptureSnapshotReply(ByteState.empty(), installSnapshotStream), actorRef);
             }
 
             @Override
-            public void applySnapshot(byte[] snapshotBytes) {
+            public void applySnapshot(Snapshot.State snapshotState) {
+            }
+
+            @Override
+            public State deserializeSnapshot(ByteSource snapshotBytes) {
+                throw new UnsupportedOperationException();
             }
         };
 
@@ -204,7 +219,7 @@ public class MigratedMessagesTest extends AbstractActorTest {
             assertEquals("Unapplied entry term", 1, snapshot.getUnAppliedEntries().get(0).getTerm());
             assertEquals("Unapplied entry index", 0, snapshot.getUnAppliedEntries().get(0).getIndex());
             assertEquals("Unapplied entry data", expPayload, snapshot.getUnAppliedEntries().get(0).getData());
-        });
+        }, ByteState.empty());
 
         TEST_LOG.info("testSnapshotAfterStartupWithMigratedReplicatedLogEntry ending");
     }
@@ -231,14 +246,53 @@ public class MigratedMessagesTest extends AbstractActorTest {
                 assertEquals("getElectionTerm", 1, snapshot.getElectionTerm());
                 assertEquals("getServerConfiguration", new HashSet<>(expectedServerConfig.getServerConfig()),
                         new HashSet<>(snapshot.getServerConfiguration().getServerConfig()));
-            });
+            }, ByteState.empty());
 
         return actor;
     }
 
+    @Test
+    public void testSnapshotAfterStartupWithMigratedSnapshot() throws Exception {
+        TEST_LOG.info("testSnapshotAfterStartupWithMigratedSnapshot starting");
+
+        String persistenceId = factory.generateActorId("test-actor-");
+
+        List<Object> snapshotData = Arrays.asList(new MockPayload("1"));
+        final MockSnapshotState snapshotState = new MockSnapshotState(snapshotData);
+
+        org.opendaylight.controller.cluster.raft.Snapshot legacy = org.opendaylight.controller.cluster.raft.Snapshot
+            .create(SerializationUtils.serialize((Serializable) snapshotData),
+                Arrays.asList(new SimpleReplicatedLogEntry(6, 2, new MockPayload("payload"))),
+                6, 2, 5, 1, 3, "member-1", new ServerConfigurationPayload(Arrays.asList(
+                        new ServerInfo(persistenceId, true), new ServerInfo("2", false))));
+        InMemorySnapshotStore.addSnapshot(persistenceId, legacy);
+
+        doTestSnapshotAfterStartupWithMigratedMessage(persistenceId, true, snapshot -> {
+            assertEquals("getLastIndex", legacy.getLastIndex(), snapshot.getLastIndex());
+            assertEquals("getLastTerm", legacy.getLastTerm(), snapshot.getLastTerm());
+            assertEquals("getLastAppliedIndex", legacy.getLastAppliedIndex(), snapshot.getLastAppliedIndex());
+            assertEquals("getLastAppliedTerm", legacy.getLastAppliedTerm(), snapshot.getLastAppliedTerm());
+            assertEquals("getState", snapshotState, snapshot.getState());
+            assertEquals("Unapplied entries size", legacy.getUnAppliedEntries().size(),
+                    snapshot.getUnAppliedEntries().size());
+            assertEquals("Unapplied entry term", legacy.getUnAppliedEntries().get(0).getTerm(),
+                    snapshot.getUnAppliedEntries().get(0).getTerm());
+            assertEquals("Unapplied entry index", legacy.getUnAppliedEntries().get(0).getIndex(),
+                    snapshot.getUnAppliedEntries().get(0).getIndex());
+            assertEquals("Unapplied entry data", legacy.getUnAppliedEntries().get(0).getData(),
+                    snapshot.getUnAppliedEntries().get(0).getData());
+            assertEquals("getElectionVotedFor", legacy.getElectionVotedFor(), snapshot.getElectionVotedFor());
+            assertEquals("getElectionTerm", legacy.getElectionTerm(), snapshot.getElectionTerm());
+            assertEquals("getServerConfiguration", Sets.newHashSet(legacy.getServerConfiguration().getServerConfig()),
+                    Sets.newHashSet(snapshot.getServerConfiguration().getServerConfig()));
+        }, snapshotState);
+
+        TEST_LOG.info("testSnapshotAfterStartupWithMigratedSnapshot ending");
+    }
+
     @SuppressWarnings("checkstyle:IllegalCatch")
     private TestActorRef<MockRaftActor> doTestSnapshotAfterStartupWithMigratedMessage(String id, boolean persistent,
-            Consumer<Snapshot> snapshotVerifier) {
+            Consumer<Snapshot> snapshotVerifier, final State snapshotState) {
         InMemorySnapshotStore.addSnapshotSavedLatch(id);
         InMemoryJournal.addDeleteMessagesCompleteLatch(id);
         DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
@@ -246,12 +300,17 @@ public class MigratedMessagesTest extends AbstractActorTest {
 
         RaftActorSnapshotCohort snapshotCohort = new RaftActorSnapshotCohort() {
             @Override
-            public void createSnapshot(ActorRef actorRef) {
-                actorRef.tell(new CaptureSnapshotReply(new byte[0]), actorRef);
+            public void createSnapshot(ActorRef actorRef, java.util.Optional<OutputStream> installSnapshotStream) {
+                actorRef.tell(new CaptureSnapshotReply(snapshotState, installSnapshotStream), actorRef);
+            }
+
+            @Override
+            public void applySnapshot(State newState) {
             }
 
             @Override
-            public void applySnapshot(byte[] snapshotBytes) {
+            public State deserializeSnapshot(ByteSource snapshotBytes) {
+                throw new UnsupportedOperationException();
             }
         };
 
index dbced5f..20adaf1 100644 (file)
@@ -16,10 +16,10 @@ import akka.actor.Props;
 import com.google.common.base.Function;
 import com.google.common.base.Optional;
 import com.google.common.base.Throwables;
+import com.google.common.io.ByteSource;
 import com.google.common.util.concurrent.Uninterruptibles;
-import java.io.ByteArrayInputStream;
 import java.io.IOException;
-import java.io.ObjectInputStream;
+import java.io.OutputStream;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
@@ -27,8 +27,10 @@ import java.util.Map;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import javax.annotation.Nonnull;
+import org.apache.commons.lang3.SerializationUtils;
 import org.opendaylight.controller.cluster.DataPersistenceProvider;
 import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
+import org.opendaylight.controller.cluster.raft.persisted.Snapshot;
 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
 import org.opendaylight.yangtools.concepts.Identifier;
 
@@ -169,38 +171,43 @@ public class MockRaftActor extends RaftActor implements RaftActorRecoveryCohort,
     }
 
     @Override
-    public void applyRecoverySnapshot(byte[] bytes) {
-        recoveryCohortDelegate.applyRecoverySnapshot(bytes);
-        applySnapshotBytes(bytes);
+    public void applyRecoverySnapshot(Snapshot.State newState) {
+        recoveryCohortDelegate.applyRecoverySnapshot(newState);
+        applySnapshotState(newState);
     }
 
-    private void applySnapshotBytes(byte[] bytes) {
-        if (bytes.length == 0) {
-            return;
-        }
-
-        try {
-            Object data = toObject(bytes);
-            if (data instanceof List) {
-                state.clear();
-                state.addAll((List<?>) data);
-            }
-        } catch (ClassNotFoundException | IOException e) {
-            Throwables.propagate(e);
+    private void applySnapshotState(Snapshot.State newState) {
+        if (newState instanceof MockSnapshotState) {
+            state.clear();
+            state.addAll(((MockSnapshotState)newState).getState());
         }
     }
 
     @Override
-    public void createSnapshot(ActorRef actorRef) {
+    public void createSnapshot(ActorRef actorRef, java.util.Optional<OutputStream> installSnapshotStream) {
         LOG.info("{}: createSnapshot called", persistenceId());
-        snapshotCohortDelegate.createSnapshot(actorRef);
+        snapshotCohortDelegate.createSnapshot(actorRef, installSnapshotStream);
     }
 
     @Override
-    public void applySnapshot(byte [] snapshot) {
+    public void applySnapshot(Snapshot.State newState) {
         LOG.info("{}: applySnapshot called", persistenceId());
-        applySnapshotBytes(snapshot);
-        snapshotCohortDelegate.applySnapshot(snapshot);
+        applySnapshotState(newState);
+        snapshotCohortDelegate.applySnapshot(newState);
+    }
+
+    @Override
+    public Snapshot.State deserializeSnapshot(ByteSource snapshotBytes) {
+        try {
+            return (Snapshot.State) SerializationUtils.deserialize(snapshotBytes.read());
+        } catch (IOException e) {
+            throw new RuntimeException("Error deserializing state", e);
+        }
+    }
+
+    @Override
+    public Snapshot.State deserializePreCarbonSnapshot(byte[] from) {
+        return new MockSnapshotState(SerializationUtils.deserialize(from));
     }
 
     @Override
@@ -243,23 +250,12 @@ public class MockRaftActor extends RaftActor implements RaftActorRecoveryCohort,
         }
     }
 
-    public static Object toObject(byte[] bs) throws ClassNotFoundException, IOException {
-        Object obj = null;
-        ByteArrayInputStream bis = null;
-        ObjectInputStream ois = null;
-        try {
-            bis = new ByteArrayInputStream(bs);
-            ois = new ObjectInputStream(bis);
-            obj = ois.readObject();
-        } finally {
-            if (bis != null) {
-                bis.close();
-            }
-            if (ois != null) {
-                ois.close();
-            }
+    public static List<Object> fromState(Snapshot.State from) {
+        if (from instanceof MockSnapshotState) {
+            return ((MockSnapshotState)from).getState();
         }
-        return obj;
+
+        throw new IllegalStateException("Unexpected snapshot State: " + from);
     }
 
     public ReplicatedLog getReplicatedLog() {
@@ -367,4 +363,53 @@ public class MockRaftActor extends RaftActor implements RaftActorRecoveryCohort,
             super(MockRaftActor.class);
         }
     }
+
+    public static class MockSnapshotState implements Snapshot.State {
+        private static final long serialVersionUID = 1L;
+
+        private final List<Object> state;
+
+        public MockSnapshotState(List<Object> state) {
+            this.state = state;
+        }
+
+        public List<Object> getState() {
+            return state;
+        }
+
+        @Override
+        public int hashCode() {
+            final int prime = 31;
+            int result = 1;
+            result = prime * result + (state == null ? 0 : state.hashCode());
+            return result;
+        }
+
+        @Override
+        public boolean equals(Object obj) {
+            if (this == obj) {
+                return true;
+            }
+            if (obj == null) {
+                return false;
+            }
+            if (getClass() != obj.getClass()) {
+                return false;
+            }
+            MockSnapshotState other = (MockSnapshotState) obj;
+            if (state == null) {
+                if (other.state != null) {
+                    return false;
+                }
+            } else if (!state.equals(other.state)) {
+                return false;
+            }
+            return true;
+        }
+
+        @Override
+        public String toString() {
+            return "MockSnapshotState [state=" + state + "]";
+        }
+    }
 }
index cdcaadf..d92f072 100644 (file)
@@ -14,12 +14,17 @@ import akka.actor.ActorSystem;
 import akka.actor.Props;
 import akka.japi.Procedure;
 import com.google.common.base.Throwables;
+import com.google.common.io.ByteSource;
+import java.io.IOException;
+import java.io.OutputStream;
 import java.io.Serializable;
 import java.util.HashMap;
 import java.util.Map;
 import org.opendaylight.controller.cluster.NonPersistentDataProvider;
 import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
+import org.opendaylight.controller.cluster.raft.persisted.ByteState;
 import org.opendaylight.controller.cluster.raft.persisted.SimpleReplicatedLogEntry;
+import org.opendaylight.controller.cluster.raft.persisted.Snapshot.State;
 import org.opendaylight.controller.cluster.raft.policy.RaftPolicy;
 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
 import org.slf4j.Logger;
@@ -120,7 +125,23 @@ public class MockRaftActorContext extends RaftActorContextImpl {
     @Override
     public SnapshotManager getSnapshotManager() {
         SnapshotManager snapshotManager = super.getSnapshotManager();
-        snapshotManager.setCreateSnapshotRunnable(() -> { });
+        snapshotManager.setCreateSnapshotConsumer(out -> { });
+
+        snapshotManager.setSnapshotCohort(new RaftActorSnapshotCohort() {
+            @Override
+            public State deserializeSnapshot(ByteSource snapshotBytes) throws IOException {
+                return ByteState.of(snapshotBytes.read());
+            }
+
+            @Override
+            public void createSnapshot(ActorRef actorRef, java.util.Optional<OutputStream> installSnapshotStream) {
+            }
+
+            @Override
+            public void applySnapshot(State snapshotState) {
+            }
+        });
+
         return snapshotManager;
     }
 
index e0abd17..828e3eb 100644 (file)
@@ -10,9 +10,10 @@ package org.opendaylight.controller.cluster.raft;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
-import static org.mockito.AdditionalMatchers.aryEq;
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.anyInt;
+import static org.mockito.Matchers.anyObject;
+import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.never;
@@ -23,8 +24,11 @@ import akka.persistence.RecoveryCompleted;
 import akka.persistence.SnapshotMetadata;
 import akka.persistence.SnapshotOffer;
 import com.google.common.collect.Sets;
+import java.io.Serializable;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.List;
+import org.apache.commons.lang3.SerializationUtils;
 import org.hamcrest.Description;
 import org.junit.Before;
 import org.junit.Test;
@@ -36,11 +40,14 @@ import org.mockito.Mockito;
 import org.mockito.MockitoAnnotations;
 import org.opendaylight.controller.cluster.DataPersistenceProvider;
 import org.opendaylight.controller.cluster.PersistentDataProvider;
+import org.opendaylight.controller.cluster.raft.MockRaftActor.MockSnapshotState;
+import org.opendaylight.controller.cluster.raft.MockRaftActorContext.MockPayload;
 import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries;
 import org.opendaylight.controller.cluster.raft.persisted.DeleteEntries;
 import org.opendaylight.controller.cluster.raft.persisted.ServerConfigurationPayload;
 import org.opendaylight.controller.cluster.raft.persisted.ServerInfo;
 import org.opendaylight.controller.cluster.raft.persisted.SimpleReplicatedLogEntry;
+import org.opendaylight.controller.cluster.raft.persisted.Snapshot;
 import org.opendaylight.controller.cluster.raft.persisted.UpdateElectionTerm;
 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
 import org.slf4j.Logger;
@@ -62,6 +69,9 @@ public class RaftActorRecoverySupportTest {
     @Mock
     private RaftActorRecoveryCohort mockCohort;
 
+    @Mock
+    private RaftActorSnapshotCohort mockSnapshotCohort;
+
     @Mock
     PersistentDataProvider mockPersistentProvider;
 
@@ -163,7 +173,44 @@ public class RaftActorRecoverySupportTest {
         replicatedLog.append(new SimpleReplicatedLogEntry(2, 1, new MockRaftActorContext.MockPayload("2")));
         replicatedLog.append(new SimpleReplicatedLogEntry(3, 1, new MockRaftActorContext.MockPayload("3")));
 
-        byte[] snapshotBytes = {1,2,3,4,5};
+        ReplicatedLogEntry unAppliedEntry1 = new SimpleReplicatedLogEntry(4, 1,
+                new MockRaftActorContext.MockPayload("4", 4));
+
+        ReplicatedLogEntry unAppliedEntry2 = new SimpleReplicatedLogEntry(5, 1,
+                new MockRaftActorContext.MockPayload("5", 5));
+
+        long lastAppliedDuringSnapshotCapture = 3;
+        long lastIndexDuringSnapshotCapture = 5;
+        long electionTerm = 2;
+        String electionVotedFor = "member-2";
+
+        MockSnapshotState snapshotState = new MockSnapshotState(Arrays.asList(new MockPayload("1")));
+        Snapshot snapshot = Snapshot.create(snapshotState,
+                Arrays.asList(unAppliedEntry1, unAppliedEntry2), lastIndexDuringSnapshotCapture, 1,
+                lastAppliedDuringSnapshotCapture, 1, electionTerm, electionVotedFor, null);
+
+        SnapshotMetadata metadata = new SnapshotMetadata("test", 6, 12345);
+        SnapshotOffer snapshotOffer = new SnapshotOffer(metadata , snapshot);
+
+        sendMessageToSupport(snapshotOffer);
+
+        assertEquals("Journal log size", 2, context.getReplicatedLog().size());
+        assertEquals("Journal data size", 9, context.getReplicatedLog().dataSize());
+        assertEquals("Last index", lastIndexDuringSnapshotCapture, context.getReplicatedLog().lastIndex());
+        assertEquals("Last applied", lastAppliedDuringSnapshotCapture, context.getLastApplied());
+        assertEquals("Commit index", lastAppliedDuringSnapshotCapture, context.getCommitIndex());
+        assertEquals("Snapshot term", 1, context.getReplicatedLog().getSnapshotTerm());
+        assertEquals("Snapshot index", lastAppliedDuringSnapshotCapture, context.getReplicatedLog().getSnapshotIndex());
+        assertEquals("Election term", electionTerm, context.getTermInformation().getCurrentTerm());
+        assertEquals("Election votedFor", electionVotedFor, context.getTermInformation().getVotedFor());
+        assertFalse("Dynamic server configuration", context.isDynamicServerConfigurationInUse());
+
+        verify(mockCohort).applyRecoverySnapshot(snapshotState);
+    }
+
+    @Deprecated
+    @Test
+    public void testOnSnapshotOfferWithPreCarbonSnapshot() {
 
         ReplicatedLogEntry unAppliedEntry1 = new SimpleReplicatedLogEntry(4, 1,
                 new MockRaftActorContext.MockPayload("4", 4));
@@ -176,12 +223,21 @@ public class RaftActorRecoverySupportTest {
         long electionTerm = 2;
         String electionVotedFor = "member-2";
 
-        Snapshot snapshot = Snapshot.create(snapshotBytes, Arrays.asList(unAppliedEntry1, unAppliedEntry2),
-                lastIndexDuringSnapshotCapture, 1, lastAppliedDuringSnapshotCapture, 1, electionTerm, electionVotedFor);
+        List<Object> snapshotData = Arrays.asList(new MockPayload("1"));
+        final MockSnapshotState snapshotState = new MockSnapshotState(snapshotData);
+
+        org.opendaylight.controller.cluster.raft.Snapshot snapshot = org.opendaylight.controller.cluster.raft.Snapshot
+            .create(SerializationUtils.serialize((Serializable) snapshotData),
+                Arrays.asList(unAppliedEntry1, unAppliedEntry2), lastIndexDuringSnapshotCapture, 1,
+                lastAppliedDuringSnapshotCapture, 1, electionTerm, electionVotedFor, null);
 
         SnapshotMetadata metadata = new SnapshotMetadata("test", 6, 12345);
         SnapshotOffer snapshotOffer = new SnapshotOffer(metadata , snapshot);
 
+        doAnswer(invocation -> new MockSnapshotState(SerializationUtils.deserialize(
+            invocation.getArgumentAt(0, byte[].class))))
+                .when(mockCohort).deserializePreCarbonSnapshot(any(byte[].class));
+
         sendMessageToSupport(snapshotOffer);
 
         assertEquals("Journal log size", 2, context.getReplicatedLog().size());
@@ -195,7 +251,7 @@ public class RaftActorRecoverySupportTest {
         assertEquals("Election votedFor", electionVotedFor, context.getTermInformation().getVotedFor());
         assertFalse("Dynamic server configuration", context.isDynamicServerConfigurationInUse());
 
-        verify(mockCohort).applyRecoverySnapshot(snapshotBytes);
+        verify(mockCohort).applyRecoverySnapshot(snapshotState);
     }
 
     @Test
@@ -255,11 +311,12 @@ public class RaftActorRecoverySupportTest {
 
     @Test
     public void testDataRecoveredWithPersistenceDisabled() {
-        doNothing().when(mockCohort).applyRecoverySnapshot(aryEq(new byte[0]));
+        doNothing().when(mockCohort).applyRecoverySnapshot(anyObject());
         doReturn(false).when(mockPersistence).isRecoveryApplicable();
         doReturn(10L).when(mockPersistentProvider).getLastSequenceNumber();
 
-        Snapshot snapshot = Snapshot.create(new byte[]{1}, Collections.<ReplicatedLogEntry>emptyList(), 3, 1, 3, 1);
+        Snapshot snapshot = Snapshot.create(new MockSnapshotState(Arrays.asList(new MockPayload("1"))),
+                Collections.<ReplicatedLogEntry>emptyList(), 3, 1, 3, 1, -1, null, null);
         SnapshotOffer snapshotOffer = new SnapshotOffer(new SnapshotMetadata("test", 6, 12345), snapshot);
 
         sendMessageToSupport(snapshotOffer);
@@ -285,7 +342,7 @@ public class RaftActorRecoverySupportTest {
 
         sendMessageToSupport(RecoveryCompleted.getInstance(), true);
 
-        verify(mockCohort).applyRecoverySnapshot(aryEq(new byte[0]));
+        verify(mockCohort, never()).applyRecoverySnapshot(anyObject());
         verify(mockCohort, never()).getRestoreFromSnapshot();
         verifyNoMoreInteractions(mockCohort);
 
@@ -389,7 +446,8 @@ public class RaftActorRecoverySupportTest {
                                                         new ServerInfo("follower1", true),
                                                         new ServerInfo("follower2", true)));
 
-        Snapshot snapshot = Snapshot.create(new byte[]{1}, Collections.<ReplicatedLogEntry>emptyList(),
+        MockSnapshotState snapshotState = new MockSnapshotState(Arrays.asList(new MockPayload("1")));
+        Snapshot snapshot = Snapshot.create(snapshotState, Collections.<ReplicatedLogEntry>emptyList(),
                 -1, -1, -1, -1, electionTerm, electionVotedFor, serverPayload);
 
         SnapshotMetadata metadata = new SnapshotMetadata("test", 6, 12345);
index a19281f..71412f3 100644 (file)
@@ -26,11 +26,15 @@ import com.google.common.base.Stopwatch;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
+import com.google.common.io.ByteSource;
+import java.io.OutputStream;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
+import org.apache.commons.lang3.SerializationUtils;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -58,6 +62,7 @@ import org.opendaylight.controller.cluster.raft.messages.ServerChangeStatus;
 import org.opendaylight.controller.cluster.raft.messages.ServerRemoved;
 import org.opendaylight.controller.cluster.raft.messages.UnInitializedFollowerSnapshotReply;
 import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries;
+import org.opendaylight.controller.cluster.raft.persisted.ByteState;
 import org.opendaylight.controller.cluster.raft.persisted.ServerConfigurationPayload;
 import org.opendaylight.controller.cluster.raft.persisted.ServerInfo;
 import org.opendaylight.controller.cluster.raft.persisted.SimpleReplicatedLogEntry;
@@ -167,8 +172,7 @@ public class RaftActorServerConfigurationSupportTest extends AbstractActorTest {
         // Leader should install snapshot - capture and verify ApplySnapshot contents
 
         ApplySnapshot applySnapshot = expectFirstMatching(newFollowerCollectorActor, ApplySnapshot.class);
-        @SuppressWarnings("unchecked")
-        List<Object> snapshotState = (List<Object>) MockRaftActor.toObject(applySnapshot.getSnapshot().getState());
+        List<Object> snapshotState = MockRaftActor.fromState(applySnapshot.getSnapshot().getState());
         assertEquals("Snapshot state", snapshotState, leaderRaftActor.getState());
 
         AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
@@ -248,8 +252,7 @@ public class RaftActorServerConfigurationSupportTest extends AbstractActorTest {
         // Leader should install snapshot - capture and verify ApplySnapshot contents
 
         ApplySnapshot applySnapshot = expectFirstMatching(newFollowerCollectorActor, ApplySnapshot.class);
-        @SuppressWarnings("unchecked")
-        List<Object> snapshotState = (List<Object>) MockRaftActor.toObject(applySnapshot.getSnapshot().getState());
+        List<Object> snapshotState = MockRaftActor.fromState(applySnapshot.getSnapshot().getState());
         assertEquals("Snapshot state", snapshotState, leaderRaftActor.getState());
 
         AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
@@ -1518,12 +1521,19 @@ public class RaftActorServerConfigurationSupportTest extends AbstractActorTest {
             super(id, peerAddresses, config, persistent, collectorActor);
             snapshotCohortDelegate = new RaftActorSnapshotCohort() {
                 @Override
-                public void createSnapshot(ActorRef actorRef) {
-                    actorRef.tell(new CaptureSnapshotReply(new byte[0]), actorRef);
+                public void createSnapshot(ActorRef actorRef, java.util.Optional<OutputStream> installSnapshotStream) {
+                    actorRef.tell(new CaptureSnapshotReply(ByteState.empty(), installSnapshotStream), actorRef);
                 }
 
                 @Override
-                public void applySnapshot(byte[] snapshotBytes) {
+                public void applySnapshot(
+                        org.opendaylight.controller.cluster.raft.persisted.Snapshot.State snapshotState) {
+                }
+
+                @Override
+                public org.opendaylight.controller.cluster.raft.persisted.Snapshot.State deserializeSnapshot(
+                        ByteSource snapshotBytes) {
+                    throw new UnsupportedOperationException();
                 }
             };
         }
@@ -1564,12 +1574,13 @@ public class RaftActorServerConfigurationSupportTest extends AbstractActorTest {
 
         @Override
         @SuppressWarnings("checkstyle:IllegalCatch")
-        public void createSnapshot(ActorRef actorRef) {
-            try {
-                actorRef.tell(new CaptureSnapshotReply(RaftActorTest.fromObject(getState()).toByteArray()), actorRef);
-            } catch (Exception e) {
-                LOG.error("createSnapshot failed", e);
+        public void createSnapshot(ActorRef actorRef, java.util.Optional<OutputStream> installSnapshotStream) {
+            MockSnapshotState snapshotState = new MockSnapshotState(new ArrayList<>(getState()));
+            if (installSnapshotStream.isPresent()) {
+                SerializationUtils.serialize(snapshotState, installSnapshotStream.get());
             }
+
+            actorRef.tell(new CaptureSnapshotReply(snapshotState, installSnapshotStream), actorRef);
         }
 
         static Props props(Map<String, String> peerAddresses, RaftActorContext fromContext) {
index 45b89b7..495cc6d 100644 (file)
@@ -10,15 +10,17 @@ package org.opendaylight.controller.cluster.raft;
 import static org.junit.Assert.assertEquals;
 import static org.mockito.Matchers.anyLong;
 import static org.mockito.Matchers.eq;
-import static org.mockito.Matchers.same;
 import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.verify;
 
 import akka.actor.ActorRef;
 import akka.persistence.SaveSnapshotFailure;
 import akka.persistence.SaveSnapshotSuccess;
 import akka.persistence.SnapshotMetadata;
+import java.io.OutputStream;
 import java.util.Collections;
+import java.util.Optional;
 import org.junit.Before;
 import org.junit.Test;
 import org.mockito.Mock;
@@ -27,6 +29,8 @@ import org.opendaylight.controller.cluster.DataPersistenceProvider;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
 import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
+import org.opendaylight.controller.cluster.raft.persisted.ByteState;
+import org.opendaylight.controller.cluster.raft.persisted.Snapshot;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -95,8 +99,8 @@ public class RaftActorSnapshotMessageSupportTest {
         long lastIndexDuringSnapshotCapture = 2;
         byte[] snapshotBytes = {1,2,3,4,5};
 
-        Snapshot snapshot = Snapshot.create(snapshotBytes, Collections.<ReplicatedLogEntry>emptyList(),
-                lastIndexDuringSnapshotCapture, 1, lastAppliedDuringSnapshotCapture, 1);
+        Snapshot snapshot = Snapshot.create(ByteState.of(snapshotBytes), Collections.<ReplicatedLogEntry>emptyList(),
+                lastIndexDuringSnapshotCapture, 1, lastAppliedDuringSnapshotCapture, 1, -1, null, null);
 
         ApplySnapshot applySnapshot = new ApplySnapshot(snapshot);
         sendMessageToSupport(applySnapshot);
@@ -106,11 +110,11 @@ public class RaftActorSnapshotMessageSupportTest {
 
     @Test
     public void testOnCaptureSnapshotReply() {
+        ByteState state = ByteState.of(new byte[]{1,2,3,4,5});
+        Optional<OutputStream> optionalStream = Optional.of(mock(OutputStream.class));
+        sendMessageToSupport(new CaptureSnapshotReply(state, optionalStream));
 
-        byte[] snapshot = {1,2,3,4,5};
-        sendMessageToSupport(new CaptureSnapshotReply(snapshot));
-
-        verify(mockSnapshotManager).persist(same(snapshot), anyLong());
+        verify(mockSnapshotManager).persist(eq(state), eq(optionalStream), anyLong());
     }
 
     @Test
index edc5109..d674afb 100644 (file)
@@ -8,7 +8,6 @@
 
 package org.opendaylight.controller.cluster.raft;
 
-import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNotSame;
@@ -63,10 +62,10 @@ import org.opendaylight.controller.cluster.NonPersistentDataProvider;
 import org.opendaylight.controller.cluster.PersistentDataProvider;
 import org.opendaylight.controller.cluster.notifications.LeaderStateChanged;
 import org.opendaylight.controller.cluster.notifications.RoleChanged;
+import org.opendaylight.controller.cluster.raft.MockRaftActor.MockSnapshotState;
 import org.opendaylight.controller.cluster.raft.MockRaftActorContext.MockPayload;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
-import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
 import org.opendaylight.controller.cluster.raft.base.messages.LeaderTransitioning;
 import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat;
@@ -79,10 +78,13 @@ import org.opendaylight.controller.cluster.raft.client.messages.GetSnapshotReply
 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
 import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries;
+import org.opendaylight.controller.cluster.raft.persisted.ByteState;
 import org.opendaylight.controller.cluster.raft.persisted.DeleteEntries;
+import org.opendaylight.controller.cluster.raft.persisted.EmptyState;
 import org.opendaylight.controller.cluster.raft.persisted.ServerConfigurationPayload;
 import org.opendaylight.controller.cluster.raft.persisted.ServerInfo;
 import org.opendaylight.controller.cluster.raft.persisted.SimpleReplicatedLogEntry;
+import org.opendaylight.controller.cluster.raft.persisted.Snapshot;
 import org.opendaylight.controller.cluster.raft.persisted.UpdateElectionTerm;
 import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy;
 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
@@ -153,15 +155,14 @@ public class RaftActorTest extends AbstractActorTest {
         int lastIndexDuringSnapshotCapture = 4;
 
         // 4 messages as part of snapshot, which are applied to state
-        ByteString snapshotBytes = fromObject(Arrays.asList(
+        MockSnapshotState snapshotState = new MockSnapshotState(Arrays.asList(
                 new MockRaftActorContext.MockPayload("A"),
                 new MockRaftActorContext.MockPayload("B"),
                 new MockRaftActorContext.MockPayload("C"),
                 new MockRaftActorContext.MockPayload("D")));
 
-        Snapshot snapshot = Snapshot.create(snapshotBytes.toByteArray(),
-                snapshotUnappliedEntries, lastIndexDuringSnapshotCapture, 1,
-                lastAppliedDuringSnapshotCapture, 1);
+        Snapshot snapshot = Snapshot.create(snapshotState, snapshotUnappliedEntries, lastIndexDuringSnapshotCapture, 1,
+                lastAppliedDuringSnapshotCapture, 1, -1, null, null);
         InMemorySnapshotStore.addSnapshot(persistenceId, snapshot);
 
         // add more entries after snapshot is taken
@@ -292,7 +293,8 @@ public class RaftActorTest extends AbstractActorTest {
         RaftActorRecoverySupport mockSupport = mock(RaftActorRecoverySupport.class);
         mockRaftActor.setRaftActorRecoverySupport(mockSupport );
 
-        Snapshot snapshot = Snapshot.create(new byte[]{1}, Collections.<ReplicatedLogEntry>emptyList(), 3, 1, 3, 1);
+        Snapshot snapshot = Snapshot.create(ByteState.of(new byte[]{1}),
+                Collections.<ReplicatedLogEntry>emptyList(), 3, 1, 3, 1, -1, null, null);
         SnapshotOffer snapshotOffer = new SnapshotOffer(new SnapshotMetadata("test", 6, 12345), snapshot);
         mockRaftActor.handleRecover(snapshotOffer);
 
@@ -337,11 +339,8 @@ public class RaftActorTest extends AbstractActorTest {
         doReturn(true).when(mockSupport).handleSnapshotMessage(same(applySnapshot), any(ActorRef.class));
         mockRaftActor.handleCommand(applySnapshot);
 
-        CaptureSnapshot captureSnapshot = new CaptureSnapshot(1, 1, 1, 1, 0, 1, null);
-        doReturn(true).when(mockSupport).handleSnapshotMessage(same(captureSnapshot), any(ActorRef.class));
-        mockRaftActor.handleCommand(captureSnapshot);
-
-        CaptureSnapshotReply captureSnapshotReply = new CaptureSnapshotReply(new byte[0]);
+        CaptureSnapshotReply captureSnapshotReply = new CaptureSnapshotReply(ByteState.empty(),
+                java.util.Optional.empty());
         doReturn(true).when(mockSupport).handleSnapshotMessage(same(captureSnapshotReply), any(ActorRef.class));
         mockRaftActor.handleCommand(captureSnapshotReply);
 
@@ -362,7 +361,6 @@ public class RaftActorTest extends AbstractActorTest {
         mockRaftActor.handleCommand(GetSnapshot.INSTANCE);
 
         verify(mockSupport).handleSnapshotMessage(same(applySnapshot), any(ActorRef.class));
-        verify(mockSupport).handleSnapshotMessage(same(captureSnapshot), any(ActorRef.class));
         verify(mockSupport).handleSnapshotMessage(same(captureSnapshotReply), any(ActorRef.class));
         verify(mockSupport).handleSnapshotMessage(same(saveSnapshotSuccess), any(ActorRef.class));
         verify(mockSupport).handleSnapshotMessage(same(saveSnapshotFailure), any(ActorRef.class));
@@ -592,7 +590,7 @@ public class RaftActorTest extends AbstractActorTest {
         leaderActor.getRaftActorContext().getSnapshotManager().capture(
                 new SimpleReplicatedLogEntry(6, 1, new MockRaftActorContext.MockPayload("x")), 4);
 
-        verify(leaderActor.snapshotCohortDelegate).createSnapshot(any(ActorRef.class));
+        verify(leaderActor.snapshotCohortDelegate).createSnapshot(anyObject(), anyObject());
 
         assertEquals(8, leaderActor.getReplicatedLog().size());
 
@@ -611,14 +609,14 @@ public class RaftActorTest extends AbstractActorTest {
 
         assertEquals(8, leaderActor.getReplicatedLog().size());
 
-        ByteString snapshotBytes = fromObject(Arrays.asList(
+        MockSnapshotState snapshotState = new MockSnapshotState(Arrays.asList(
                 new MockRaftActorContext.MockPayload("foo-0"),
                 new MockRaftActorContext.MockPayload("foo-1"),
                 new MockRaftActorContext.MockPayload("foo-2"),
                 new MockRaftActorContext.MockPayload("foo-3"),
                 new MockRaftActorContext.MockPayload("foo-4")));
 
-        leaderActor.getRaftActorContext().getSnapshotManager().persist(snapshotBytes.toByteArray(),
+        leaderActor.getRaftActorContext().getSnapshotManager().persist(snapshotState, java.util.Optional.empty(),
                 Runtime.getRuntime().totalMemory());
 
         assertTrue(leaderActor.getRaftActorContext().getSnapshotManager().isCapturing());
@@ -684,7 +682,7 @@ public class RaftActorTest extends AbstractActorTest {
         followerActor.getRaftActorContext().getSnapshotManager().capture(
                 new SimpleReplicatedLogEntry(5, 1, new MockRaftActorContext.MockPayload("D")), 4);
 
-        verify(followerActor.snapshotCohortDelegate).createSnapshot(any(ActorRef.class));
+        verify(followerActor.snapshotCohortDelegate).createSnapshot(anyObject(), anyObject());
 
         assertEquals(6, followerActor.getReplicatedLog().size());
 
@@ -711,7 +709,8 @@ public class RaftActorTest extends AbstractActorTest {
                 new MockRaftActorContext.MockPayload("foo-2"),
                 new MockRaftActorContext.MockPayload("foo-3"),
                 new MockRaftActorContext.MockPayload("foo-4")));
-        followerActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes.toByteArray()));
+        followerActor.onReceiveCommand(new CaptureSnapshotReply(ByteState.of(snapshotBytes.toByteArray()),
+                java.util.Optional.empty()));
         assertTrue(followerActor.getRaftActorContext().getSnapshotManager().isCapturing());
 
         // The commit is needed to complete the snapshot creation process
@@ -802,7 +801,8 @@ public class RaftActorTest extends AbstractActorTest {
                 new MockRaftActorContext.MockPayload("foo-2"),
                 new MockRaftActorContext.MockPayload("foo-3"),
                 new MockRaftActorContext.MockPayload("foo-4")));
-        leaderActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes.toByteArray()));
+        leaderActor.onReceiveCommand(new CaptureSnapshotReply(ByteState.of(snapshotBytes.toByteArray()),
+                java.util.Optional.empty()));
         assertTrue(leaderActor.getRaftActorContext().getSnapshotManager().isCapturing());
 
         assertEquals("Real snapshot didn't clear the log till replicatedToAllIndex", 0,
@@ -848,7 +848,8 @@ public class RaftActorTest extends AbstractActorTest {
                 leaderActor.getReplicatedLog().last(), -1, "member1");
 
         // Now send a CaptureSnapshotReply
-        mockActorRef.tell(new CaptureSnapshotReply(fromObject("foo").toByteArray()), mockActorRef);
+        mockActorRef.tell(new CaptureSnapshotReply(ByteState.of(fromObject("foo").toByteArray()),
+                java.util.Optional.empty()), mockActorRef);
 
         // Trimming log in this scenario is a no-op
         assertEquals(-1, leaderActor.getReplicatedLog().getSnapshotIndex());
@@ -888,7 +889,8 @@ public class RaftActorTest extends AbstractActorTest {
                 new MockRaftActorContext.MockPayload("duh"), false);
 
         // Now send a CaptureSnapshotReply
-        mockActorRef.tell(new CaptureSnapshotReply(fromObject("foo").toByteArray()), mockActorRef);
+        mockActorRef.tell(new CaptureSnapshotReply(ByteState.of(fromObject("foo").toByteArray()),
+                java.util.Optional.empty()), mockActorRef);
 
         // Trimming log in this scenario is a no-op
         assertEquals(3, leaderActor.getReplicatedLog().getSnapshotIndex());
@@ -1038,10 +1040,12 @@ public class RaftActorTest extends AbstractActorTest {
         raftActorRef.tell(GetSnapshot.INSTANCE, kit.getRef());
 
         ArgumentCaptor<ActorRef> replyActor = ArgumentCaptor.forClass(ActorRef.class);
-        verify(mockRaftActor.snapshotCohortDelegate, timeout(5000)).createSnapshot(replyActor.capture());
+        verify(mockRaftActor.snapshotCohortDelegate, timeout(5000)).createSnapshot(replyActor.capture(),
+                eq(java.util.Optional.empty()));
 
         byte[] stateSnapshot = new byte[]{1,2,3};
-        replyActor.getValue().tell(new CaptureSnapshotReply(stateSnapshot), ActorRef.noSender());
+        replyActor.getValue().tell(new CaptureSnapshotReply(ByteState.of(stateSnapshot), java.util.Optional.empty()),
+                ActorRef.noSender());
 
         GetSnapshotReply reply = kit.expectMsgClass(GetSnapshotReply.class);
 
@@ -1053,7 +1057,7 @@ public class RaftActorTest extends AbstractActorTest {
         assertEquals("getLastAppliedTerm", term, replySnapshot.getLastAppliedTerm());
         assertEquals("getLastIndex", 2L, replySnapshot.getLastIndex());
         assertEquals("getLastTerm", term, replySnapshot.getLastTerm());
-        assertArrayEquals("getState", stateSnapshot, replySnapshot.getState());
+        assertEquals("getState", ByteState.of(stateSnapshot), replySnapshot.getState());
         assertEquals("getUnAppliedEntries size", 1, replySnapshot.getUnAppliedEntries().size());
         assertEquals("UnApplied entry index ", 2L, replySnapshot.getUnAppliedEntries().get(0).getIndex());
 
@@ -1076,7 +1080,7 @@ public class RaftActorTest extends AbstractActorTest {
 
         raftActorRef.tell(GetSnapshot.INSTANCE, kit.getRef());
         reply = kit.expectMsgClass(GetSnapshotReply.class);
-        verify(mockRaftActor.snapshotCohortDelegate, never()).createSnapshot(any(ActorRef.class));
+        verify(mockRaftActor.snapshotCohortDelegate, never()).createSnapshot(anyObject(), anyObject());
 
         assertEquals("getId", persistenceId, reply.getId());
         replySnapshot = SerializationUtils.deserialize(reply.getSnapshot());
@@ -1086,7 +1090,7 @@ public class RaftActorTest extends AbstractActorTest {
         assertEquals("getLastAppliedTerm", -1L, replySnapshot.getLastAppliedTerm());
         assertEquals("getLastIndex", -1L, replySnapshot.getLastIndex());
         assertEquals("getLastTerm", -1L, replySnapshot.getLastTerm());
-        assertEquals("getState length", 0, replySnapshot.getState().length);
+        assertEquals("getState type", EmptyState.INSTANCE, replySnapshot.getState());
         assertEquals("getUnAppliedEntries size", 0, replySnapshot.getUnAppliedEntries().size());
 
         TEST_LOG.info("testGetSnapshot ending");
@@ -1106,15 +1110,14 @@ public class RaftActorTest extends AbstractActorTest {
         int snapshotLastApplied = 3;
         int snapshotLastIndex = 4;
 
-        List<MockPayload> state = Arrays.asList(
+        MockSnapshotState snapshotState = new MockSnapshotState(Arrays.asList(
                 new MockRaftActorContext.MockPayload("A"),
                 new MockRaftActorContext.MockPayload("B"),
                 new MockRaftActorContext.MockPayload("C"),
-                new MockRaftActorContext.MockPayload("D"));
-        ByteString stateBytes = fromObject(state);
+                new MockRaftActorContext.MockPayload("D")));
 
-        Snapshot snapshot = Snapshot.create(stateBytes.toByteArray(), snapshotUnappliedEntries,
-                snapshotLastIndex, 1, snapshotLastApplied, 1, 1, "member-1");
+        Snapshot snapshot = Snapshot.create(snapshotState, snapshotUnappliedEntries,
+                snapshotLastIndex, 1, snapshotLastApplied, 1, 1, "member-1", null);
 
         InMemorySnapshotStore.addSnapshotSavedLatch(persistenceId);
 
@@ -1132,24 +1135,24 @@ public class RaftActorTest extends AbstractActorTest {
         assertEquals("getLastAppliedTerm", snapshot.getLastAppliedTerm(), savedSnapshot.getLastAppliedTerm());
         assertEquals("getLastIndex", snapshot.getLastIndex(), savedSnapshot.getLastIndex());
         assertEquals("getLastTerm", snapshot.getLastTerm(), savedSnapshot.getLastTerm());
-        assertArrayEquals("getState", snapshot.getState(), savedSnapshot.getState());
+        assertEquals("getState", snapshot.getState(), savedSnapshot.getState());
         assertEquals("getUnAppliedEntries", snapshot.getUnAppliedEntries(), savedSnapshot.getUnAppliedEntries());
 
-        verify(mockRaftActor.snapshotCohortDelegate, timeout(5000)).applySnapshot(any(byte[].class));
+        verify(mockRaftActor.snapshotCohortDelegate, timeout(5000)).applySnapshot(any(Snapshot.State.class));
 
         RaftActorContext context = mockRaftActor.getRaftActorContext();
         assertEquals("Journal log size", 1, context.getReplicatedLog().size());
         assertEquals("Last index", snapshotLastIndex, context.getReplicatedLog().lastIndex());
         assertEquals("Last applied", snapshotLastApplied, context.getLastApplied());
         assertEquals("Commit index", snapshotLastApplied, context.getCommitIndex());
-        assertEquals("Recovered state", state, mockRaftActor.getState());
+        assertEquals("Recovered state", snapshotState.getState(), mockRaftActor.getState());
         assertEquals("Current term", 1L, context.getTermInformation().getCurrentTerm());
         assertEquals("Voted for", "member-1", context.getTermInformation().getVotedFor());
 
         // Test with data persistence disabled
 
-        snapshot = Snapshot.create(new byte[0], Collections.<ReplicatedLogEntry>emptyList(),
-                -1, -1, -1, -1, 5, "member-1");
+        snapshot = Snapshot.create(EmptyState.INSTANCE, Collections.<ReplicatedLogEntry>emptyList(),
+                -1, -1, -1, -1, 5, "member-1", null);
 
         persistenceId = factory.generateActorId("test-actor-");
 
@@ -1179,8 +1182,8 @@ public class RaftActorTest extends AbstractActorTest {
         config.setCustomRaftPolicyImplementationClass(DisableElectionsRaftPolicy.class.getName());
 
         List<MockPayload> state = Arrays.asList(new MockRaftActorContext.MockPayload("A"));
-        Snapshot snapshot = Snapshot.create(fromObject(state).toByteArray(), Arrays.<ReplicatedLogEntry>asList(),
-                5, 2, 5, 2, 2, "member-1");
+        Snapshot snapshot = Snapshot.create(ByteState.of(fromObject(state).toByteArray()),
+                Arrays.<ReplicatedLogEntry>asList(), 5, 2, 5, 2, 2, "member-1", null);
 
         InMemoryJournal.addEntry(persistenceId, 1, new SimpleReplicatedLogEntry(0, 1,
                 new MockRaftActorContext.MockPayload("B")));
@@ -1193,7 +1196,7 @@ public class RaftActorTest extends AbstractActorTest {
         mockRaftActor.waitForRecoveryComplete();
 
         Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
-        verify(mockRaftActor.snapshotCohortDelegate, never()).applySnapshot(any(byte[].class));
+        verify(mockRaftActor.snapshotCohortDelegate, never()).applySnapshot(any(Snapshot.State.class));
 
         RaftActorContext context = mockRaftActor.getRaftActorContext();
         assertEquals("Journal log size", 1, context.getReplicatedLog().size());
index 657905e..2a3a6c1 100644 (file)
@@ -18,6 +18,7 @@ import java.util.List;
 import org.junit.Before;
 import org.junit.Test;
 import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries;
+import org.opendaylight.controller.cluster.raft.persisted.Snapshot;
 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
 import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
@@ -82,8 +83,7 @@ public class RecoveryIntegrationSingleNodeTest extends AbstractRaftActorIntegrat
         List<Snapshot> persistedSnapshots = InMemorySnapshotStore.getSnapshots(persistenceId, Snapshot.class);
         assertEquals(1, persistedSnapshots.size());
 
-        @SuppressWarnings("unchecked")
-        List<Object> snapshottedState = (List<Object>)MockRaftActor.toObject(persistedSnapshots.get(0).getState());
+        List<Object> snapshottedState = MockRaftActor.fromState(persistedSnapshots.get(0).getState());
         assertEquals("Incorrect Snapshot", Lists.newArrayList(payload0, payload1, payload2, payload3),
                 snapshottedState);
 
index 0254c6d..e53dfe6 100644 (file)
@@ -20,6 +20,7 @@ import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
 import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries;
 import org.opendaylight.controller.cluster.raft.persisted.SimpleReplicatedLogEntry;
+import org.opendaylight.controller.cluster.raft.persisted.Snapshot;
 import org.opendaylight.controller.cluster.raft.persisted.UpdateElectionTerm;
 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
 import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
index 8a91549..0dbcb1f 100644 (file)
@@ -21,6 +21,7 @@ import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import javax.annotation.Nullable;
+import org.apache.commons.lang3.SerializationUtils;
 import org.junit.Assert;
 import org.junit.Test;
 import org.opendaylight.controller.cluster.raft.MockRaftActorContext.MockPayload;
@@ -37,6 +38,7 @@ import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries;
 import org.opendaylight.controller.cluster.raft.persisted.ServerConfigurationPayload;
 import org.opendaylight.controller.cluster.raft.persisted.ServerInfo;
 import org.opendaylight.controller.cluster.raft.persisted.SimpleReplicatedLogEntry;
+import org.opendaylight.controller.cluster.raft.persisted.Snapshot;
 import org.opendaylight.controller.cluster.raft.persisted.UpdateElectionTerm;
 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
 import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
@@ -553,7 +555,7 @@ public class ReplicationAndSnapshotsWithLaggingFollowerIntegrationTest extends A
         List<ReplicatedLogEntry> unAppliedEntry = persistedSnapshot.getUnAppliedEntries();
         assertEquals("Persisted Snapshot getUnAppliedEntries size", 0, unAppliedEntry.size());
 
-        int snapshotSize = persistedSnapshot.getState().length;
+        int snapshotSize = SerializationUtils.serialize(persistedSnapshot.getState()).length;
         final int expTotalChunks = snapshotSize / SNAPSHOT_CHUNK_SIZE
                 + (snapshotSize % SNAPSHOT_CHUNK_SIZE > 0 ? 1 : 0);
 
index fab250b..7591c2f 100644 (file)
@@ -14,6 +14,8 @@ import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.anyLong;
+import static org.mockito.Matchers.anyObject;
+import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.mock;
@@ -25,7 +27,10 @@ import static org.mockito.Mockito.verify;
 import akka.actor.ActorRef;
 import akka.persistence.SnapshotSelectionCriteria;
 import akka.testkit.TestActorRef;
+import java.io.OutputStream;
 import java.util.Arrays;
+import java.util.Optional;
+import java.util.function.Consumer;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -38,7 +43,9 @@ import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
 import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot;
 import org.opendaylight.controller.cluster.raft.base.messages.SnapshotComplete;
 import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
+import org.opendaylight.controller.cluster.raft.persisted.ByteState;
 import org.opendaylight.controller.cluster.raft.persisted.SimpleReplicatedLogEntry;
+import org.opendaylight.controller.cluster.raft.persisted.Snapshot;
 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
 import org.slf4j.LoggerFactory;
 
@@ -60,7 +67,7 @@ public class SnapshotManagerTest extends AbstractActorTest {
     private RaftActorBehavior mockRaftActorBehavior;
 
     @Mock
-    private Runnable mockProcedure;
+    private Consumer<Optional<OutputStream>> mockProcedure;
 
     @Mock
     private ElectionTerm mockElectionTerm;
@@ -95,7 +102,7 @@ public class SnapshotManagerTest extends AbstractActorTest {
         actorRef = factory.createTestActor(MessageCollectorActor.props(), factory.generateActorId("test-"));
         doReturn(actorRef).when(mockRaftActorContext).getActor();
 
-        snapshotManager.setCreateSnapshotRunnable(mockProcedure);
+        snapshotManager.setCreateSnapshotConsumer(mockProcedure);
     }
 
     @After
@@ -108,6 +115,7 @@ public class SnapshotManagerTest extends AbstractActorTest {
         assertEquals(false, snapshotManager.isCapturing());
     }
 
+    @SuppressWarnings({ "unchecked", "rawtypes" })
     @Test
     public void testCaptureToInstall() throws Exception {
 
@@ -117,7 +125,9 @@ public class SnapshotManagerTest extends AbstractActorTest {
 
         assertEquals(true, snapshotManager.isCapturing());
 
-        verify(mockProcedure).run();
+        ArgumentCaptor<Optional> outputStream = ArgumentCaptor.forClass(Optional.class);
+        verify(mockProcedure).accept(outputStream.capture());
+        assertEquals("isPresent", true, outputStream.getValue().isPresent());
 
         CaptureSnapshot captureSnapshot = snapshotManager.getCaptureSnapshot();
 
@@ -135,6 +145,7 @@ public class SnapshotManagerTest extends AbstractActorTest {
         actorRef.underlyingActor().clear();
     }
 
+    @SuppressWarnings({ "rawtypes", "unchecked" })
     @Test
     public void testCapture() throws Exception {
         boolean capture = snapshotManager.capture(new SimpleReplicatedLogEntry(9, 1,
@@ -144,7 +155,9 @@ public class SnapshotManagerTest extends AbstractActorTest {
 
         assertEquals(true, snapshotManager.isCapturing());
 
-        verify(mockProcedure).run();
+        ArgumentCaptor<Optional> outputStream = ArgumentCaptor.forClass(Optional.class);
+        verify(mockProcedure).accept(outputStream.capture());
+        assertEquals("isPresent", false, outputStream.getValue().isPresent());
 
         CaptureSnapshot captureSnapshot = snapshotManager.getCaptureSnapshot();
 
@@ -164,6 +177,7 @@ public class SnapshotManagerTest extends AbstractActorTest {
 
     }
 
+    @SuppressWarnings({ "unchecked", "rawtypes" })
     @Test
     public void testCaptureWithNullLastLogEntry() throws Exception {
         boolean capture = snapshotManager.capture(null, 1);
@@ -172,7 +186,9 @@ public class SnapshotManagerTest extends AbstractActorTest {
 
         assertEquals(true, snapshotManager.isCapturing());
 
-        verify(mockProcedure).run();
+        ArgumentCaptor<Optional> outputStream = ArgumentCaptor.forClass(Optional.class);
+        verify(mockProcedure).accept(outputStream.capture());
+        assertEquals("isPresent", false, outputStream.getValue().isPresent());
 
         CaptureSnapshot captureSnapshot = snapshotManager.getCaptureSnapshot();
 
@@ -193,7 +209,7 @@ public class SnapshotManagerTest extends AbstractActorTest {
 
     @Test
     public void testCaptureWithCreateProcedureError() throws Exception {
-        doThrow(new RuntimeException("mock")).when(mockProcedure).run();
+        doThrow(new RuntimeException("mock")).when(mockProcedure).accept(anyObject());
 
         boolean capture = snapshotManager.capture(new SimpleReplicatedLogEntry(9, 1,
                 new MockRaftActorContext.MockPayload()), 9);
@@ -202,9 +218,10 @@ public class SnapshotManagerTest extends AbstractActorTest {
 
         assertEquals(false, snapshotManager.isCapturing());
 
-        verify(mockProcedure).run();
+        verify(mockProcedure).accept(anyObject());
     }
 
+    @SuppressWarnings("unchecked")
     @Test
     public void testIllegalCapture() throws Exception {
         boolean capture = snapshotManager.capture(new SimpleReplicatedLogEntry(9, 1,
@@ -212,7 +229,7 @@ public class SnapshotManagerTest extends AbstractActorTest {
 
         assertTrue(capture);
 
-        verify(mockProcedure).run();
+        verify(mockProcedure).accept(anyObject());
 
         reset(mockProcedure);
 
@@ -222,11 +239,11 @@ public class SnapshotManagerTest extends AbstractActorTest {
 
         assertFalse(capture);
 
-        verify(mockProcedure, never()).run();
+        verify(mockProcedure, never()).accept(anyObject());
     }
 
     @Test
-    public void testPersistWhenReplicatedToAllIndexMinusOne() {
+    public void testPersistWhenReplicatedToAllIndexMinusOne() throws Exception {
         doReturn(7L).when(mockReplicatedLog).getSnapshotIndex();
         doReturn(1L).when(mockReplicatedLog).getSnapshotTerm();
 
@@ -245,8 +262,8 @@ public class SnapshotManagerTest extends AbstractActorTest {
         // when replicatedToAllIndex = -1
         snapshotManager.capture(lastLogEntry, -1);
 
-        byte[] bytes = new byte[] {1,2,3,4,5,6,7,8,9,10};
-        snapshotManager.persist(bytes, Runtime.getRuntime().totalMemory());
+        ByteState snapshotState = ByteState.of(new byte[] {1,2,3,4,5,6,7,8,9,10});
+        snapshotManager.persist(snapshotState, Optional.empty(), Runtime.getRuntime().totalMemory());
 
         ArgumentCaptor<Snapshot> snapshotArgumentCaptor = ArgumentCaptor.forClass(Snapshot.class);
         verify(mockDataPersistenceProvider).saveSnapshot(snapshotArgumentCaptor.capture());
@@ -257,7 +274,7 @@ public class SnapshotManagerTest extends AbstractActorTest {
         assertEquals("getLastIndex", 9L, snapshot.getLastIndex());
         assertEquals("getLastAppliedTerm", 2L, snapshot.getLastAppliedTerm());
         assertEquals("getLastAppliedIndex", 8L, snapshot.getLastAppliedIndex());
-        assertArrayEquals("getState", bytes, snapshot.getState());
+        assertEquals("getState", snapshotState, snapshot.getState());
         assertEquals("getUnAppliedEntries", Arrays.asList(lastLogEntry), snapshot.getUnAppliedEntries());
         assertEquals("electionTerm", mockElectionTerm.getCurrentTerm(), snapshot.getElectionTerm());
         assertEquals("electionVotedFor", mockElectionTerm.getVotedFor(), snapshot.getElectionVotedFor());
@@ -266,7 +283,7 @@ public class SnapshotManagerTest extends AbstractActorTest {
     }
 
     @Test
-    public void testPersistWhenReplicatedToAllIndexNotMinus() {
+    public void testPersistWhenReplicatedToAllIndexNotMinus() throws Exception {
         doReturn(45L).when(mockReplicatedLog).getSnapshotIndex();
         doReturn(6L).when(mockReplicatedLog).getSnapshotTerm();
         ReplicatedLogEntry replicatedLogEntry = mock(ReplicatedLogEntry.class);
@@ -277,8 +294,8 @@ public class SnapshotManagerTest extends AbstractActorTest {
         // when replicatedToAllIndex != -1
         snapshotManager.capture(new SimpleReplicatedLogEntry(9, 6, new MockRaftActorContext.MockPayload()), 9);
 
-        byte[] bytes = new byte[] {1,2,3,4,5,6,7,8,9,10};
-        snapshotManager.persist(bytes, Runtime.getRuntime().totalMemory());
+        ByteState snapshotState = ByteState.of(new byte[] {1,2,3,4,5,6,7,8,9,10});
+        snapshotManager.persist(snapshotState, Optional.empty(), Runtime.getRuntime().totalMemory());
 
         ArgumentCaptor<Snapshot> snapshotArgumentCaptor = ArgumentCaptor.forClass(Snapshot.class);
         verify(mockDataPersistenceProvider).saveSnapshot(snapshotArgumentCaptor.capture());
@@ -289,7 +306,7 @@ public class SnapshotManagerTest extends AbstractActorTest {
         assertEquals("getLastIndex", 9L, snapshot.getLastIndex());
         assertEquals("getLastAppliedTerm", 6L, snapshot.getLastAppliedTerm());
         assertEquals("getLastAppliedIndex", 9L, snapshot.getLastAppliedIndex());
-        assertArrayEquals("getState", bytes, snapshot.getState());
+        assertEquals("getState", snapshotState, snapshot.getState());
         assertEquals("getUnAppliedEntries size", 0, snapshot.getUnAppliedEntries().size());
 
         verify(mockReplicatedLog).snapshotPreCommit(9L, 6L);
@@ -304,7 +321,7 @@ public class SnapshotManagerTest extends AbstractActorTest {
         // when replicatedToAllIndex = -1
         snapshotManager.capture(new SimpleReplicatedLogEntry(9, 6, new MockRaftActorContext.MockPayload()), -1);
 
-        snapshotManager.persist(new byte[]{}, Runtime.getRuntime().totalMemory());
+        snapshotManager.persist(ByteState.empty(), Optional.empty(), Runtime.getRuntime().totalMemory());
 
         verify(mockDataPersistenceProvider).saveSnapshot(any(Snapshot.class));
 
@@ -330,7 +347,7 @@ public class SnapshotManagerTest extends AbstractActorTest {
         snapshotManager.capture(new SimpleReplicatedLogEntry(9, 6,
                 new MockRaftActorContext.MockPayload()), replicatedToAllIndex);
 
-        snapshotManager.persist(new byte[]{}, 2000000L);
+        snapshotManager.persist(ByteState.empty(), Optional.empty(), 2000000L);
 
         verify(mockDataPersistenceProvider).saveSnapshot(any(Snapshot.class));
 
@@ -339,9 +356,11 @@ public class SnapshotManagerTest extends AbstractActorTest {
         verify(mockRaftActorBehavior).setReplicatedToAllIndex(replicatedToAllIndex);
     }
 
+    @SuppressWarnings({ "rawtypes", "unchecked" })
     @Test
-    public void testPersistSendInstallSnapshot() {
+    public void testPersistSendInstallSnapshot() throws Exception {
         doReturn(Integer.MAX_VALUE).when(mockReplicatedLog).dataSize();
+        doNothing().when(mockProcedure).accept(anyObject());
 
         // when replicatedToAllIndex = -1
         boolean capture = snapshotManager.captureToInstall(new SimpleReplicatedLogEntry(9, 6,
@@ -349,9 +368,17 @@ public class SnapshotManagerTest extends AbstractActorTest {
 
         assertTrue(capture);
 
-        byte[] bytes = new byte[] {1,2,3,4,5,6,7,8,9,10};
+        ByteState snapshotState = ByteState.of(new byte[] {1,2,3,4,5,6,7,8,9,10});
 
-        snapshotManager.persist(bytes, Runtime.getRuntime().totalMemory());
+        ArgumentCaptor<Optional> installSnapshotStreamCapture = ArgumentCaptor.forClass(Optional.class);
+        verify(mockProcedure).accept(installSnapshotStreamCapture.capture());
+
+        Optional<OutputStream> installSnapshotStream = installSnapshotStreamCapture.getValue();
+        assertEquals("isPresent", true, installSnapshotStream.isPresent());
+
+        installSnapshotStream.get().write(snapshotState.getBytes());
+
+        snapshotManager.persist(snapshotState, installSnapshotStream, Runtime.getRuntime().totalMemory());
 
         assertEquals(true, snapshotManager.isCapturing());
 
@@ -366,12 +393,13 @@ public class SnapshotManagerTest extends AbstractActorTest {
 
         SendInstallSnapshot sendInstallSnapshot = sendInstallSnapshotArgumentCaptor.getValue();
 
-        assertTrue(Arrays.equals(bytes, sendInstallSnapshot.getSnapshot().getState()));
+        assertEquals("state", snapshotState, sendInstallSnapshot.getSnapshot().getState());
+        assertArrayEquals("state", snapshotState.getBytes(), sendInstallSnapshot.getSnapshotBytes().read());
     }
 
     @Test
     public void testCallingPersistWithoutCaptureWillDoNothing() {
-        snapshotManager.persist(new byte[]{}, Runtime.getRuntime().totalMemory());
+        snapshotManager.persist(ByteState.empty(), Optional.empty(), Runtime.getRuntime().totalMemory());
 
         verify(mockDataPersistenceProvider, never()).saveSnapshot(any(Snapshot.class));
 
@@ -385,18 +413,15 @@ public class SnapshotManagerTest extends AbstractActorTest {
         doReturn(Integer.MAX_VALUE).when(mockReplicatedLog).dataSize();
 
         // when replicatedToAllIndex = -1
-        snapshotManager.captureToInstall(new SimpleReplicatedLogEntry(9, 6,
-                new MockRaftActorContext.MockPayload()), -1, "follower-1");
+        snapshotManager.capture(new SimpleReplicatedLogEntry(9, 6, new MockRaftActorContext.MockPayload()), -1);
 
-        snapshotManager.persist(new byte[]{}, Runtime.getRuntime().totalMemory());
+        snapshotManager.persist(ByteState.empty(), Optional.empty(), Runtime.getRuntime().totalMemory());
 
-        snapshotManager.persist(new byte[]{}, Runtime.getRuntime().totalMemory());
+        snapshotManager.persist(ByteState.empty(), Optional.empty(), Runtime.getRuntime().totalMemory());
 
         verify(mockDataPersistenceProvider).saveSnapshot(any(Snapshot.class));
 
         verify(mockReplicatedLog).snapshotPreCommit(9L, 6L);
-
-        verify(mockRaftActorBehavior).handleMessage(any(ActorRef.class), any(SendInstallSnapshot.class));
     }
 
     @Test
@@ -404,10 +429,9 @@ public class SnapshotManagerTest extends AbstractActorTest {
         doReturn(50L).when(mockDataPersistenceProvider).getLastSequenceNumber();
 
         // when replicatedToAllIndex = -1
-        snapshotManager.captureToInstall(new SimpleReplicatedLogEntry(9, 6,
-                new MockRaftActorContext.MockPayload()), -1, "follower-1");
+        snapshotManager.capture(new SimpleReplicatedLogEntry(9, 6, new MockRaftActorContext.MockPayload()), -1);
 
-        snapshotManager.persist(new byte[]{}, Runtime.getRuntime().totalMemory());
+        snapshotManager.persist(ByteState.empty(), Optional.empty(), Runtime.getRuntime().totalMemory());
 
         assertEquals(true, snapshotManager.isCapturing());
 
@@ -433,8 +457,7 @@ public class SnapshotManagerTest extends AbstractActorTest {
     @Test
     public void testCommitBeforePersist() {
         // when replicatedToAllIndex = -1
-        snapshotManager.captureToInstall(new SimpleReplicatedLogEntry(9, 6,
-                new MockRaftActorContext.MockPayload()), -1, "follower-1");
+        snapshotManager.capture(new SimpleReplicatedLogEntry(9, 6, new MockRaftActorContext.MockPayload()), -1);
 
         snapshotManager.commit(100L, 0);
 
@@ -463,10 +486,9 @@ public class SnapshotManagerTest extends AbstractActorTest {
         doReturn(50L).when(mockDataPersistenceProvider).getLastSequenceNumber();
 
         // when replicatedToAllIndex = -1
-        snapshotManager.captureToInstall(new SimpleReplicatedLogEntry(9, 6,
-                new MockRaftActorContext.MockPayload()), -1, "follower-1");
+        snapshotManager.capture(new SimpleReplicatedLogEntry(9, 6, new MockRaftActorContext.MockPayload()), -1);
 
-        snapshotManager.persist(new byte[]{}, Runtime.getRuntime().totalMemory());
+        snapshotManager.persist(ByteState.empty(), Optional.empty(), Runtime.getRuntime().totalMemory());
 
         snapshotManager.commit(100L, 0);
 
@@ -482,10 +504,9 @@ public class SnapshotManagerTest extends AbstractActorTest {
     @Test
     public void testRollback() {
         // when replicatedToAllIndex = -1
-        snapshotManager.captureToInstall(new SimpleReplicatedLogEntry(9, 6,
-                new MockRaftActorContext.MockPayload()), -1, "follower-1");
+        snapshotManager.capture(new SimpleReplicatedLogEntry(9, 6, new MockRaftActorContext.MockPayload()), -1);
 
-        snapshotManager.persist(new byte[]{}, Runtime.getRuntime().totalMemory());
+        snapshotManager.persist(ByteState.empty(), Optional.empty(), Runtime.getRuntime().totalMemory());
 
         snapshotManager.rollback();
 
@@ -498,8 +519,7 @@ public class SnapshotManagerTest extends AbstractActorTest {
     @Test
     public void testRollbackBeforePersist() {
         // when replicatedToAllIndex = -1
-        snapshotManager.captureToInstall(new SimpleReplicatedLogEntry(9, 6,
-                new MockRaftActorContext.MockPayload()), -1, "follower-1");
+        snapshotManager.capture(new SimpleReplicatedLogEntry(9, 6, new MockRaftActorContext.MockPayload()), -1);
 
         snapshotManager.rollback();
 
@@ -516,10 +536,9 @@ public class SnapshotManagerTest extends AbstractActorTest {
     @Test
     public void testCallingRollbackMultipleTimesCausesNoHarm() {
         // when replicatedToAllIndex = -1
-        snapshotManager.captureToInstall(new SimpleReplicatedLogEntry(9, 6,
-                new MockRaftActorContext.MockPayload()), -1, "follower-1");
+        snapshotManager.capture(new SimpleReplicatedLogEntry(9, 6, new MockRaftActorContext.MockPayload()), -1);
 
-        snapshotManager.persist(new byte[]{}, Runtime.getRuntime().totalMemory());
+        snapshotManager.persist(ByteState.empty(), Optional.empty(), Runtime.getRuntime().totalMemory());
 
         snapshotManager.rollback();
 
@@ -622,8 +641,8 @@ public class SnapshotManagerTest extends AbstractActorTest {
 
     @Test
     public void testTrimLogAfterCaptureToInstall() {
-        boolean capture = snapshotManager.captureToInstall(new SimpleReplicatedLogEntry(9, 1,
-                new MockRaftActorContext.MockPayload()), 9, "follower-1");
+        boolean capture = snapshotManager.capture(new SimpleReplicatedLogEntry(9, 1,
+                new MockRaftActorContext.MockPayload()), 9);
 
         assertTrue(capture);
 
index 8cb914c..487a4fa 100644 (file)
@@ -29,8 +29,10 @@ import com.google.common.base.Stopwatch;
 import com.google.common.base.Throwables;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
+import com.google.common.io.ByteSource;
 import com.google.common.util.concurrent.Uninterruptibles;
 import com.google.protobuf.ByteString;
+import java.io.OutputStream;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -44,12 +46,11 @@ import org.junit.Test;
 import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl;
 import org.opendaylight.controller.cluster.raft.MockRaftActor;
 import org.opendaylight.controller.cluster.raft.MockRaftActor.Builder;
+import org.opendaylight.controller.cluster.raft.MockRaftActor.MockSnapshotState;
 import org.opendaylight.controller.cluster.raft.MockRaftActorContext;
 import org.opendaylight.controller.cluster.raft.RaftActorContext;
 import org.opendaylight.controller.cluster.raft.RaftActorSnapshotCohort;
-import org.opendaylight.controller.cluster.raft.RaftActorTest;
 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
-import org.opendaylight.controller.cluster.raft.Snapshot;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
 import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
@@ -63,9 +64,12 @@ import org.opendaylight.controller.cluster.raft.messages.RaftRPC;
 import org.opendaylight.controller.cluster.raft.messages.RequestVote;
 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
 import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries;
+import org.opendaylight.controller.cluster.raft.persisted.ByteState;
 import org.opendaylight.controller.cluster.raft.persisted.ServerConfigurationPayload;
 import org.opendaylight.controller.cluster.raft.persisted.ServerInfo;
 import org.opendaylight.controller.cluster.raft.persisted.SimpleReplicatedLogEntry;
+import org.opendaylight.controller.cluster.raft.persisted.Snapshot;
+import org.opendaylight.controller.cluster.raft.persisted.Snapshot.State;
 import org.opendaylight.controller.cluster.raft.persisted.UpdateElectionTerm;
 import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy;
 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
@@ -809,7 +813,7 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest<Follower> {
 
         follower = createBehavior(context);
 
-        ByteString bsSnapshot  = createSnapshot();
+        ByteString bsSnapshot = createSnapshot();
         int offset = 0;
         int snapshotLength = bsSnapshot.size();
         int chunkSize = 50;
@@ -838,7 +842,8 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest<Follower> {
         assertEquals("getLastAppliedIndex", lastInstallSnapshot.getLastIncludedIndex(),
                 snapshot.getLastAppliedIndex());
         assertEquals("getLastTerm", lastInstallSnapshot.getLastIncludedTerm(), snapshot.getLastTerm());
-        Assert.assertArrayEquals("getState", bsSnapshot.toByteArray(), snapshot.getState());
+        assertEquals("getState type", ByteState.class, snapshot.getState().getClass());
+        Assert.assertArrayEquals("getState", bsSnapshot.toByteArray(), ((ByteState)snapshot.getState()).getBytes());
         assertEquals("getElectionTerm", 1, snapshot.getElectionTerm());
         assertEquals("getElectionVotedFor", "leader", snapshot.getElectionVotedFor());
         applySnapshot.getCallback().onSuccess();
@@ -1154,7 +1159,7 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest<Follower> {
         assertEquals("Snapshot getLastTerm", 1, snapshot.getLastTerm());
         assertEquals("Snapshot getLastIndex", 1, snapshot.getLastIndex());
         assertEquals("Snapshot state", ImmutableList.of(entries.get(0).getData(), entries.get(1).getData()),
-                MockRaftActor.toObject(snapshot.getState()));
+                MockRaftActor.fromState(snapshot.getState()));
     }
 
     @Test
@@ -1208,7 +1213,7 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest<Follower> {
         assertEquals("Snapshot getLastTerm", 1, snapshot.getLastTerm());
         assertEquals("Snapshot getLastIndex", 2, snapshot.getLastIndex());
         assertEquals("Snapshot state", ImmutableList.of(entries.get(0).getData(), entries.get(1).getData(),
-                entries.get(2).getData()), MockRaftActor.toObject(snapshot.getState()));
+                entries.get(2).getData()), MockRaftActor.fromState(snapshot.getState()));
 
         assertEquals("Journal size", 0, followerRaftActor.get().getReplicatedLog().size());
         assertEquals("Snapshot index", 2, followerRaftActor.get().getReplicatedLog().getSnapshotIndex());
@@ -1283,24 +1288,29 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest<Follower> {
         assertEquals("Snapshot getLastTerm", 1, snapshot.getLastTerm());
         assertEquals("Snapshot getLastIndex", 2, snapshot.getLastIndex());
         assertEquals("Snapshot state", ImmutableList.of(entries.get(0).getData()),
-                MockRaftActor.toObject(snapshot.getState()));
+                MockRaftActor.fromState(snapshot.getState()));
     }
 
     @SuppressWarnings("checkstyle:IllegalCatch")
     private RaftActorSnapshotCohort newRaftActorSnapshotCohort(final AtomicReference<MockRaftActor> followerRaftActor) {
         RaftActorSnapshotCohort snapshotCohort = new RaftActorSnapshotCohort() {
             @Override
-            public void createSnapshot(ActorRef actorRef) {
+            public void createSnapshot(ActorRef actorRef, java.util.Optional<OutputStream> installSnapshotStream) {
                 try {
-                    actorRef.tell(new CaptureSnapshotReply(RaftActorTest.fromObject(
-                            followerRaftActor.get().getState()).toByteArray()), actorRef);
+                    actorRef.tell(new CaptureSnapshotReply(new MockSnapshotState(followerRaftActor.get().getState()),
+                            installSnapshotStream), actorRef);
                 } catch (Exception e) {
                     Throwables.propagate(e);
                 }
             }
 
             @Override
-            public void applySnapshot(byte[] snapshotBytes) {
+            public void applySnapshot(State snapshotState) {
+            }
+
+            @Override
+            public State deserializeSnapshot(ByteSource snapshotBytes) {
+                throw new UnsupportedOperationException();
             }
         };
         return snapshotCohort;
index fbfdea0..99b647b 100644 (file)
@@ -25,6 +25,7 @@ import akka.actor.Terminated;
 import akka.testkit.JavaTestKit;
 import akka.testkit.TestActorRef;
 import com.google.common.collect.ImmutableMap;
+import com.google.common.io.ByteSource;
 import com.google.common.util.concurrent.Uninterruptibles;
 import com.google.protobuf.ByteString;
 import java.util.Arrays;
@@ -43,7 +44,6 @@ import org.opendaylight.controller.cluster.raft.RaftActorLeadershipTransferCohor
 import org.opendaylight.controller.cluster.raft.RaftState;
 import org.opendaylight.controller.cluster.raft.RaftVersions;
 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
-import org.opendaylight.controller.cluster.raft.Snapshot;
 import org.opendaylight.controller.cluster.raft.VotingState;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
@@ -52,6 +52,7 @@ import org.opendaylight.controller.cluster.raft.base.messages.Replicate;
 import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat;
 import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot;
 import org.opendaylight.controller.cluster.raft.base.messages.TimeoutNow;
+import org.opendaylight.controller.cluster.raft.behaviors.AbstractLeader.SnapshotHolder;
 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
@@ -59,7 +60,9 @@ import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply;
 import org.opendaylight.controller.cluster.raft.messages.RaftRPC;
 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
 import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries;
+import org.opendaylight.controller.cluster.raft.persisted.ByteState;
 import org.opendaylight.controller.cluster.raft.persisted.SimpleReplicatedLogEntry;
+import org.opendaylight.controller.cluster.raft.persisted.Snapshot;
 import org.opendaylight.controller.cluster.raft.policy.DefaultRaftPolicy;
 import org.opendaylight.controller.cluster.raft.policy.RaftPolicy;
 import org.opendaylight.controller.cluster.raft.utils.ForwardMessageToBehaviorActor;
@@ -577,8 +580,9 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
         leader.markFollowerActive(FOLLOWER_ID);
 
         ByteString bs = toByteString(leadersSnapshot);
-        leader.setSnapshot(Snapshot.create(bs.toByteArray(), Collections.<ReplicatedLogEntry>emptyList(),
-                commitIndex, snapshotTerm, commitIndex, snapshotTerm));
+        leader.setSnapshot(new SnapshotHolder(Snapshot.create(ByteState.of(bs.toByteArray()),
+                Collections.<ReplicatedLogEntry>emptyList(), commitIndex, snapshotTerm, commitIndex, snapshotTerm,
+                -1, null, null), ByteSource.wrap(bs.toByteArray())));
         LeaderInstallSnapshotState fts = new LeaderInstallSnapshotState(
                 actorContext.getConfigParams().getSnapshotChunkSize(), leader.logName());
         fts.setSnapshotBytes(bs);
@@ -700,7 +704,6 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
 
         CaptureSnapshot cs = actorContext.getSnapshotManager().getCaptureSnapshot();
 
-        assertTrue(cs.isInstallSnapshotInitiated());
         assertEquals(3, cs.getLastAppliedIndex());
         assertEquals(1, cs.getLastAppliedTerm());
         assertEquals(4, cs.getLastIndex());
@@ -763,7 +766,6 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
 
         CaptureSnapshot cs = actorContext.getSnapshotManager().getCaptureSnapshot();
 
-        assertTrue(cs.isInstallSnapshotInitiated());
         assertEquals(3, cs.getLastAppliedIndex());
         assertEquals(1, cs.getLastAppliedTerm());
         assertEquals(4, cs.getLastIndex());
@@ -810,11 +812,12 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
         leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
         leader.getFollower(FOLLOWER_ID).setNextIndex(0);
 
-        Snapshot snapshot = Snapshot.create(toByteString(leadersSnapshot).toByteArray(),
-                Collections.<ReplicatedLogEntry>emptyList(),
-                lastAppliedIndex, snapshotTerm, lastAppliedIndex, snapshotTerm);
+        byte[] bytes = toByteString(leadersSnapshot).toByteArray();
+        Snapshot snapshot = Snapshot.create(ByteState.of(bytes), Collections.<ReplicatedLogEntry>emptyList(),
+                lastAppliedIndex, snapshotTerm, lastAppliedIndex, snapshotTerm, -1, null, null);
 
-        RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot));
+        RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor,
+                new SendInstallSnapshot(snapshot, ByteSource.wrap(bytes)));
 
         assertTrue(raftBehavior instanceof Leader);
 
@@ -861,11 +864,12 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
         leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
         leader.getFollower(FOLLOWER_ID).setNextIndex(-1);
 
-        Snapshot snapshot = Snapshot.create(toByteString(leadersSnapshot).toByteArray(),
-                Collections.<ReplicatedLogEntry>emptyList(),
-                lastAppliedIndex, snapshotTerm, lastAppliedIndex, snapshotTerm);
+        byte[] bytes = toByteString(leadersSnapshot).toByteArray();
+        Snapshot snapshot = Snapshot.create(ByteState.of(bytes), Collections.<ReplicatedLogEntry>emptyList(),
+                lastAppliedIndex, snapshotTerm, lastAppliedIndex, snapshotTerm, -1, null, null);
 
-        RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot));
+        RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor,
+                new SendInstallSnapshot(snapshot, ByteSource.wrap(bytes)));
 
         assertTrue(raftBehavior instanceof Leader);
 
@@ -915,8 +919,9 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
         actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
 
         ByteString bs = toByteString(leadersSnapshot);
-        leader.setSnapshot(Snapshot.create(bs.toByteArray(), Collections.<ReplicatedLogEntry>emptyList(),
-                commitIndex, snapshotTerm, commitIndex, snapshotTerm));
+        leader.setSnapshot(new SnapshotHolder(Snapshot.create(ByteState.of(bs.toByteArray()),
+                Collections.<ReplicatedLogEntry>emptyList(), commitIndex, snapshotTerm, commitIndex, snapshotTerm,
+                -1, null, null), ByteSource.wrap(bs.toByteArray())));
         LeaderInstallSnapshotState fts = new LeaderInstallSnapshotState(
                 actorContext.getConfigParams().getSnapshotChunkSize(), leader.logName());
         fts.setSnapshotBytes(bs);
@@ -983,11 +988,11 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
         actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
 
         ByteString bs = toByteString(leadersSnapshot);
-        Snapshot snapshot = Snapshot.create(bs.toByteArray(), Collections.<ReplicatedLogEntry>emptyList(),
-                commitIndex, snapshotTerm, commitIndex, snapshotTerm);
-        leader.setSnapshot(snapshot);
+        Snapshot snapshot = Snapshot.create(ByteState.of(bs.toByteArray()),
+                Collections.<ReplicatedLogEntry>emptyList(), commitIndex, snapshotTerm, commitIndex, snapshotTerm,
+                -1, null, null);
 
-        leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot));
+        leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot, ByteSource.wrap(bs.toByteArray())));
 
         InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor,
                 InstallSnapshot.class);
@@ -1057,12 +1062,12 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
         actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
 
         ByteString bs = toByteString(leadersSnapshot);
-        Snapshot snapshot = Snapshot.create(bs.toByteArray(), Collections.<ReplicatedLogEntry>emptyList(),
-                commitIndex, snapshotTerm, commitIndex, snapshotTerm);
-        leader.setSnapshot(snapshot);
+        Snapshot snapshot = Snapshot.create(ByteState.of(bs.toByteArray()),
+                Collections.<ReplicatedLogEntry>emptyList(), commitIndex, snapshotTerm, commitIndex, snapshotTerm,
+                -1, null, null);
 
         Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
-        leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot));
+        leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot, ByteSource.wrap(bs.toByteArray())));
 
         InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor,
                 InstallSnapshot.class);
@@ -1122,11 +1127,11 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
         actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
 
         ByteString bs = toByteString(leadersSnapshot);
-        Snapshot snapshot = Snapshot.create(bs.toByteArray(), Collections.<ReplicatedLogEntry>emptyList(),
-                commitIndex, snapshotTerm, commitIndex, snapshotTerm);
-        leader.setSnapshot(snapshot);
+        Snapshot snapshot = Snapshot.create(ByteState.of(bs.toByteArray()),
+                Collections.<ReplicatedLogEntry>emptyList(), commitIndex, snapshotTerm, commitIndex, snapshotTerm,
+                -1, null, null);
 
-        leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot));
+        leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot, ByteSource.wrap(bs.toByteArray())));
 
         InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor,
                 InstallSnapshot.class);
diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/persisted/ByteState.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/persisted/ByteState.java
new file mode 100644 (file)
index 0000000..42411a9
--- /dev/null
@@ -0,0 +1,70 @@
+/*
+ * Copyright (c) 2017 Brocade Communications Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.raft.persisted;
+
+import com.google.common.base.Preconditions;
+import java.util.Arrays;
+import javax.annotation.Nonnull;
+
+/**
+ * Snapshot State implementation backed by a byte[].
+ *
+ * @author Thomas Pantelis
+ */
+public class ByteState implements Snapshot.State {
+    private static final long serialVersionUID = 1L;
+
+    private final byte[] bytes;
+
+    private ByteState(@Nonnull byte[] bytes) {
+        this.bytes = Preconditions.checkNotNull(bytes);
+    }
+
+    public static ByteState of(@Nonnull byte[] bytes) {
+        return new ByteState(bytes);
+    }
+
+    public static ByteState empty() {
+        return new ByteState(new byte[0]);
+    }
+
+    public byte[] getBytes() {
+        return bytes;
+    }
+
+    @Override
+    public int hashCode() {
+        final int prime = 31;
+        int result = 1;
+        result = prime * result + Arrays.hashCode(bytes);
+        return result;
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        if (this == obj) {
+            return true;
+        }
+        if (obj == null) {
+            return false;
+        }
+        if (getClass() != obj.getClass()) {
+            return false;
+        }
+        ByteState other = (ByteState) obj;
+        if (!Arrays.equals(bytes, other.bytes)) {
+            return false;
+        }
+        return true;
+    }
+
+    @Override
+    public String toString() {
+        return "ByteState [bytes=" + Arrays.toString(bytes) + "]";
+    }
+}
diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/persisted/EmptyStateTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/persisted/EmptyStateTest.java
new file mode 100644 (file)
index 0000000..963580c
--- /dev/null
@@ -0,0 +1,28 @@
+/*
+ * Copyright (c) 2017 Brocade Communications Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.raft.persisted;
+
+import static org.junit.Assert.assertSame;
+
+import org.apache.commons.lang.SerializationUtils;
+import org.junit.Test;
+
+/**
+ * Unit tests for EmptyState.
+ *
+ * @author Thomas Pantelis
+ *
+ */
+public class EmptyStateTest {
+
+    @Test
+    public void testSerialization() {
+        EmptyState cloned = (EmptyState) SerializationUtils.clone(EmptyState.INSTANCE);
+        assertSame("cloned", EmptyState.INSTANCE, cloned);
+    }
+}
diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/persisted/SnapshotTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/persisted/SnapshotTest.java
new file mode 100644 (file)
index 0000000..19f0ec1
--- /dev/null
@@ -0,0 +1,59 @@
+/*
+ * Copyright (c) 2017 Brocade Communications Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.raft.persisted;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import org.apache.commons.lang.SerializationUtils;
+import org.junit.Test;
+import org.opendaylight.controller.cluster.raft.MockRaftActorContext.MockPayload;
+import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
+
+/**
+ * Unit tests for Snapshot.
+ *
+ * @author Thomas Pantelis
+ */
+public class SnapshotTest {
+
+    @Test
+    public void testSerialization() throws Exception {
+        testSerialization(new byte[]{1, 2, 3, 4, 5, 6, 7}, Arrays.asList(
+                new SimpleReplicatedLogEntry(6, 2, new MockPayload("payload"))));
+        testSerialization(new byte[]{1, 2, 3, 4, 5, 6, 7, 8, 9}, Collections.emptyList());
+    }
+
+    private void testSerialization(byte[] state, List<ReplicatedLogEntry> unapplied) throws Exception {
+        long lastIndex = 6;
+        long lastTerm = 2;
+        long lastAppliedIndex = 5;
+        long lastAppliedTerm = 1;
+        long electionTerm = 3;
+        String electionVotedFor = "member-1";
+        ServerConfigurationPayload serverConfig = new ServerConfigurationPayload(Arrays.asList(
+                new ServerInfo("1", true), new ServerInfo("2", false)));
+
+        Snapshot expected = Snapshot.create(ByteState.of(state), unapplied, lastIndex, lastTerm, lastAppliedIndex,
+                lastAppliedTerm, electionTerm, electionVotedFor, serverConfig);
+        Snapshot cloned = (Snapshot) SerializationUtils.clone(expected);
+
+        assertEquals("lastIndex", expected.getLastIndex(), cloned.getLastIndex());
+        assertEquals("lastTerm", expected.getLastTerm(), cloned.getLastTerm());
+        assertEquals("lastAppliedIndex", expected.getLastAppliedIndex(), cloned.getLastAppliedIndex());
+        assertEquals("lastAppliedTerm", expected.getLastAppliedTerm(), cloned.getLastAppliedTerm());
+        assertEquals("unAppliedEntries", expected.getUnAppliedEntries(), cloned.getUnAppliedEntries());
+        assertEquals("electionTerm", expected.getElectionTerm(), cloned.getElectionTerm());
+        assertEquals("electionVotedFor", expected.getElectionVotedFor(), cloned.getElectionVotedFor());
+        assertEquals("state", expected.getState(), cloned.getState());
+        assertEquals("serverConfig", expected.getServerConfiguration().getServerConfig(),
+                cloned.getServerConfiguration().getServerConfig());
+    }
+}
index 5e0d340..e87f083 100644 (file)
@@ -10,9 +10,13 @@ package org.opendaylight.controller.cluster.datastore;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Throwables;
 import java.io.File;
+import java.io.IOException;
 import org.opendaylight.controller.cluster.datastore.persisted.ShardDataTreeSnapshot;
+import org.opendaylight.controller.cluster.datastore.persisted.ShardSnapshotState;
 import org.opendaylight.controller.cluster.datastore.utils.NormalizedNodeXMLOutput;
 import org.opendaylight.controller.cluster.raft.RaftActorRecoveryCohort;
+import org.opendaylight.controller.cluster.raft.persisted.Snapshot;
+import org.opendaylight.controller.cluster.raft.persisted.Snapshot.State;
 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 import org.slf4j.Logger;
@@ -86,24 +90,21 @@ class ShardRecoveryCoordinator implements RaftActorRecoveryCohort {
      */
     @Override
     @SuppressWarnings("checkstyle:IllegalCatch")
-    public void applyRecoverySnapshot(final byte[] snapshotBytes) {
-        log.debug("{}: Applying recovered snapshot", shardName);
-
-        final ShardDataTreeSnapshot snapshot;
-        try {
-            snapshot = ShardDataTreeSnapshot.deserialize(snapshotBytes);
-        } catch (Exception e) {
-            log.error("{}: failed to deserialize snapshot", shardName, e);
-            throw Throwables.propagate(e);
+    public void applyRecoverySnapshot(final Snapshot.State snapshotState) {
+        if (!(snapshotState instanceof ShardSnapshotState)) {
+            log.debug("{}: applyRecoverySnapshot ignoring snapshot: {}", snapshotState);
         }
 
+        log.debug("{}: Applying recovered snapshot", shardName);
+
+        ShardDataTreeSnapshot shardSnapshot = ((ShardSnapshotState)snapshotState).getSnapshot();
         try {
-            store.applyRecoverySnapshot(snapshot);
+            store.applyRecoverySnapshot(shardSnapshot);
         } catch (Exception e) {
-            final File f = writeRoot("snapshot", snapshot.getRootNode().orElse(null));
+            final File f = writeRoot("snapshot", shardSnapshot.getRootNode().orElse(null));
             throw new IllegalStateException(String.format(
                     "%s: Failed to apply recovery snapshot %s. Node data was written to file %s",
-                    shardName, snapshot, f), e);
+                    shardName, shardSnapshot, f), e);
         }
     }
 
@@ -111,4 +112,15 @@ class ShardRecoveryCoordinator implements RaftActorRecoveryCohort {
     public byte[] getRestoreFromSnapshot() {
         return restoreFromSnapshot;
     }
+
+    @Override
+    @Deprecated
+    public State deserializePreCarbonSnapshot(byte[] from) {
+        try {
+            return new ShardSnapshotState(ShardDataTreeSnapshot.deserialize(from));
+        } catch (IOException e) {
+            log.error("{}: failed to deserialize snapshot", shardName, e);
+            throw Throwables.propagate(e);
+        }
+    }
 }
index 6dc3f03..0627c0a 100644 (file)
@@ -10,7 +10,11 @@ package org.opendaylight.controller.cluster.datastore;
 import akka.actor.ActorContext;
 import akka.actor.ActorRef;
 import com.google.common.base.Preconditions;
+import com.google.common.io.ByteSource;
 import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.Optional;
 import org.opendaylight.controller.cluster.access.concepts.ClientIdentifier;
 import org.opendaylight.controller.cluster.access.concepts.FrontendIdentifier;
 import org.opendaylight.controller.cluster.access.concepts.FrontendType;
@@ -18,7 +22,10 @@ import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifie
 import org.opendaylight.controller.cluster.access.concepts.MemberName;
 import org.opendaylight.controller.cluster.datastore.actors.ShardSnapshotActor;
 import org.opendaylight.controller.cluster.datastore.persisted.ShardDataTreeSnapshot;
+import org.opendaylight.controller.cluster.datastore.persisted.ShardSnapshotState;
 import org.opendaylight.controller.cluster.raft.RaftActorSnapshotCohort;
+import org.opendaylight.controller.cluster.raft.persisted.Snapshot;
+import org.opendaylight.controller.cluster.raft.persisted.Snapshot.State;
 import org.slf4j.Logger;
 
 /**
@@ -56,28 +63,26 @@ class ShardSnapshotCohort implements RaftActorSnapshotCohort {
     }
 
     @Override
-    public void createSnapshot(final ActorRef actorRef) {
+    public void createSnapshot(final ActorRef actorRef, final Optional<OutputStream> installSnapshotStream) {
         // Forward the request to the snapshot actor
-        ShardSnapshotActor.requestSnapshot(snapshotActor, store.takeStateSnapshot(), actorRef);
+        ShardSnapshotActor.requestSnapshot(snapshotActor, store.takeStateSnapshot(), installSnapshotStream, actorRef);
     }
 
     @Override
     @SuppressWarnings("checkstyle:IllegalCatch")
-    public void applySnapshot(final byte[] snapshotBytes) {
+    public void applySnapshot(final Snapshot.State snapshotState) {
+        if (!(snapshotState instanceof ShardSnapshotState)) {
+            log.debug("{}: applySnapshot ignoring snapshot: {}", snapshotState);
+        }
+
+        final ShardDataTreeSnapshot snapshot = ((ShardSnapshotState)snapshotState).getSnapshot();
+
         // Since this will be done only on Recovery or when this actor is a Follower
         // we can safely commit everything in here. We not need to worry about event notifications
         // as they would have already been disabled on the follower
 
         log.info("{}: Applying snapshot", logId);
 
-        final ShardDataTreeSnapshot snapshot;
-        try {
-            snapshot = ShardDataTreeSnapshot.deserialize(snapshotBytes);
-        } catch (IOException e) {
-            log.error("{}: Failed to deserialize snapshot", logId, e);
-            return;
-        }
-
         try {
             store.applySnapshot(snapshot);
         } catch (Exception e) {
@@ -87,4 +92,11 @@ class ShardSnapshotCohort implements RaftActorSnapshotCohort {
 
         log.info("{}: Done applying snapshot", logId);
     }
+
+    @Override
+    public State deserializeSnapshot(ByteSource snapshotBytes) throws IOException {
+        try (final InputStream is = snapshotBytes.openStream()) {
+            return new ShardSnapshotState(ShardDataTreeSnapshot.deserialize(is));
+        }
+    }
 }
index 00bb424..e7529fb 100644 (file)
@@ -10,9 +10,15 @@ package org.opendaylight.controller.cluster.datastore.actors;
 import akka.actor.ActorRef;
 import akka.actor.Props;
 import com.google.common.base.Preconditions;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.Optional;
 import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActorWithMetering;
 import org.opendaylight.controller.cluster.datastore.persisted.ShardDataTreeSnapshot;
+import org.opendaylight.controller.cluster.datastore.persisted.ShardSnapshotState;
 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * This is an offload actor, which is given an isolated snapshot of the data tree. It performs the potentially
@@ -21,13 +27,18 @@ import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotRep
  * @author Robert Varga
  */
 public final class ShardSnapshotActor extends AbstractUntypedActorWithMetering {
+    private static final Logger LOG = LoggerFactory.getLogger(ShardSnapshotActor.class);
+
     // Internal message
     private static final class SerializeSnapshot {
         private final ShardDataTreeSnapshot snapshot;
+        private final Optional<OutputStream> installSnapshotStream;
         private final ActorRef replyTo;
 
-        SerializeSnapshot(final ShardDataTreeSnapshot snapshot, final ActorRef replyTo) {
+        SerializeSnapshot(final ShardDataTreeSnapshot snapshot, final Optional<OutputStream> installSnapshotStream,
+                final ActorRef replyTo) {
             this.snapshot = Preconditions.checkNotNull(snapshot);
+            this.installSnapshotStream = Preconditions.checkNotNull(installSnapshotStream);
             this.replyTo = Preconditions.checkNotNull(replyTo);
         }
 
@@ -35,6 +46,10 @@ public final class ShardSnapshotActor extends AbstractUntypedActorWithMetering {
             return snapshot;
         }
 
+        Optional<OutputStream> getInstallSnapshotStream() {
+            return installSnapshotStream;
+        }
+
         ActorRef getReplyTo() {
             return replyTo;
         }
@@ -50,16 +65,39 @@ public final class ShardSnapshotActor extends AbstractUntypedActorWithMetering {
     @Override
     protected void handleReceive(final Object message) throws Exception {
         if (message instanceof SerializeSnapshot) {
-            final SerializeSnapshot request = (SerializeSnapshot) message;
-            request.getReplyTo().tell(new CaptureSnapshotReply(request.getSnapshot().serialize()), ActorRef.noSender());
+            onSerializeSnapshot((SerializeSnapshot) message);
         } else {
             unknownMessage(message);
         }
     }
 
-    public static void requestSnapshot(final ActorRef snapshotActor, ShardDataTreeSnapshot snapshot,
-            final ActorRef replyTo) {
-        snapshotActor.tell(new SerializeSnapshot(snapshot, replyTo), ActorRef.noSender());
+    private void onSerializeSnapshot(SerializeSnapshot request) {
+        Optional<OutputStream> installSnapshotStream = request.getInstallSnapshotStream();
+        if (installSnapshotStream.isPresent()) {
+            try {
+                request.getSnapshot().serialize(installSnapshotStream.get());
+            } catch (IOException e) {
+                // TODO - we should communicate the failure in the CaptureSnapshotReply.
+                LOG.error("Error serializing snapshot", e);
+            }
+        }
+
+        request.getReplyTo().tell(new CaptureSnapshotReply(new ShardSnapshotState(request.getSnapshot()),
+                installSnapshotStream), ActorRef.noSender());
+    }
+
+    /**
+     * Sends a request to a ShardSnapshotActor to process a snapshot and send a CaptureSnapshotReply.
+     *
+     * @param snapshotActor the ShardSnapshotActor
+     * @param snapshot the snapshot to process
+     * @param installSnapshotStream Optional OutputStream that is present if the snapshot is to also be installed
+     *        on a follower.
+     * @param replyTo the actor to which to send the CaptureSnapshotReply
+     */
+    public static void requestSnapshot(final ActorRef snapshotActor, final ShardDataTreeSnapshot snapshot,
+            final Optional<OutputStream> installSnapshotStream, final ActorRef replyTo) {
+        snapshotActor.tell(new SerializeSnapshot(snapshot, installSnapshotStream, replyTo), ActorRef.noSender());
     }
 
     public static Props props() {
index 48d2673..1d9b58f 100644 (file)
@@ -8,12 +8,12 @@
 package org.opendaylight.controller.cluster.datastore.persisted;
 
 import com.google.common.base.Verify;
-import java.io.ByteArrayOutputStream;
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.io.ObjectInputStream;
 import java.io.ObjectOutputStream;
+import java.io.OutputStream;
 import java.util.Optional;
 import javax.annotation.Nonnull;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
@@ -93,15 +93,11 @@ abstract class AbstractVersionedShardDataTreeSnapshot extends ShardDataTreeSnaps
     }
 
     @Override
-    public final byte[] serialize() throws IOException {
-        try (final ByteArrayOutputStream bos = new ByteArrayOutputStream()) {
-            try (final DataOutputStream dos = new DataOutputStream(bos)) {
-                final PayloadVersion version = version();
-                version.writeTo(dos);
-                versionedSerialize(dos, version);
-            }
-
-            return bos.toByteArray();
+    public void serialize(final OutputStream os) throws IOException {
+        try (final DataOutputStream dos = new DataOutputStream(os)) {
+            final PayloadVersion version = version();
+            version.writeTo(dos);
+            versionedSerialize(dos, version);
         }
     }
 }
index 3aca7f2..7be0d85 100644 (file)
@@ -8,6 +8,9 @@
 package org.opendaylight.controller.cluster.datastore.persisted;
 
 import com.google.common.annotations.Beta;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
 import java.util.Optional;
 import javax.annotation.Nullable;
 import org.opendaylight.controller.cluster.datastore.node.utils.stream.SerializationUtils;
@@ -33,7 +36,9 @@ public final class PreBoronShardDataTreeSnapshot extends ShardDataTreeSnapshot {
     }
 
     @Override
-    public byte[] serialize() {
-        return SerializationUtils.serializeNormalizedNode(rootNode);
+    public void serialize(OutputStream os) throws IOException {
+        try (final DataOutputStream dos = new DataOutputStream(os)) {
+            SerializationUtils.serializeNormalizedNode(rootNode, dos);
+        }
     }
-}
\ No newline at end of file
+}
index 6cb047f..7b8382e 100644 (file)
@@ -12,8 +12,8 @@ import java.io.ByteArrayInputStream;
 import java.io.DataInputStream;
 import java.io.IOException;
 import java.io.InputStream;
+import java.io.OutputStream;
 import java.util.Optional;
-import javax.annotation.Nonnull;
 import org.opendaylight.controller.cluster.datastore.node.utils.stream.SerializationUtils;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 import org.slf4j.Logger;
@@ -32,6 +32,7 @@ public abstract class ShardDataTreeSnapshot {
         // Hidden to prevent subclassing from outside of this package
     }
 
+    @Deprecated
     public static ShardDataTreeSnapshot deserialize(final byte[] bytes) throws IOException {
         /**
          * Unfortunately versions prior to Boron did not include any way to evolve the snapshot format and contained
@@ -50,18 +51,7 @@ public abstract class ShardDataTreeSnapshot {
 
         try {
             try (final InputStream is = new ByteArrayInputStream(bytes)) {
-                try (final DataInputStream dis = new DataInputStream(is)) {
-                    final ShardDataTreeSnapshot ret = AbstractVersionedShardDataTreeSnapshot.deserialize(dis);
-
-                    // Make sure we consume all bytes, otherwise something went very wrong
-                    final int bytesLeft = dis.available();
-                    if (bytesLeft != 0) {
-                        throw new IOException("Deserialization left " + bytesLeft + " in the buffer");
-                    }
-
-
-                    return ret;
-                }
+                return deserialize(is);
             }
         } catch (IOException e) {
             LOG.debug("Failed to deserialize versioned stream, attempting pre-Lithium ProtoBuf", e);
@@ -69,6 +59,21 @@ public abstract class ShardDataTreeSnapshot {
         }
     }
 
+    public static ShardDataTreeSnapshot deserialize(final InputStream is) throws IOException {
+        try (final DataInputStream dis = new DataInputStream(is)) {
+            final ShardDataTreeSnapshot ret = AbstractVersionedShardDataTreeSnapshot.deserialize(dis);
+
+            // Make sure we consume all bytes, otherwise something went very wrong
+            final int bytesLeft = dis.available();
+            if (bytesLeft != 0) {
+                throw new IOException("Deserialization left " + bytesLeft + " in the buffer");
+            }
+
+
+            return ret;
+        }
+    }
+
     /**
      * Get the root data node contained in this snapshot.
      *
@@ -76,14 +81,9 @@ public abstract class ShardDataTreeSnapshot {
      */
     public abstract Optional<NormalizedNode<?, ?>> getRootNode();
 
-    /**
-     * Serialize this snapshot into a byte array for persistence.
-     *
-     * @return Serialized snapshot
-     * @throws IOException when a serialization problem occurs
-     */
-    public abstract @Nonnull byte[] serialize() throws IOException;
+    public abstract void serialize(final OutputStream os) throws IOException;
 
+    @Deprecated
     private static boolean isLegacyStream(final byte[] bytes) {
         if (bytes.length < 2) {
             // Versioned streams have at least two bytes
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/persisted/ShardSnapshotState.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/persisted/ShardSnapshotState.java
new file mode 100644 (file)
index 0000000..d4556cd
--- /dev/null
@@ -0,0 +1,104 @@
+/*
+ * Copyright (c) 2017 Brocade Communications Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore.persisted;
+
+import com.google.common.base.Preconditions;
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.io.OutputStream;
+import javax.annotation.Nonnull;
+import org.opendaylight.controller.cluster.raft.persisted.Snapshot;
+
+/**
+ * Encapsulates the snapshot State for a Shard.
+ *
+ * @author Thomas Pantelis
+ */
+public class ShardSnapshotState implements Snapshot.State {
+    private static final long serialVersionUID = 1L;
+
+    private static final class Proxy implements Externalizable {
+        private static final long serialVersionUID = 1L;
+
+        private ShardSnapshotState snapshotState;
+
+        // checkstyle flags the public modifier as redundant which really doesn't make sense since it clearly isn't
+        // redundant. It is explicitly needed for Java serialization to be able to create instances via reflection.
+        @SuppressWarnings("checkstyle:RedundantModifier")
+        public Proxy() {
+            // For Externalizable
+        }
+
+        Proxy(final ShardSnapshotState snapshotState) {
+            this.snapshotState = snapshotState;
+        }
+
+        @Override
+        public void writeExternal(final ObjectOutput out) throws IOException {
+            snapshotState.snapshot.serialize(toOutputStream(out));
+        }
+
+        private OutputStream toOutputStream(final ObjectOutput out) {
+            if (out instanceof OutputStream) {
+                return (OutputStream) out;
+            }
+
+            return new OutputStream() {
+                @Override
+                public void write(int value) throws IOException {
+                    out.write(value);
+                }
+            };
+        }
+
+        @Override
+        public void readExternal(final ObjectInput in) throws IOException, ClassNotFoundException {
+            snapshotState = new ShardSnapshotState(ShardDataTreeSnapshot.deserialize(toInputStream(in)));
+        }
+
+        private InputStream toInputStream(final ObjectInput in) {
+            if (in instanceof InputStream) {
+                return (InputStream) in;
+            }
+
+            return new InputStream() {
+                @Override
+                public int read() throws IOException {
+                    return in.read();
+                }
+            };
+        }
+
+        private Object readResolve() {
+            return snapshotState;
+        }
+    }
+
+    @SuppressFBWarnings(value = "SE_BAD_FIELD", justification = "This field is not Serializable but this class "
+            + "implements writeReplace to delegate serialization to a Proxy class and thus instances of this class "
+            + "aren't serialized. FindBugs does not recognize this.")
+    private final ShardDataTreeSnapshot snapshot;
+
+    public ShardSnapshotState(@Nonnull ShardDataTreeSnapshot snapshot) {
+        this.snapshot = Preconditions.checkNotNull(snapshot);
+    }
+
+    @Nonnull
+    public ShardDataTreeSnapshot getSnapshot() {
+        return snapshot;
+    }
+
+    @SuppressWarnings("static-method")
+    private Object writeReplace() {
+        return new Proxy(this);
+    }
+}
index 3de3e9a..e73fa08 100644 (file)
@@ -56,9 +56,10 @@ import org.opendaylight.controller.cluster.datastore.modification.MutableComposi
 import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
 import org.opendaylight.controller.cluster.datastore.persisted.CommitTransactionPayload;
 import org.opendaylight.controller.cluster.datastore.persisted.PreBoronShardDataTreeSnapshot;
+import org.opendaylight.controller.cluster.datastore.persisted.ShardSnapshotState;
 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
-import org.opendaylight.controller.cluster.raft.Snapshot;
 import org.opendaylight.controller.cluster.raft.TestActorFactory;
+import org.opendaylight.controller.cluster.raft.persisted.Snapshot;
 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
 import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
 import org.opendaylight.controller.md.cluster.datastore.model.CarsModel;
@@ -349,8 +350,8 @@ public abstract class AbstractShardTest extends AbstractActorTest {
         final NormalizedNode<?, ?> root = readStore(testStore, YangInstanceIdentifier.EMPTY);
 
         InMemorySnapshotStore.addSnapshot(shardID.toString(), Snapshot.create(
-                new PreBoronShardDataTreeSnapshot(root).serialize(),
-                Collections.<ReplicatedLogEntry>emptyList(), 0, 1, -1, -1));
+                new ShardSnapshotState(new PreBoronShardDataTreeSnapshot(root)),
+                Collections.<ReplicatedLogEntry>emptyList(), 0, 1, -1, -1, 1, null, null));
         return testStore;
     }
 
index cb2284a..1b885c4 100644 (file)
@@ -30,6 +30,7 @@ import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.MoreExecutors;
 import com.google.common.util.concurrent.Uninterruptibles;
 import com.typesafe.config.ConfigFactory;
+import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.math.BigInteger;
 import java.util.ArrayList;
@@ -52,10 +53,12 @@ import org.opendaylight.controller.cluster.datastore.messages.DatastoreSnapshot;
 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
 import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
 import org.opendaylight.controller.cluster.datastore.persisted.MetadataShardDataTreeSnapshot;
+import org.opendaylight.controller.cluster.datastore.persisted.ShardSnapshotState;
 import org.opendaylight.controller.cluster.datastore.utils.MockDataChangeListener;
 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
-import org.opendaylight.controller.cluster.raft.Snapshot;
+import org.opendaylight.controller.cluster.raft.persisted.Snapshot;
 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
+import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
 import org.opendaylight.controller.md.cluster.datastore.model.CarsModel;
 import org.opendaylight.controller.md.cluster.datastore.model.PeopleModel;
 import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
@@ -180,7 +183,6 @@ public class DistributedDataStoreIntegrationTest {
 
     @Test
     public void testReadWriteTransactionWithSingleShard() throws Exception {
-        System.setProperty("shard.persistent", "true");
         new IntegrationTestKit(getSystem(), datastoreContextBuilder) {
             {
                 try (AbstractDataStore dataStore = setupDistributedDataStore(
@@ -1255,8 +1257,9 @@ public class DistributedDataStoreIntegrationTest {
                 AbstractShardTest.writeToStore(dataTree, CarsModel.BASE_PATH, carsNode);
                 NormalizedNode<?, ?> root = AbstractShardTest.readStore(dataTree, YangInstanceIdentifier.EMPTY);
 
-                final Snapshot carsSnapshot = Snapshot.create(new MetadataShardDataTreeSnapshot(root).serialize(),
-                        Collections.<ReplicatedLogEntry>emptyList(), 2, 1, 2, 1, 1, "member-1");
+                final Snapshot carsSnapshot = Snapshot.create(
+                        new ShardSnapshotState(new MetadataShardDataTreeSnapshot(root)),
+                        Collections.<ReplicatedLogEntry>emptyList(), 2, 1, 2, 1, 1, "member-1", null);
 
                 NormalizedNode<?, ?> peopleNode = PeopleModel.create();
                 dataTree = InMemoryDataTreeFactory.getInstance().create(TreeType.OPERATIONAL);
@@ -1264,8 +1267,9 @@ public class DistributedDataStoreIntegrationTest {
                 AbstractShardTest.writeToStore(dataTree, PeopleModel.BASE_PATH, peopleNode);
                 root = AbstractShardTest.readStore(dataTree, YangInstanceIdentifier.EMPTY);
 
-                Snapshot peopleSnapshot = Snapshot.create(new MetadataShardDataTreeSnapshot(root).serialize(),
-                        Collections.<ReplicatedLogEntry>emptyList(), 2, 1, 2, 1, 1, "member-1");
+                Snapshot peopleSnapshot = Snapshot.create(
+                        new ShardSnapshotState(new MetadataShardDataTreeSnapshot(root)),
+                        Collections.<ReplicatedLogEntry>emptyList(), 2, 1, 2, 1, 1, "member-1", null);
 
                 restoreFromSnapshot = new DatastoreSnapshot(name, null,
                         Arrays.asList(
@@ -1290,4 +1294,41 @@ public class DistributedDataStoreIntegrationTest {
             }
         };
     }
+
+    @Test
+    @Deprecated
+    public void testRecoveryFromPreCarbonSnapshot() throws Exception {
+        new IntegrationTestKit(getSystem(), datastoreContextBuilder) {
+            {
+                final String name = "testRecoveryFromPreCarbonSnapshot";
+
+                ContainerNode carsNode = CarsModel.newCarsNode(
+                        CarsModel.newCarsMapNode(CarsModel.newCarEntry("optima", BigInteger.valueOf(20000L)),
+                                CarsModel.newCarEntry("sportage", BigInteger.valueOf(30000L))));
+
+                DataTree dataTree = InMemoryDataTreeFactory.getInstance().create(TreeType.OPERATIONAL);
+                dataTree.setSchemaContext(SchemaContextHelper.full());
+                AbstractShardTest.writeToStore(dataTree, CarsModel.BASE_PATH, carsNode);
+                NormalizedNode<?, ?> root = AbstractShardTest.readStore(dataTree, YangInstanceIdentifier.EMPTY);
+
+                final ByteArrayOutputStream bos = new ByteArrayOutputStream();
+                new MetadataShardDataTreeSnapshot(root).serialize(bos);
+                final org.opendaylight.controller.cluster.raft.Snapshot snapshot =
+                    org.opendaylight.controller.cluster.raft.Snapshot.create(bos.toByteArray(),
+                            Collections.<ReplicatedLogEntry>emptyList(), 2, 1, 2, 1, 1, "member-1", null);
+
+                InMemorySnapshotStore.addSnapshot("member-1-shard-cars-" + name, snapshot);
+
+                try (AbstractDataStore dataStore = setupDistributedDataStore(name, "module-shards-member1.conf",
+                        true, "cars")) {
+
+                    DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction();
+
+                    Optional<NormalizedNode<?, ?>> optional = readTx.read(CarsModel.BASE_PATH).get(5, TimeUnit.SECONDS);
+                    assertEquals("isPresent", true, optional.isPresent());
+                    assertEquals("Data node", carsNode, optional.get());
+                }
+            }
+        };
+    }
 }
index ff44f01..501da5c 100644 (file)
@@ -36,6 +36,7 @@ import com.google.common.util.concurrent.Uninterruptibles;
 import com.typesafe.config.ConfigFactory;
 import java.math.BigInteger;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.LinkedList;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
@@ -56,10 +57,14 @@ import org.opendaylight.controller.cluster.datastore.messages.ReadyLocalTransact
 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
 import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
 import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
+import org.opendaylight.controller.cluster.datastore.persisted.MetadataShardDataTreeSnapshot;
+import org.opendaylight.controller.cluster.datastore.persisted.ShardSnapshotState;
 import org.opendaylight.controller.cluster.raft.client.messages.Shutdown;
 import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries;
+import org.opendaylight.controller.cluster.raft.persisted.Snapshot;
 import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy;
 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
+import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
 import org.opendaylight.controller.md.cluster.datastore.model.CarsModel;
 import org.opendaylight.controller.md.cluster.datastore.model.PeopleModel;
 import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
@@ -131,6 +136,9 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest {
 
     @Before
     public void setUp() {
+        InMemoryJournal.clear();
+        InMemorySnapshotStore.clear();
+
         leaderSystem = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member1"));
         Cluster.get(leaderSystem).join(MEMBER_1_ADDRESS);
 
@@ -153,6 +161,9 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest {
         JavaTestKit.shutdownActorSystem(leaderSystem);
         JavaTestKit.shutdownActorSystem(followerSystem);
         JavaTestKit.shutdownActorSystem(follower2System);
+
+        InMemoryJournal.clear();
+        InMemorySnapshotStore.clear();
     }
 
     private void initDatastoresWithCars(final String type) {
@@ -1012,6 +1023,56 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest {
         }
     }
 
+    @Test
+    public void testInstallSnapshot() throws Exception {
+        final String testName = "testInstallSnapshot";
+        final String leaderCarShardName = "member-1-shard-cars-" + testName;
+        final String followerCarShardName = "member-2-shard-cars-" + testName;
+
+        // Setup a saved snapshot on the leader. The follower will startup with no data and the leader should
+        // install a snapshot to sync the follower.
+
+        TipProducingDataTree tree = InMemoryDataTreeFactory.getInstance().create(TreeType.CONFIGURATION);
+        tree.setSchemaContext(SchemaContextHelper.full());
+
+        ContainerNode carsNode = CarsModel.newCarsNode(
+                CarsModel.newCarsMapNode(CarsModel.newCarEntry("optima", BigInteger.valueOf(20000))));
+        AbstractShardTest.writeToStore(tree, CarsModel.BASE_PATH, carsNode);
+
+        NormalizedNode<?, ?> snapshotRoot = AbstractShardTest.readStore(tree, YangInstanceIdentifier.EMPTY);
+        Snapshot initialSnapshot = Snapshot.create(
+                new ShardSnapshotState(new MetadataShardDataTreeSnapshot(snapshotRoot)),
+                Collections.emptyList(), 5, 1, 5, 1, 1, null, null);
+        InMemorySnapshotStore.addSnapshot(leaderCarShardName, initialSnapshot);
+
+        InMemorySnapshotStore.addSnapshotSavedLatch(leaderCarShardName);
+        InMemorySnapshotStore.addSnapshotSavedLatch(followerCarShardName);
+
+        initDatastoresWithCars(testName);
+
+        Optional<NormalizedNode<?, ?>> readOptional = leaderDistributedDataStore.newReadOnlyTransaction().read(
+                CarsModel.BASE_PATH).checkedGet(5, TimeUnit.SECONDS);
+        assertEquals("isPresent", true, readOptional.isPresent());
+        assertEquals("Node", carsNode, readOptional.get());
+
+        verifySnapshot(InMemorySnapshotStore.waitForSavedSnapshot(leaderCarShardName, Snapshot.class),
+                initialSnapshot, snapshotRoot);
+
+        verifySnapshot(InMemorySnapshotStore.waitForSavedSnapshot(followerCarShardName, Snapshot.class),
+                initialSnapshot, snapshotRoot);
+    }
+
+    private static void verifySnapshot(Snapshot actual, Snapshot expected, NormalizedNode<?, ?> expRoot) {
+        assertEquals("Snapshot getLastAppliedTerm", expected.getLastAppliedTerm(), actual.getLastAppliedTerm());
+        assertEquals("Snapshot getLastAppliedIndex", expected.getLastAppliedIndex(), actual.getLastAppliedIndex());
+        assertEquals("Snapshot getLastTerm", expected.getLastTerm(), actual.getLastTerm());
+        assertEquals("Snapshot getLastIndex", expected.getLastIndex(), actual.getLastIndex());
+        assertEquals("Snapshot state type", ShardSnapshotState.class, actual.getState().getClass());
+        MetadataShardDataTreeSnapshot shardSnapshot =
+                (MetadataShardDataTreeSnapshot) ((ShardSnapshotState)actual.getState()).getSnapshot();
+        assertEquals("Snapshot root node", expRoot, shardSnapshot.getRootNode().get());
+    }
+
     private static void sendDatastoreContextUpdate(final AbstractDataStore dataStore, final Builder builder) {
         final Builder newBuilder = DatastoreContext.newBuilderFrom(builder.build());
         DatastoreContextFactory mockContextFactory = Mockito.mock(DatastoreContextFactory.class);
index 262758f..736e291 100644 (file)
@@ -17,7 +17,8 @@ import org.junit.Before;
 import org.junit.Test;
 import org.mockito.Mockito;
 import org.opendaylight.controller.cluster.datastore.persisted.CommitTransactionPayload;
-import org.opendaylight.controller.cluster.datastore.persisted.PreBoronShardDataTreeSnapshot;
+import org.opendaylight.controller.cluster.datastore.persisted.MetadataShardDataTreeSnapshot;
+import org.opendaylight.controller.cluster.datastore.persisted.ShardSnapshotState;
 import org.opendaylight.controller.md.cluster.datastore.model.CarsModel;
 import org.opendaylight.controller.md.cluster.datastore.model.PeopleModel;
 import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
@@ -133,7 +134,7 @@ public class ShardRecoveryCoordinatorTest extends AbstractTest {
         return shardDataTree.readNode(PeopleModel.BASE_PATH);
     }
 
-    private static byte[] createSnapshot() {
+    private static ShardSnapshotState createSnapshot() {
         final TipProducingDataTree dataTree = InMemoryDataTreeFactory.getInstance().create(TreeType.OPERATIONAL);
         dataTree.setSchemaContext(SchemaContextHelper.select(SchemaContextHelper.CARS_YANG,
                 SchemaContextHelper.PEOPLE_YANG));
@@ -147,7 +148,7 @@ public class ShardRecoveryCoordinatorTest extends AbstractTest {
         modification.ready();
         dataTree.commit(dataTree.prepare(modification));
 
-        return new PreBoronShardDataTreeSnapshot(dataTree.takeSnapshot().readNode(YangInstanceIdentifier.EMPTY).get())
-                .serialize();
+        return new ShardSnapshotState(new MetadataShardDataTreeSnapshot(dataTree.takeSnapshot().readNode(
+                YangInstanceIdentifier.EMPTY).get()));
     }
 }
index ab07b66..2edbff2 100644 (file)
@@ -77,14 +77,13 @@ import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContex
 import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
 import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
 import org.opendaylight.controller.cluster.datastore.persisted.MetadataShardDataTreeSnapshot;
-import org.opendaylight.controller.cluster.datastore.persisted.ShardDataTreeSnapshot;
+import org.opendaylight.controller.cluster.datastore.persisted.ShardSnapshotState;
 import org.opendaylight.controller.cluster.datastore.utils.MockDataChangeListener;
 import org.opendaylight.controller.cluster.datastore.utils.MockDataTreeChangeListener;
 import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener;
 import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListenerReply;
 import org.opendaylight.controller.cluster.raft.RaftActorContext;
 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
-import org.opendaylight.controller.cluster.raft.Snapshot;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
 import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
 import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
@@ -97,6 +96,7 @@ import org.opendaylight.controller.cluster.raft.messages.RequestVote;
 import org.opendaylight.controller.cluster.raft.messages.ServerRemoved;
 import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries;
 import org.opendaylight.controller.cluster.raft.persisted.SimpleReplicatedLogEntry;
+import org.opendaylight.controller.cluster.raft.persisted.Snapshot;
 import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy;
 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
@@ -442,8 +442,9 @@ public class ShardTest extends AbstractShardTest {
         final YangInstanceIdentifier root = YangInstanceIdentifier.EMPTY;
         final NormalizedNode<?,?> expected = readStore(store, root);
 
-        final Snapshot snapshot = Snapshot.create(new MetadataShardDataTreeSnapshot(expected).serialize(),
-                Collections.<ReplicatedLogEntry>emptyList(), 1, 2, 3, 4);
+        final Snapshot snapshot = Snapshot.create(
+                new ShardSnapshotState(new MetadataShardDataTreeSnapshot(expected)),
+                Collections.<ReplicatedLogEntry>emptyList(), 1, 2, 3, 4, -1, null, null);
 
         shard.tell(new ApplySnapshot(snapshot), ActorRef.noSender());
 
@@ -1975,8 +1976,8 @@ public class ShardTest extends AbstractShardTest {
 
             private void verifySnapshot(final Snapshot snapshot, final NormalizedNode<?, ?> expectedRoot)
                     throws IOException {
-                final NormalizedNode<?, ?> actual = ShardDataTreeSnapshot.deserialize(snapshot.getState()).getRootNode()
-                        .get();
+                final NormalizedNode<?, ?> actual = ((ShardSnapshotState)snapshot.getState()).getSnapshot()
+                        .getRootNode().get();
                 assertEquals("Root node", expectedRoot, actual);
             }
         };
index 65a8ac9..5cf8183 100644 (file)
@@ -9,16 +9,17 @@ package org.opendaylight.controller.cluster.datastore.actors;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
 
 import akka.actor.ActorRef;
 import akka.testkit.JavaTestKit;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
 import java.util.Optional;
 import org.junit.Test;
 import org.opendaylight.controller.cluster.datastore.AbstractActorTest;
 import org.opendaylight.controller.cluster.datastore.persisted.MetadataShardDataTreeSnapshot;
-import org.opendaylight.controller.cluster.datastore.persisted.PreBoronShardDataTreeSnapshot;
 import org.opendaylight.controller.cluster.datastore.persisted.ShardDataTreeSnapshot;
+import org.opendaylight.controller.cluster.datastore.persisted.ShardSnapshotState;
 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
@@ -27,41 +28,42 @@ import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
 public class ShardSnapshotActorTest extends AbstractActorTest {
     private static final NormalizedNode<?, ?> DATA = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
 
-    private static void testSerializeSnapshot(final String testName, final ShardDataTreeSnapshot snapshot)
-            throws Exception {
+    private static void testSerializeSnapshot(final String testName, final ShardDataTreeSnapshot snapshot,
+            final boolean withInstallSnapshot) throws Exception {
         new JavaTestKit(getSystem()) {
             {
-
                 final ActorRef snapshotActor = getSystem().actorOf(ShardSnapshotActor.props(), testName);
                 watch(snapshotActor);
 
                 final NormalizedNode<?, ?> expectedRoot = snapshot.getRootNode().get();
 
-                ShardSnapshotActor.requestSnapshot(snapshotActor, snapshot, getRef());
+                ByteArrayOutputStream installSnapshotStream = withInstallSnapshot ? new ByteArrayOutputStream() : null;
+                ShardSnapshotActor.requestSnapshot(snapshotActor, snapshot,
+                        Optional.ofNullable(installSnapshotStream), getRef());
 
                 final CaptureSnapshotReply reply = expectMsgClass(duration("3 seconds"), CaptureSnapshotReply.class);
-                assertNotNull("getSnapshot is null", reply.getSnapshot());
-
-                final ShardDataTreeSnapshot actual = ShardDataTreeSnapshot.deserialize(reply.getSnapshot());
-                assertNotNull(actual);
-                assertEquals(snapshot.getClass(), actual.getClass());
+                assertNotNull("getSnapshotState is null", reply.getSnapshotState());
+                assertEquals("SnapshotState type", ShardSnapshotState.class, reply.getSnapshotState().getClass());
+                assertEquals("Snapshot", snapshot, ((ShardSnapshotState)reply.getSnapshotState()).getSnapshot());
 
-                final Optional<NormalizedNode<?, ?>> maybeNode = actual.getRootNode();
-                assertTrue(maybeNode.isPresent());
+                if (installSnapshotStream != null) {
+                    final ShardDataTreeSnapshot deserialized = ShardDataTreeSnapshot.deserialize(
+                            new ByteArrayInputStream(installSnapshotStream.toByteArray()));
+                    assertEquals("Deserialized snapshot type", snapshot.getClass(), deserialized.getClass());
 
-                assertEquals("Root node", expectedRoot, maybeNode.get());
+                    final Optional<NormalizedNode<?, ?>> maybeNode = deserialized.getRootNode();
+                    assertEquals("isPresent", true, maybeNode.isPresent());
+                    assertEquals("Root node", expectedRoot, maybeNode.get());
+                }
             }
         };
     }
 
     @Test
     public void testSerializeBoronSnapshot() throws Exception {
-        testSerializeSnapshot("testSerializeBoronSnapshot", new MetadataShardDataTreeSnapshot(DATA));
-    }
-
-    @Deprecated
-    @Test
-    public void testSerializeLegacySnapshot() throws Exception {
-        testSerializeSnapshot("testSerializeLegacySnapshot", new PreBoronShardDataTreeSnapshot(DATA));
+        testSerializeSnapshot("testSerializeBoronSnapshotWithInstallSnapshot",
+                new MetadataShardDataTreeSnapshot(DATA), true);
+        testSerializeSnapshot("testSerializeBoronSnapshotWithoutInstallSnapshot",
+                new MetadataShardDataTreeSnapshot(DATA), false);
     }
 }
index 7be9131..e909e79 100644 (file)
@@ -10,6 +10,8 @@ package org.opendaylight.controller.cluster.datastore.persisted;
 import static org.junit.Assert.assertEquals;
 
 import com.google.common.collect.ImmutableMap;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
 import java.io.Externalizable;
 import java.io.IOException;
 import java.io.ObjectInput;
@@ -37,9 +39,11 @@ public class ShardDataTreeSnapshotTest {
                 .withChild(ImmutableNodes.leafNode(TestModel.DESC_QNAME, "foo")).build();
 
         MetadataShardDataTreeSnapshot snapshot = new MetadataShardDataTreeSnapshot(expectedNode);
-        byte[] serialized = snapshot.serialize();
+        ByteArrayOutputStream bos = new ByteArrayOutputStream();
+        snapshot.serialize(bos);
 
-        ShardDataTreeSnapshot deserialized = ShardDataTreeSnapshot.deserialize(serialized);
+        ShardDataTreeSnapshot deserialized = ShardDataTreeSnapshot.deserialize(
+                new ByteArrayInputStream(bos.toByteArray()));
 
         Optional<NormalizedNode<?, ?>> actualNode = deserialized.getRootNode();
         assertEquals("rootNode present", true, actualNode.isPresent());
@@ -57,9 +61,11 @@ public class ShardDataTreeSnapshotTest {
         Map<Class<? extends ShardDataTreeSnapshotMetadata<?>>, ShardDataTreeSnapshotMetadata<?>> expMetadata =
                 ImmutableMap.of(TestShardDataTreeSnapshotMetadata.class, new TestShardDataTreeSnapshotMetadata("test"));
         MetadataShardDataTreeSnapshot snapshot = new MetadataShardDataTreeSnapshot(expectedNode, expMetadata);
-        byte[] serialized = snapshot.serialize();
+        ByteArrayOutputStream bos = new ByteArrayOutputStream();
+        snapshot.serialize(bos);
 
-        ShardDataTreeSnapshot deserialized = ShardDataTreeSnapshot.deserialize(serialized);
+        ShardDataTreeSnapshot deserialized = ShardDataTreeSnapshot.deserialize(
+                new ByteArrayInputStream(bos.toByteArray()));
 
         Optional<NormalizedNode<?, ?>> actualNode = deserialized.getRootNode();
         assertEquals("rootNode present", true, actualNode.isPresent());
@@ -69,15 +75,17 @@ public class ShardDataTreeSnapshotTest {
     }
 
     @Test
+    @Deprecated
     public void testPreBoronShardDataTreeSnapshot() throws Exception {
         NormalizedNode<?, ?> expectedNode = ImmutableContainerNodeBuilder.create()
                 .withNodeIdentifier(new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME))
                 .withChild(ImmutableNodes.leafNode(TestModel.DESC_QNAME, "foo")).build();
 
         PreBoronShardDataTreeSnapshot snapshot = new PreBoronShardDataTreeSnapshot(expectedNode);
-        byte[] serialized = snapshot.serialize();
+        ByteArrayOutputStream bos = new ByteArrayOutputStream();
+        snapshot.serialize(bos);
 
-        ShardDataTreeSnapshot deserialized = ShardDataTreeSnapshot.deserialize(serialized);
+        ShardDataTreeSnapshot deserialized = ShardDataTreeSnapshot.deserialize(bos.toByteArray());
 
         Optional<NormalizedNode<?, ?>> actualNode = deserialized.getRootNode();
         assertEquals("rootNode present", true, actualNode.isPresent());
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/persisted/ShardSnapshotStateTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/persisted/ShardSnapshotStateTest.java
new file mode 100644 (file)
index 0000000..82b9f45
--- /dev/null
@@ -0,0 +1,42 @@
+/*
+ * Copyright (c) 2017 Brocade Communications Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore.persisted;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+import org.apache.commons.lang.SerializationUtils;
+import org.junit.Test;
+import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
+import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder;
+
+/**
+ * Unit tests for ShardSnapshotState.
+ *
+ * @author Thomas Pantelis
+ */
+public class ShardSnapshotStateTest {
+
+    @Test
+    public void testSerialization() {
+        NormalizedNode<?, ?> expectedNode = ImmutableContainerNodeBuilder.create()
+                .withNodeIdentifier(new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME))
+                .withChild(ImmutableNodes.leafNode(TestModel.DESC_QNAME, "foo")).build();
+
+        ShardSnapshotState expected = new ShardSnapshotState(new MetadataShardDataTreeSnapshot(expectedNode));
+        ShardSnapshotState cloned = (ShardSnapshotState) SerializationUtils.clone(expected);
+
+        assertNotNull("getSnapshot is null", cloned.getSnapshot());
+        assertEquals("getSnapshot type", MetadataShardDataTreeSnapshot.class, cloned.getSnapshot().getClass());
+        assertEquals("getRootNode", expectedNode,
+                ((MetadataShardDataTreeSnapshot)cloned.getSnapshot()).getRootNode().get());
+    }
+}

©2013 OpenDaylight, A Linux Foundation Collaborative Project. All Rights Reserved.
OpenDaylight is a registered trademark of The OpenDaylight Project, Inc.
Linux Foundation and OpenDaylight are registered trademarks of the Linux Foundation.
Linux is a registered trademark of Linus Torvalds.