Bug 7521: Convert Snapshot to store a State instance 72/50572/9
authorTom Pantelis <tpanteli@brocade.com>
Tue, 17 Jan 2017 18:24:28 +0000 (13:24 -0500)
committerTom Pantelis <tpanteli@brocade.com>
Fri, 10 Feb 2017 12:16:14 +0000 (12:16 +0000)
Created a new Snapshot class that stores the state as a Serializable
State instance instead of a byte[] to lay the groundwork for handling
snapshots > 2G. In this manner, State implementations can serialize
their data directly to the ObjectOutputStream.

The previous Snapshot class was deprecated but, unfortunately, it
can't simply "read resolve" to the new class on de-serialization
as the deserialization code doesn't know how to convert the data
from byte[] to a State instance. Therefore a new method,
deserializePreCarbonSnapshot, was added to RaftActorRecoveryCohort.

The introduction of the State interface necessitated changes throughout
the snapshot capture and snapshot recovery code paths. The Shard
implementations were also changed accordingly.

For follower snapshot install, an optional OutputStream is also passed
to createSnapshot. The new API contract is: if the OutputStream is present,
the implementation must serialize its state to the OutputStream and return
the stream instance in the CaptureSnapshotReply along with the snapshot
State instance. The snapshot State is serialized directly to the snapshot
store while the OutputStream is used to send the state data to follower(s)
in chunks. The deserializeSnapshot method is used to convert the serialized
data back to a State instance on the follower end. The serialization for
snapshot install is passed off so the cost of serialization is not charged
to the raft actor's thread.

The OutputStream is converted to a ByteSource when passed to AbstractLeader.
AbstractLeader still converts it to a byte[] but will be changed to use
the ByteSource's InputStream to chunk the data in a subsequent patch.

Also the SnapshotManager currently passes a ByteArrayOutputStream to
createSnapshot but this will be changed to a FileBackedOutputStream in a
subsequent patch.

Change-Id: I2ea30870b54478d7ef5669335ca4b444539f8d56
Signed-off-by: Tom Pantelis <tpanteli@brocade.com>
54 files changed:
opendaylight/md-sal/sal-akka-raft-example/pom.xml
opendaylight/md-sal/sal-akka-raft-example/src/main/java/org/opendaylight/controller/cluster/example/ExampleActor.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/GetSnapshotReplyActor.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/NoopRaftActorSnapshotCohort.java [new file with mode: 0644]
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorRecoveryCohort.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorRecoverySupport.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorSnapshotCohort.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorSnapshotMessageSupport.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ReplicatedLogImpl.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/Snapshot.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/SnapshotManager.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/SnapshotState.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/base/messages/ApplySnapshot.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/base/messages/CaptureSnapshot.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/base/messages/CaptureSnapshotReply.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/base/messages/SendInstallSnapshot.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/persisted/EmptyState.java [new file with mode: 0644]
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/persisted/Snapshot.java [new file with mode: 0644]
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/AbstractRaftActorIntegrationTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MigratedMessagesTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MockRaftActor.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MockRaftActorContext.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorRecoverySupportTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorServerConfigurationSupportTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorSnapshotMessageSupportTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RecoveryIntegrationSingleNodeTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/ReplicationAndSnapshotsIntegrationTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/ReplicationAndSnapshotsWithLaggingFollowerIntegrationTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/SnapshotManagerTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/SnapshotTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/FollowerTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/persisted/ByteState.java [new file with mode: 0644]
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/persisted/EmptyStateTest.java [new file with mode: 0644]
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/persisted/SnapshotTest.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardRecoveryCoordinator.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardSnapshotCohort.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/actors/ShardSnapshotActor.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/persisted/AbstractVersionedShardDataTreeSnapshot.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/persisted/PreBoronShardDataTreeSnapshot.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/persisted/ShardDataTreeSnapshot.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/persisted/ShardSnapshotState.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractShardTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreIntegrationTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreRemotingIntegrationTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardRecoveryCoordinatorTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/actors/ShardSnapshotActorTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/persisted/ShardDataTreeSnapshotTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/persisted/ShardSnapshotStateTest.java [new file with mode: 0644]

index 315349a90c0a38ca31193e329b245d73570ad775..8667253a2d1aea7beb4a5a946c0bbc5802e2ffca 100644 (file)
       <groupId>org.slf4j</groupId>
       <artifactId>slf4j-simple</artifactId>
     </dependency>
+
+    <dependency>
+      <groupId>com.google.guava</groupId>
+      <artifactId>guava</artifactId>
+    </dependency>
   </dependencies>
 
   <build>
index 6e8051fe55daa461b4d0982eddac7ebe25ccb9f8..663d400b53b3e3f7a589a08b819f930572d4c753 100644 (file)
@@ -11,15 +11,15 @@ package org.opendaylight.controller.cluster.example;
 import akka.actor.ActorRef;
 import akka.actor.Props;
 import com.google.common.base.Optional;
-import com.google.protobuf.ByteString;
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
+import com.google.common.base.Throwables;
+import com.google.common.io.ByteSource;
 import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
+import java.io.OutputStream;
+import java.io.Serializable;
 import java.util.HashMap;
 import java.util.Map;
 import javax.annotation.Nonnull;
+import org.apache.commons.lang3.SerializationUtils;
 import org.opendaylight.controller.cluster.example.messages.KeyValue;
 import org.opendaylight.controller.cluster.example.messages.KeyValueSaved;
 import org.opendaylight.controller.cluster.example.messages.PrintRole;
@@ -32,6 +32,7 @@ import org.opendaylight.controller.cluster.raft.RaftActorSnapshotCohort;
 import org.opendaylight.controller.cluster.raft.RaftState;
 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
 import org.opendaylight.controller.cluster.raft.behaviors.Leader;
+import org.opendaylight.controller.cluster.raft.persisted.Snapshot;
 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
 import org.opendaylight.yangtools.concepts.Identifier;
 import org.opendaylight.yangtools.util.AbstractStringIdentifier;
@@ -131,21 +132,23 @@ public class ExampleActor extends RaftActor implements RaftActorRecoveryCohort,
     }
 
     @Override
-    public void createSnapshot(ActorRef actorRef) {
-        ByteString bs = null;
+    public void createSnapshot(ActorRef actorRef, java.util.Optional<OutputStream> installSnapshotStream) {
         try {
-            bs = fromObject(state);
+            if (installSnapshotStream.isPresent()) {
+                SerializationUtils.serialize((Serializable) state, installSnapshotStream.get());
+            }
         } catch (Exception e) {
             LOG.error("Exception in creating snapshot", e);
         }
-        getSelf().tell(new CaptureSnapshotReply(bs.toByteArray()), null);
+
+        getSelf().tell(new CaptureSnapshotReply(new MapState(state), installSnapshotStream), null);
     }
 
     @Override
-    public void applySnapshot(byte [] snapshot) {
+    public void applySnapshot(Snapshot.State snapshotState) {
         state.clear();
         try {
-            state.putAll((HashMap<String, String>) toObject(snapshot));
+            state.putAll(((MapState)snapshotState).state);
         } catch (Exception e) {
            LOG.error("Exception in applying snapshot", e);
         }
@@ -154,45 +157,6 @@ public class ExampleActor extends RaftActor implements RaftActorRecoveryCohort,
         }
     }
 
-    private static ByteString fromObject(Object snapshot) throws Exception {
-        ByteArrayOutputStream b = null;
-        ObjectOutputStream o = null;
-        try {
-            b = new ByteArrayOutputStream();
-            o = new ObjectOutputStream(b);
-            o.writeObject(snapshot);
-            byte[] snapshotBytes = b.toByteArray();
-            return ByteString.copyFrom(snapshotBytes);
-        } finally {
-            if (o != null) {
-                o.flush();
-                o.close();
-            }
-            if (b != null) {
-                b.close();
-            }
-        }
-    }
-
-    private static Object toObject(byte [] bs) throws ClassNotFoundException, IOException {
-        Object obj = null;
-        ByteArrayInputStream bis = null;
-        ObjectInputStream ois = null;
-        try {
-            bis = new ByteArrayInputStream(bs);
-            ois = new ObjectInputStream(bis);
-            obj = ois.readObject();
-        } finally {
-            if (bis != null) {
-                bis.close();
-            }
-            if (ois != null) {
-                ois.close();
-            }
-        }
-        return obj;
-    }
-
     @Override protected void onStateChanged() {
 
     }
@@ -224,7 +188,7 @@ public class ExampleActor extends RaftActor implements RaftActorRecoveryCohort,
     }
 
     @Override
-    public void applyRecoverySnapshot(byte[] snapshot) {
+    public void applyRecoverySnapshot(Snapshot.State snapshotState) {
     }
 
     @Override
@@ -236,4 +200,29 @@ public class ExampleActor extends RaftActor implements RaftActorRecoveryCohort,
     public byte[] getRestoreFromSnapshot() {
         return null;
     }
+
+    @Override
+    public Snapshot.State deserializeSnapshot(ByteSource snapshotBytes) {
+        try {
+            return deserializePreCarbonSnapshot(snapshotBytes.read());
+        } catch (IOException e) {
+            throw Throwables.propagate(e);
+        }
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public Snapshot.State deserializePreCarbonSnapshot(byte[] from) {
+        return new MapState((Map<String, String>) SerializationUtils.deserialize(from));
+    }
+
+    private static class MapState implements Snapshot.State {
+        private static final long serialVersionUID = 1L;
+
+        Map<String, String> state;
+
+        MapState(Map<String, String> state) {
+            this.state = state;
+        }
+    }
 }
index 3aac07f6da8ae1a54d1e4aa7273b7dd92b6ffb0f..c94b780b04fccfb437c0d3e36ad0b78339bcdd4f 100644 (file)
@@ -19,6 +19,7 @@ import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
 import org.opendaylight.controller.cluster.raft.client.messages.GetSnapshotReply;
 import org.opendaylight.controller.cluster.raft.persisted.ServerConfigurationPayload;
+import org.opendaylight.controller.cluster.raft.persisted.Snapshot;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import scala.concurrent.duration.Duration;
@@ -42,7 +43,8 @@ class GetSnapshotReplyActor extends UntypedActor {
     @Override
     public void onReceive(Object message) {
         if (message instanceof CaptureSnapshotReply) {
-            Snapshot snapshot = Snapshot.create(((CaptureSnapshotReply)message).getSnapshot(),
+            Snapshot snapshot = Snapshot.create(
+                    ((CaptureSnapshotReply)message).getSnapshotState(),
                     params.captureSnapshot.getUnAppliedEntries(),
                     params.captureSnapshot.getLastIndex(), params.captureSnapshot.getLastTerm(),
                     params.captureSnapshot.getLastAppliedIndex(), params.captureSnapshot.getLastAppliedTerm(),
diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/NoopRaftActorSnapshotCohort.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/NoopRaftActorSnapshotCohort.java
new file mode 100644 (file)
index 0000000..42b226f
--- /dev/null
@@ -0,0 +1,41 @@
+/*
+ * Copyright (c) 2017 Brocade Communications Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.raft;
+
+import akka.actor.ActorRef;
+import com.google.common.io.ByteSource;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.Optional;
+import org.opendaylight.controller.cluster.raft.persisted.EmptyState;
+import org.opendaylight.controller.cluster.raft.persisted.Snapshot.State;
+
+/**
+ * RaftActorSnapshotCohort implementation that does nothing.
+ *
+ * @author Thomas Pantelis
+ */
+public final class NoopRaftActorSnapshotCohort implements RaftActorSnapshotCohort {
+    public static final NoopRaftActorSnapshotCohort INSTANCE = new NoopRaftActorSnapshotCohort();
+
+    private NoopRaftActorSnapshotCohort() {
+    }
+
+    @Override
+    public void createSnapshot(ActorRef actorRef, Optional<OutputStream> installSnapshotStream) {
+    }
+
+    @Override
+    public void applySnapshot(State snapshotState) {
+    }
+
+    @Override
+    public State deserializeSnapshot(ByteSource snapshotBytes) throws IOException {
+        return EmptyState.INSTANCE;
+    }
+}
index 2d2fce22f9c3c824cf977be4637cbf60e24e0a46..b007e2d8f5586ec037a40e106e2a8a4482f98c2e 100644 (file)
@@ -762,7 +762,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
     protected abstract void onRecoveryComplete();
 
     /**
-     * Returns the RaftActorSnapshotCohort to participate in persistence recovery.
+     * Returns the RaftActorSnapshotCohort to participate in snapshot captures.
      */
     @Nonnull
     protected abstract RaftActorSnapshotCohort getRaftActorSnapshotCohort();
index c3760472ac2a11614b1bbb55ad64578deb21dc8f..9803f1eae72a95597a50823a3febb46c9350893b 100644 (file)
@@ -7,7 +7,9 @@
  */
 package org.opendaylight.controller.cluster.raft;
 
+import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
+import org.opendaylight.controller.cluster.raft.persisted.Snapshot;
 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
 
 /**
@@ -36,9 +38,9 @@ public interface RaftActorRecoveryCohort {
     /**
      * This method is called during recovery to reconstruct the state of the actor.
      *
-     * @param snapshotBytes A snapshot of the state of the actor
+     * @param snapshotState A snapshot of the state of the actor
      */
-    void applyRecoverySnapshot(byte[] snapshotBytes);
+    void applyRecoverySnapshot(Snapshot.State snapshotState);
 
     /**
      * This method is called during recovery at the end of a batch to apply the current batched
@@ -53,4 +55,14 @@ public interface RaftActorRecoveryCohort {
      */
     @Nullable
     byte[] getRestoreFromSnapshot();
+
+    /**
+     * This method is called during recovery to de-serialize a snapshot that was persisted in the pre-Carbon format.
+     *
+     * @param from the snaphot bytes
+     * @return a Snapshot.State instance
+     */
+    @Deprecated
+    @Nonnull
+    Snapshot.State deserializePreCarbonSnapshot(byte [] from);
 }
index 5e4e6571a10d3d1f998796849a9be7486d86de75..df207670d9fb2625219a76729af2b4d0bea0b4c9 100644 (file)
@@ -18,8 +18,10 @@ import org.opendaylight.controller.cluster.PersistentDataProvider;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
 import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries;
 import org.opendaylight.controller.cluster.raft.persisted.DeleteEntries;
+import org.opendaylight.controller.cluster.raft.persisted.EmptyState;
 import org.opendaylight.controller.cluster.raft.persisted.MigratedSerializable;
 import org.opendaylight.controller.cluster.raft.persisted.ServerConfigurationPayload;
+import org.opendaylight.controller.cluster.raft.persisted.Snapshot;
 import org.opendaylight.controller.cluster.raft.persisted.UpdateElectionTerm;
 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.PersistentPayload;
 import org.slf4j.Logger;
@@ -113,11 +115,23 @@ class RaftActorRecoverySupport {
     }
 
     private void onRecoveredSnapshot(SnapshotOffer offer) {
-        log.debug("{}: SnapshotOffer called..", context.getId());
+        log.debug("{}: SnapshotOffer called.", context.getId());
 
         initRecoveryTimer();
 
-        Snapshot snapshot = (Snapshot) offer.snapshot();
+        Object snapshotObj = offer.snapshot();
+        Snapshot snapshot;
+        if (snapshotObj instanceof org.opendaylight.controller.cluster.raft.Snapshot) {
+            org.opendaylight.controller.cluster.raft.Snapshot legacy =
+                    (org.opendaylight.controller.cluster.raft.Snapshot)snapshotObj;
+            snapshot = Snapshot.create(cohort.deserializePreCarbonSnapshot(legacy.getState()),
+                    legacy.getUnAppliedEntries(), legacy.getLastIndex(), legacy.getLastTerm(),
+                    legacy.getLastAppliedIndex(), legacy.getLastAppliedTerm(),
+                    legacy.getElectionTerm(), legacy.getElectionVotedFor(), legacy.getServerConfiguration());
+            hasMigratedDataRecovered = true;
+        } else {
+            snapshot = (Snapshot) offer.snapshot();
+        }
 
         for (ReplicatedLogEntry entry: snapshot.getUnAppliedEntries()) {
             if (isMigratedPayload(entry)) {
@@ -129,7 +143,8 @@ class RaftActorRecoverySupport {
             // We may have just transitioned to disabled and have a snapshot containing state data and/or log
             // entries - we don't want to preserve these, only the server config and election term info.
 
-            snapshot = Snapshot.create(new byte[0], Collections.emptyList(), -1, -1, -1, -1,
+            snapshot = Snapshot.create(
+                    EmptyState.INSTANCE, Collections.emptyList(), -1, -1, -1, -1,
                     snapshot.getElectionTerm(), snapshot.getElectionVotedFor(), snapshot.getServerConfiguration());
         }
 
@@ -145,7 +160,9 @@ class RaftActorRecoverySupport {
         Stopwatch timer = Stopwatch.createStarted();
 
         // Apply the snapshot to the actors state
-        cohort.applyRecoverySnapshot(snapshot.getState());
+        if (!(snapshot.getState() instanceof EmptyState)) {
+            cohort.applyRecoverySnapshot(snapshot.getState());
+        }
 
         if (snapshot.getServerConfiguration() != null) {
             context.updatePeerIds(snapshot.getServerConfiguration());
@@ -274,7 +291,8 @@ class RaftActorRecoverySupport {
             // messages. Either way, we persist a snapshot and delete all the messages from the akka journal
             // to clean out unwanted messages.
 
-            Snapshot snapshot = Snapshot.create(new byte[0], Collections.<ReplicatedLogEntry>emptyList(),
+            Snapshot snapshot = Snapshot.create(
+                    EmptyState.INSTANCE, Collections.<ReplicatedLogEntry>emptyList(),
                     -1, -1, -1, -1,
                     context.getTermInformation().getCurrentTerm(), context.getTermInformation().getVotedFor(),
                     context.getPeerServerInfo(true));
index ad68726371621cf210f0ba1407955ca84672e9c7..a02b295dbc1ed7b39180d880c9b5eade21f78b54 100644 (file)
@@ -8,6 +8,12 @@
 package org.opendaylight.controller.cluster.raft;
 
 import akka.actor.ActorRef;
+import com.google.common.io.ByteSource;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.Optional;
+import javax.annotation.Nonnull;
+import org.opendaylight.controller.cluster.raft.persisted.Snapshot;
 
 /**
  * Interface for a class that participates in raft actor snapshotting.
@@ -21,13 +27,31 @@ public interface RaftActorSnapshotCohort {
      * created. The implementation should send a CaptureSnapshotReply to the given actor.
      *
      * @param actorRef the actor to which to respond
+     * @param installSnapshotStream Optional OutputStream that is present if the snapshot is to also be installed
+     *        on a follower. The implementation must serialize its state to the OutputStream and return the
+     *        installSnapshotStream instance in the CaptureSnapshotReply along with the snapshot State instance.
+     *        The snapshot State is serialized directly to the snapshot store while the OutputStream is used to send
+     *        the state data to follower(s) in chunks. The {@link #deserializeSnapshot} method is used to convert the
+     *        serialized data back to a State instance on the follower end. The serialization for snapshot install is
+     *        passed off so the cost of serialization is not charged to the raft actor's thread.
      */
-    void createSnapshot(ActorRef actorRef);
+    void createSnapshot(@Nonnull ActorRef actorRef, @Nonnull Optional<OutputStream> installSnapshotStream);
 
     /**
      * This method is called to apply a snapshot installed by the leader.
      *
-     * @param snapshotBytes a snapshot of the state of the actor
+     * @param snapshotState a snapshot of the state of the actor
      */
-    void applySnapshot(byte[] snapshotBytes);
+    void applySnapshot(@Nonnull Snapshot.State snapshotState);
+
+    /**
+     * This method is called to de-serialize snapshot data that was previously serialized via {@link #createSnapshot}
+     * to a State instance.
+     *
+     * @param snapshotBytes the ByteSource containing the serialized data
+     * @return the converted snapshot State
+     * @throws IOException if an error occurs accessing the ByteSource or de-serializing
+     */
+    @Nonnull
+    Snapshot.State deserializeSnapshot(@Nonnull ByteSource snapshotBytes) throws IOException;
 }
index 4a17becd5bc8e37ac09a6895846ff5afaa3b85c2..f119a1ecc4245633f7142920c322f9ec94271ca7 100644 (file)
@@ -12,6 +12,7 @@ import akka.persistence.SaveSnapshotFailure;
 import akka.persistence.SaveSnapshotSuccess;
 import com.google.common.annotations.VisibleForTesting;
 import java.util.Collections;
+import java.util.Optional;
 import java.util.concurrent.TimeUnit;
 import org.apache.commons.lang3.SerializationUtils;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
@@ -19,6 +20,8 @@ import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
 import org.opendaylight.controller.cluster.raft.client.messages.GetSnapshot;
 import org.opendaylight.controller.cluster.raft.client.messages.GetSnapshotReply;
+import org.opendaylight.controller.cluster.raft.persisted.EmptyState;
+import org.opendaylight.controller.cluster.raft.persisted.Snapshot;
 import org.slf4j.Logger;
 import scala.concurrent.duration.Duration;
 
@@ -46,8 +49,13 @@ class RaftActorSnapshotMessageSupport {
         this.cohort = cohort;
         this.log = context.getLogger();
 
-        context.getSnapshotManager().setCreateSnapshotRunnable(() -> cohort.createSnapshot(context.getActor()));
-        context.getSnapshotManager().setApplySnapshotConsumer(cohort::applySnapshot);
+        context.getSnapshotManager().setCreateSnapshotConsumer(
+            outputStream -> cohort.createSnapshot(context.getActor(), outputStream));
+        context.getSnapshotManager().setSnapshotCohort(cohort);
+    }
+
+    RaftActorSnapshotCohort getSnapshotCohort() {
+        return cohort;
     }
 
     boolean handleSnapshotMessage(Object message, ActorRef sender) {
@@ -58,7 +66,7 @@ class RaftActorSnapshotMessageSupport {
         } else if (message instanceof SaveSnapshotFailure) {
             onSaveSnapshotFailure((SaveSnapshotFailure) message);
         } else if (message instanceof CaptureSnapshotReply) {
-            onCaptureSnapshotReply(((CaptureSnapshotReply) message).getSnapshot());
+            onCaptureSnapshotReply((CaptureSnapshotReply) message);
         } else if (COMMIT_SNAPSHOT.equals(message)) {
             context.getSnapshotManager().commit(-1, -1);
         } else if (message instanceof GetSnapshot) {
@@ -70,11 +78,11 @@ class RaftActorSnapshotMessageSupport {
         return true;
     }
 
-    private void onCaptureSnapshotReply(byte[] snapshotBytes) {
-        log.debug("{}: CaptureSnapshotReply received by actor: snapshot size {}", context.getId(),
-                snapshotBytes.length);
+    private void onCaptureSnapshotReply(CaptureSnapshotReply reply) {
+        log.debug("{}: CaptureSnapshotReply received by actor", context.getId());
 
-        context.getSnapshotManager().persist(snapshotBytes, context.getTotalMemory());
+        context.getSnapshotManager().persist(reply.getSnapshotState(), reply.getInstallSnapshotStream(),
+                context.getTotalMemory());
     }
 
     private void onSaveSnapshotFailure(SaveSnapshotFailure saveSnapshotFailure) {
@@ -103,15 +111,16 @@ class RaftActorSnapshotMessageSupport {
 
         if (context.getPersistenceProvider().isRecoveryApplicable()) {
             CaptureSnapshot captureSnapshot = context.getSnapshotManager().newCaptureSnapshot(
-                    context.getReplicatedLog().last(), -1, false);
+                    context.getReplicatedLog().last(), -1);
 
             ActorRef snapshotReplyActor = context.actorOf(GetSnapshotReplyActor.props(captureSnapshot,
                     ImmutableElectionTerm.copyOf(context.getTermInformation()), sender,
                     snapshotReplyActorTimeout, context.getId(), context.getPeerServerInfo(true)));
 
-            cohort.createSnapshot(snapshotReplyActor);
+            cohort.createSnapshot(snapshotReplyActor, Optional.empty());
         } else {
-            Snapshot snapshot = Snapshot.create(new byte[0], Collections.<ReplicatedLogEntry>emptyList(),
+            Snapshot snapshot = Snapshot.create(
+                    EmptyState.INSTANCE, Collections.<ReplicatedLogEntry>emptyList(),
                     -1, -1, -1, -1,
                     context.getTermInformation().getCurrentTerm(), context.getTermInformation().getVotedFor(),
                     context.getPeerServerInfo(true));
index fe873340aa3b414d4f89f60354dabcede970b52c..7196fc5f12d82013c58f80248d82342d0f88aecb 100644 (file)
@@ -14,6 +14,7 @@ import java.util.List;
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 import org.opendaylight.controller.cluster.raft.persisted.DeleteEntries;
+import org.opendaylight.controller.cluster.raft.persisted.Snapshot;
 
 /**
  * Implementation of ReplicatedLog used by the RaftActor.
index 2677baff6da40d1e2caba4a7b257594e5233bb3b..93226ccab201aab0ed98214ae7e64251a98f0540 100644 (file)
@@ -17,7 +17,10 @@ import org.opendaylight.controller.cluster.raft.persisted.ServerConfigurationPay
  *
  * @author Moiz Raja
  * @author Thomas Pantelis
+ *
+ * @deprecated Use {@link org.opendaylight.controller.cluster.raft.persisted.Snapshot} instead.
  */
+@Deprecated
 public class Snapshot implements Serializable {
     private static final long serialVersionUID = -8298574936724056236L;
 
index 79f2ce9b4ca0fb2b0deca496eed67e069bc1a547..12508aebffedc10195ad6c3bd482a1905c23fa7a 100644 (file)
@@ -10,13 +10,21 @@ package org.opendaylight.controller.cluster.raft;
 
 import akka.persistence.SnapshotSelectionCriteria;
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.io.ByteSource;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
 import java.util.List;
+import java.util.Optional;
 import java.util.function.Consumer;
+import javax.annotation.Nonnull;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
 import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot;
 import org.opendaylight.controller.cluster.raft.base.messages.SnapshotComplete;
 import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
+import org.opendaylight.controller.cluster.raft.persisted.EmptyState;
+import org.opendaylight.controller.cluster.raft.persisted.Snapshot;
 import org.slf4j.Logger;
 
 /**
@@ -48,10 +56,10 @@ public class SnapshotManager implements SnapshotState {
     private CaptureSnapshot captureSnapshot;
     private long lastSequenceNumber = -1;
 
-    private Runnable createSnapshotProcedure;
+    private Consumer<Optional<OutputStream>> createSnapshotProcedure;
 
     private ApplySnapshot applySnapshot;
-    private Consumer<byte[]> applySnapshotProcedure;
+    private RaftActorSnapshotCohort snapshotCohort = NoopRaftActorSnapshotCohort.INSTANCE;
 
     /**
      * Constructs an instance.
@@ -89,8 +97,9 @@ public class SnapshotManager implements SnapshotState {
     }
 
     @Override
-    public void persist(final byte[] snapshotBytes, final long totalMemory) {
-        currentState.persist(snapshotBytes, totalMemory);
+    public void persist(final Snapshot.State state, final Optional<OutputStream> installSnapshotStream,
+            final long totalMemory) {
+        currentState.persist(state, installSnapshotStream, totalMemory);
     }
 
     @Override
@@ -108,12 +117,17 @@ public class SnapshotManager implements SnapshotState {
         return currentState.trimLog(desiredTrimIndex);
     }
 
-    public void setCreateSnapshotRunnable(Runnable createSnapshotProcedure) {
+    void setCreateSnapshotConsumer(Consumer<Optional<OutputStream>> createSnapshotProcedure) {
         this.createSnapshotProcedure = createSnapshotProcedure;
     }
 
-    public void setApplySnapshotConsumer(Consumer<byte[]> applySnapshotProcedure) {
-        this.applySnapshotProcedure = applySnapshotProcedure;
+    void setSnapshotCohort(final RaftActorSnapshotCohort snapshotCohort) {
+        this.snapshotCohort = snapshotCohort;
+    }
+
+    @Nonnull
+    public Snapshot.State convertSnapshot(ByteSource snapshotBytes) throws IOException {
+        return snapshotCohort.deserializeSnapshot(snapshotBytes);
     }
 
     public long getLastSequenceNumber() {
@@ -138,11 +152,9 @@ public class SnapshotManager implements SnapshotState {
      *
      * @param lastLogEntry the last log entry for the snapshot.
      * @param replicatedToAllIndex the index of the last entry replicated to all followers.
-     * @param installSnapshotInitiated true if snapshot is initiated to install on a follower.
      * @return a new CaptureSnapshot instance.
      */
-    public CaptureSnapshot newCaptureSnapshot(ReplicatedLogEntry lastLogEntry, long replicatedToAllIndex,
-            boolean installSnapshotInitiated) {
+    public CaptureSnapshot newCaptureSnapshot(ReplicatedLogEntry lastLogEntry, long replicatedToAllIndex) {
         TermInformationReader lastAppliedTermInfoReader =
                 lastAppliedTermInformationReader.init(context.getReplicatedLog(), context.getLastApplied(),
                         lastLogEntry, hasFollowers());
@@ -169,7 +181,7 @@ public class SnapshotManager implements SnapshotState {
         }
 
         return new CaptureSnapshot(lastLogEntryIndex, lastLogEntryTerm, lastAppliedIndex, lastAppliedTerm,
-                newReplicatedToAllIndex, newReplicatedToAllTerm, unAppliedEntries, installSnapshotInitiated);
+                newReplicatedToAllIndex, newReplicatedToAllTerm, unAppliedEntries);
     }
 
     private class AbstractSnapshotState implements SnapshotState {
@@ -198,7 +210,8 @@ public class SnapshotManager implements SnapshotState {
         }
 
         @Override
-        public void persist(final byte[] snapshotBytes, final long totalMemory) {
+        public void persist(final Snapshot.State state, final Optional<OutputStream> installSnapshotStream,
+                final long totalMemory) {
             log.debug("persist should not be called in state {}", this);
         }
 
@@ -261,9 +274,11 @@ public class SnapshotManager implements SnapshotState {
 
         @SuppressWarnings("checkstyle:IllegalCatch")
         private boolean capture(ReplicatedLogEntry lastLogEntry, long replicatedToAllIndex, String targetFollower) {
-            captureSnapshot = newCaptureSnapshot(lastLogEntry, replicatedToAllIndex, targetFollower != null);
+            captureSnapshot = newCaptureSnapshot(lastLogEntry, replicatedToAllIndex);
 
-            if (captureSnapshot.isInstallSnapshotInitiated()) {
+            OutputStream installSnapshotStream = null;
+            if (targetFollower != null) {
+                installSnapshotStream = new ByteArrayOutputStream();
                 log.info("{}: Initiating snapshot capture {} to install on {}",
                         persistenceId(), captureSnapshot, targetFollower);
             } else {
@@ -277,7 +292,7 @@ public class SnapshotManager implements SnapshotState {
             SnapshotManager.this.currentState = CREATING;
 
             try {
-                createSnapshotProcedure.run();
+                createSnapshotProcedure.accept(Optional.ofNullable(installSnapshotStream));
             } catch (Exception e) {
                 SnapshotManager.this.currentState = IDLE;
                 log.error("Error creating snapshot", e);
@@ -325,11 +340,12 @@ public class SnapshotManager implements SnapshotState {
     private class Creating extends AbstractSnapshotState {
 
         @Override
-        public void persist(final byte[] snapshotBytes, final long totalMemory) {
+        public void persist(final Snapshot.State snapshotState, final Optional<OutputStream> installSnapshotStream,
+                final long totalMemory) {
             // create a snapshot object from the state provided and save it
             // when snapshot is saved async, SaveSnapshotSuccess is raised.
 
-            Snapshot snapshot = Snapshot.create(snapshotBytes,
+            Snapshot snapshot = Snapshot.create(snapshotState,
                     captureSnapshot.getUnAppliedEntries(),
                     captureSnapshot.getLastIndex(), captureSnapshot.getLastTerm(),
                     captureSnapshot.getLastAppliedIndex(), captureSnapshot.getLastAppliedTerm(),
@@ -393,10 +409,20 @@ public class SnapshotManager implements SnapshotState {
                     context.getId(), context.getReplicatedLog().getSnapshotIndex(),
                     context.getReplicatedLog().getSnapshotTerm());
 
-            if (context.getId().equals(currentBehavior.getLeaderId())
-                    && captureSnapshot.isInstallSnapshotInitiated()) {
-                // this would be call straight to the leader and won't initiate in serialization
-                currentBehavior.handleMessage(context.getActor(), new SendInstallSnapshot(snapshot));
+            if (installSnapshotStream.isPresent()) {
+                try {
+                    installSnapshotStream.get().close();
+                } catch (IOException e) {
+                    log.warn("Error closing install snapshot OutputStream", e);
+                }
+
+                if (context.getId().equals(currentBehavior.getLeaderId())) {
+                    ByteSource snapshotBytes = ByteSource.wrap(((ByteArrayOutputStream)installSnapshotStream.get())
+                            .toByteArray());
+
+                    // this would be call straight to the leader and won't initiate in serialization
+                    currentBehavior.handleMessage(context.getActor(), new SendInstallSnapshot(snapshot, snapshotBytes));
+                }
             }
 
             captureSnapshot = null;
@@ -431,8 +457,8 @@ public class SnapshotManager implements SnapshotState {
                         context.updatePeerIds(snapshot.getServerConfiguration());
                     }
 
-                    if (snapshot.getState().length > 0 ) {
-                        applySnapshotProcedure.accept(snapshot.getState());
+                    if (!(snapshot.getState() instanceof EmptyState)) {
+                        snapshotCohort.applySnapshot(snapshot.getState());
                     }
 
                     applySnapshot.getCallback().onSuccess();
index e423737706bc0d42055d388bb72b7f4a7890256b..0a702741d8894ceb46386fba63d28ca39e1e51af 100644 (file)
@@ -8,7 +8,10 @@
 
 package org.opendaylight.controller.cluster.raft;
 
+import java.io.OutputStream;
+import java.util.Optional;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
+import org.opendaylight.controller.cluster.raft.persisted.Snapshot;
 
 /**
  * Interface for a snapshot phase state.
@@ -53,10 +56,12 @@ public interface SnapshotState {
     /**
      * Persists a snapshot.
      *
-     * @param snapshotBytes the snapshot bytes
+     * @param snapshotState the snapshot State
+     * @param installSnapshotStream Optional OutputStream that is present if the snapshot is to also be installed
+     *        on a follower.
      * @param totalMemory the total memory threshold
      */
-    void persist(byte[] snapshotBytes, long totalMemory);
+    void persist(Snapshot.State snapshotState, Optional<OutputStream> installSnapshotStream, long totalMemory);
 
     /**
      * Commit the snapshot by trimming the log.
index 9fb5554abcdf886bcf86567c3954ee31dfa57667..9cf2a3f6c125d4defa774f39b33786cd0813b49d 100644 (file)
@@ -10,7 +10,7 @@ package org.opendaylight.controller.cluster.raft.base.messages;
 
 import com.google.common.base.Preconditions;
 import javax.annotation.Nonnull;
-import org.opendaylight.controller.cluster.raft.Snapshot;
+import org.opendaylight.controller.cluster.raft.persisted.Snapshot;
 
 /**
  * Internal message, issued by follower to its actor.
index 14bd3a0af4c7cbc42304605429057188dce11f18..2173534a5897dea6d4728715430be4fab2a291a8 100644 (file)
@@ -17,25 +17,17 @@ public class CaptureSnapshot {
     private final long lastAppliedTerm;
     private final long lastIndex;
     private final long lastTerm;
-    private final boolean installSnapshotInitiated;
     private final long replicatedToAllIndex;
     private final long replicatedToAllTerm;
     private final List<ReplicatedLogEntry> unAppliedEntries;
 
-    public CaptureSnapshot(long lastIndex, long lastTerm, long lastAppliedIndex, long lastAppliedTerm,
-            long replicatedToAllIndex, long replicatedToAllTerm, List<ReplicatedLogEntry> unAppliedEntries) {
-        this(lastIndex, lastTerm, lastAppliedIndex, lastAppliedTerm, replicatedToAllIndex, replicatedToAllTerm,
-                unAppliedEntries, false);
-    }
-
     public CaptureSnapshot(long lastIndex, long lastTerm, long lastAppliedIndex,
             long lastAppliedTerm, long replicatedToAllIndex, long replicatedToAllTerm,
-            List<ReplicatedLogEntry> unAppliedEntries, boolean installSnapshotInitiated) {
+            List<ReplicatedLogEntry> unAppliedEntries) {
         this.lastIndex = lastIndex;
         this.lastTerm = lastTerm;
         this.lastAppliedIndex = lastAppliedIndex;
         this.lastAppliedTerm = lastAppliedTerm;
-        this.installSnapshotInitiated = installSnapshotInitiated;
         this.replicatedToAllIndex = replicatedToAllIndex;
         this.replicatedToAllTerm = replicatedToAllTerm;
         this.unAppliedEntries = unAppliedEntries != null ? unAppliedEntries :
@@ -58,10 +50,6 @@ public class CaptureSnapshot {
         return lastTerm;
     }
 
-    public boolean isInstallSnapshotInitiated() {
-        return installSnapshotInitiated;
-    }
-
     public long getReplicatedToAllIndex() {
         return replicatedToAllIndex;
     }
@@ -79,7 +67,7 @@ public class CaptureSnapshot {
         StringBuilder builder = new StringBuilder();
         builder.append("CaptureSnapshot [lastAppliedIndex=").append(lastAppliedIndex).append(", lastAppliedTerm=")
                 .append(lastAppliedTerm).append(", lastIndex=").append(lastIndex).append(", lastTerm=")
-                .append(lastTerm).append(", installSnapshotInitiated=").append(installSnapshotInitiated)
+                .append(lastTerm).append(", installSnapshotInitiated=")
                 .append(", replicatedToAllIndex=").append(replicatedToAllIndex).append(", replicatedToAllTerm=")
                 .append(replicatedToAllTerm).append(", unAppliedEntries size=")
                 .append(unAppliedEntries.size()).append("]");
index 1e573014a9ed6596ae6af84df0f724a9e730c34c..cc981e5711ae5e2e356adf693dded5abe126e4fb 100644 (file)
@@ -7,22 +7,29 @@
  */
 package org.opendaylight.controller.cluster.raft.base.messages;
 
-import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
+import com.google.common.base.Preconditions;
+import java.io.OutputStream;
+import java.util.Optional;
+import javax.annotation.Nonnull;
+import org.opendaylight.controller.cluster.raft.persisted.Snapshot;
 
 public class CaptureSnapshotReply {
-    private final byte [] snapshot;
+    private final Snapshot.State snapshotState;
+    private final Optional<OutputStream> installSnapshotStream;
 
-    @SuppressFBWarnings(value = "EI_EXPOSE_REP2", justification = "Stores a reference to an externally mutable byte[] "
-            + "object but this is OK since this class is merely a DTO and does not process byte[] internally. "
-            + "Also it would be inefficient to create a copy as the byte[] could be large.")
-    public CaptureSnapshotReply(byte [] snapshot) {
-        this.snapshot = snapshot;
+    public CaptureSnapshotReply(@Nonnull final Snapshot.State snapshotState,
+            @Nonnull final Optional<OutputStream> installSnapshotStream) {
+        this.snapshotState = Preconditions.checkNotNull(snapshotState);
+        this.installSnapshotStream = Preconditions.checkNotNull(installSnapshotStream);
     }
 
-    @SuppressFBWarnings(value = "EI_EXPOSE_REP", justification = "Exposes a mutable object stored in a field but "
-            + "this is OK since this class is merely a DTO and does not process the byte[] internally. "
-            + "Also it would be inefficient to create a return copy as the byte[] could be large.")
-    public byte [] getSnapshot() {
-        return snapshot;
+    @Nonnull
+    public Snapshot.State getSnapshotState() {
+        return snapshotState;
+    }
+
+    @Nonnull
+    public Optional<OutputStream> getInstallSnapshotStream() {
+        return installSnapshotStream;
     }
 }
index de33b8c95b1ad0679d6a2af3c37e880dca099efa..e8d58f253afe09a27cc79c07e306c258e1b35929 100644 (file)
@@ -9,8 +9,9 @@
 package org.opendaylight.controller.cluster.raft.base.messages;
 
 import com.google.common.base.Preconditions;
+import com.google.common.io.ByteSource;
 import javax.annotation.Nonnull;
-import org.opendaylight.controller.cluster.raft.Snapshot;
+import org.opendaylight.controller.cluster.raft.persisted.Snapshot;
 
 /**
  * Internal message sent from the SnapshotManager to its associated leader when a snapshot capture is complete to
@@ -18,13 +19,19 @@ import org.opendaylight.controller.cluster.raft.Snapshot;
  */
 public final class SendInstallSnapshot {
     private final Snapshot snapshot;
+    private final ByteSource snapshotBytes;
 
-    public SendInstallSnapshot(@Nonnull Snapshot snapshot) {
+    public SendInstallSnapshot(@Nonnull Snapshot snapshot, @Nonnull ByteSource snapshotBytes) {
         this.snapshot = Preconditions.checkNotNull(snapshot);
+        this.snapshotBytes = Preconditions.checkNotNull(snapshotBytes);
     }
 
     @Nonnull
     public Snapshot getSnapshot() {
         return snapshot;
     }
+
+    public ByteSource getSnapshotBytes() {
+        return snapshotBytes;
+    }
 }
index 10c1a156f9cbbdb528076ad70fa6ecbff48052dc..548b920fe771f183b904e2d5e2e65d7001cb746e 100644 (file)
@@ -14,7 +14,9 @@ import akka.actor.Cancellable;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
+import com.google.common.io.ByteSource;
 import com.google.protobuf.ByteString;
+import java.io.IOException;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
@@ -33,7 +35,6 @@ import org.opendaylight.controller.cluster.raft.PeerInfo;
 import org.opendaylight.controller.cluster.raft.RaftActorContext;
 import org.opendaylight.controller.cluster.raft.RaftState;
 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
-import org.opendaylight.controller.cluster.raft.Snapshot;
 import org.opendaylight.controller.cluster.raft.VotingState;
 import org.opendaylight.controller.cluster.raft.base.messages.CheckConsensusReached;
 import org.opendaylight.controller.cluster.raft.base.messages.Replicate;
@@ -47,6 +48,7 @@ import org.opendaylight.controller.cluster.raft.messages.RaftRPC;
 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
 import org.opendaylight.controller.cluster.raft.messages.UnInitializedFollowerSnapshotReply;
 import org.opendaylight.controller.cluster.raft.persisted.ServerConfigurationPayload;
+import org.opendaylight.controller.cluster.raft.persisted.Snapshot;
 import scala.concurrent.duration.FiniteDuration;
 
 /**
@@ -83,7 +85,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
     private final Queue<ClientRequestTracker> trackers = new LinkedList<>();
 
     private Cancellable heartbeatSchedule = null;
-    private Optional<SnapshotHolder> snapshot = Optional.absent();
+    private Optional<SnapshotHolder> snapshotHolder = Optional.absent();
     private int minReplicationCount;
 
     protected AbstractLeader(RaftActorContext context, RaftState state,
@@ -92,7 +94,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
 
         if (initializeFromLeader != null) {
             followerToLog.putAll(initializeFromLeader.followerToLog);
-            snapshot = initializeFromLeader.snapshot;
+            snapshotHolder = initializeFromLeader.snapshotHolder;
             trackers.addAll(initializeFromLeader.trackers);
         } else {
             for (PeerInfo peerInfo: context.getPeers()) {
@@ -165,17 +167,13 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
     }
 
     @VisibleForTesting
-    void setSnapshot(@Nullable Snapshot snapshot) {
-        if (snapshot != null) {
-            this.snapshot = Optional.of(new SnapshotHolder(snapshot));
-        } else {
-            this.snapshot = Optional.absent();
-        }
+    void setSnapshot(@Nullable SnapshotHolder snapshotHolder) {
+        this.snapshotHolder = Optional.fromNullable(snapshotHolder);
     }
 
     @VisibleForTesting
     boolean hasSnapshot() {
-        return snapshot.isPresent();
+        return snapshotHolder.isPresent();
     }
 
     @Override
@@ -451,8 +449,8 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
             sendHeartBeat();
             scheduleHeartBeat(context.getConfigParams().getHeartBeatInterval());
         } else if (message instanceof SendInstallSnapshot) {
-            // received from RaftActor
-            setSnapshot(((SendInstallSnapshot) message).getSnapshot());
+            SendInstallSnapshot sendInstallSnapshot = (SendInstallSnapshot) message;
+            setSnapshot(new SnapshotHolder(sendInstallSnapshot.getSnapshot(), sendInstallSnapshot.getSnapshotBytes()));
             sendInstallSnapshot();
         } else if (message instanceof Replicate) {
             replicate((Replicate) message);
@@ -497,7 +495,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
                             + " Setting nextIndex: {}", logName(), reply.getChunkIndex(), followerId,
                             context.getReplicatedLog().getSnapshotIndex() + 1);
 
-                    long followerMatchIndex = snapshot.get().getLastIncludedIndex();
+                    long followerMatchIndex = snapshotHolder.get().getLastIncludedIndex();
                     followerLogInformation.setMatchIndex(followerMatchIndex);
                     followerLogInformation.setNextIndex(followerMatchIndex + 1);
                     followerLogInformation.clearLeaderInstallSnapshotState();
@@ -730,7 +728,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
      */
     public boolean initiateCaptureSnapshot(String followerId) {
         FollowerLogInformation followerLogInfo = followerToLog.get(followerId);
-        if (snapshot.isPresent()) {
+        if (snapshotHolder.isPresent()) {
             // If a snapshot is present in the memory, most likely another install is in progress no need to capture
             // snapshot. This could happen if another follower needs an install when one is going on.
             final ActorSelection followerActor = context.getPeerActorSelection(followerId);
@@ -783,7 +781,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
      *  InstallSnapshot should qualify as a heartbeat too.
      */
     private void sendSnapshotChunk(ActorSelection followerActor, FollowerLogInformation followerLogInfo) {
-        if (snapshot.isPresent()) {
+        if (snapshotHolder.isPresent()) {
             LeaderInstallSnapshotState installSnapshotState = followerLogInfo.getInstallSnapshotState();
             if (installSnapshotState == null) {
                 installSnapshotState = new LeaderInstallSnapshotState(context.getConfigParams().getSnapshotChunkSize(),
@@ -792,7 +790,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
             }
 
             // Ensure the snapshot bytes are set - this is a no-op.
-            installSnapshotState.setSnapshotBytes(snapshot.get().getSnapshotBytes());
+            installSnapshotState.setSnapshotBytes(snapshotHolder.get().getSnapshotBytes());
 
             byte[] nextSnapshotChunk = installSnapshotState.getNextChunk();
 
@@ -807,8 +805,8 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
 
             followerActor.tell(
                 new InstallSnapshot(currentTerm(), context.getId(),
-                    snapshot.get().getLastIncludedIndex(),
-                    snapshot.get().getLastIncludedTerm(),
+                    snapshotHolder.get().getLastIncludedIndex(),
+                    snapshotHolder.get().getLastIncludedTerm(),
                     nextSnapshotChunk,
                     nextChunkIndex,
                     installSnapshotState.getTotalChunks(),
@@ -911,15 +909,19 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
         return followerToLog.size();
     }
 
-    private static class SnapshotHolder {
+    static class SnapshotHolder {
         private final long lastIncludedTerm;
         private final long lastIncludedIndex;
         private final ByteString snapshotBytes;
 
-        SnapshotHolder(Snapshot snapshot) {
+        SnapshotHolder(Snapshot snapshot, ByteSource snapshotBytes) {
             this.lastIncludedTerm = snapshot.getLastAppliedTerm();
             this.lastIncludedIndex = snapshot.getLastAppliedIndex();
-            this.snapshotBytes = ByteString.copyFrom(snapshot.getState());
+            try {
+                this.snapshotBytes = ByteString.copyFrom(snapshotBytes.read());
+            } catch (IOException e) {
+                throw new RuntimeException("Error reading state", e);
+            }
         }
 
         long getLastIncludedTerm() {
index 6f107e9ae61ee060071c842810cb82c3e7f92f85..727d6a3131682281ea9825264dc9ac380d43597b 100644 (file)
@@ -18,6 +18,8 @@ import akka.cluster.MemberStatus;
 import akka.japi.Procedure;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Stopwatch;
+import com.google.common.io.ByteSource;
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Optional;
 import java.util.Set;
@@ -27,7 +29,6 @@ import javax.annotation.Nullable;
 import org.opendaylight.controller.cluster.raft.RaftActorContext;
 import org.opendaylight.controller.cluster.raft.RaftState;
 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
-import org.opendaylight.controller.cluster.raft.Snapshot;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
 import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
 import org.opendaylight.controller.cluster.raft.base.messages.TimeoutNow;
@@ -39,6 +40,7 @@ import org.opendaylight.controller.cluster.raft.messages.RaftRPC;
 import org.opendaylight.controller.cluster.raft.messages.RequestVote;
 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
 import org.opendaylight.controller.cluster.raft.persisted.ServerConfigurationPayload;
+import org.opendaylight.controller.cluster.raft.persisted.Snapshot;
 
 /**
  * The behavior of a RaftActor in the Follower raft state.
@@ -528,7 +530,9 @@ public class Follower extends AbstractRaftActorBehavior {
 
             if (snapshotTracker.addChunk(installSnapshot.getChunkIndex(), installSnapshot.getData(),
                     installSnapshot.getLastChunkHashCode())) {
-                Snapshot snapshot = Snapshot.create(snapshotTracker.getSnapshot(),
+                ByteSource snapshotBytes = ByteSource.wrap(snapshotTracker.getSnapshot());
+                Snapshot snapshot = Snapshot.create(
+                        context.getSnapshotManager().convertSnapshot(snapshotBytes),
                         new ArrayList<>(),
                         installSnapshot.getLastIncludedIndex(),
                         installSnapshot.getLastIncludedTerm(),
@@ -560,7 +564,7 @@ public class Follower extends AbstractRaftActorBehavior {
 
                 sender.tell(reply, actor());
             }
-        } catch (SnapshotTracker.InvalidChunkException e) {
+        } catch (SnapshotTracker.InvalidChunkException | IOException e) {
             log.debug("{}: Exception in InstallSnapshot of follower", logName(), e);
 
             sender.tell(new InstallSnapshotReply(currentTerm(), context.getId(),
diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/persisted/EmptyState.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/persisted/EmptyState.java
new file mode 100644 (file)
index 0000000..40c90fb
--- /dev/null
@@ -0,0 +1,27 @@
+/*
+ * Copyright (c) 2017 Brocade Communications Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.raft.persisted;
+
+/**
+ * Empty Snapshot State implementation.
+ *
+ * @author Thomas Pantelis
+ */
+public class EmptyState implements Snapshot.State {
+    private static final long serialVersionUID = 1L;
+
+    public static final EmptyState INSTANCE = new EmptyState();
+
+    private EmptyState() {
+    }
+
+    @SuppressWarnings("static-method")
+    private Object readResolve() {
+        return INSTANCE;
+    }
+}
diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/persisted/Snapshot.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/persisted/Snapshot.java
new file mode 100644 (file)
index 0000000..1763ad3
--- /dev/null
@@ -0,0 +1,180 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.raft.persisted;
+
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
+import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
+
+/**
+ * Represents a snapshot of the raft data.
+ *
+ * @author Thomas Pantelis
+ */
+public class Snapshot implements Serializable {
+
+    /**
+     * Implementations of this interface are used as the state payload for a snapshot.
+     *
+     * @author Thomas Pantelis
+     */
+    public interface State extends Serializable {
+    }
+
+    private static final class Proxy implements Externalizable {
+        private static final long serialVersionUID = 1L;
+
+        private Snapshot snapshot;
+
+        // checkstyle flags the public modifier as redundant which really doesn't make sense since it clearly isn't
+        // redundant. It is explicitly needed for Java serialization to be able to create instances via reflection.
+        @SuppressWarnings("checkstyle:RedundantModifier")
+        public Proxy() {
+            // For Externalizable
+        }
+
+        Proxy(final Snapshot snapshot) {
+            this.snapshot = snapshot;
+        }
+
+        @Override
+        public void writeExternal(final ObjectOutput out) throws IOException {
+            out.writeLong(snapshot.lastIndex);
+            out.writeLong(snapshot.lastTerm);
+            out.writeLong(snapshot.lastAppliedIndex);
+            out.writeLong(snapshot.lastAppliedTerm);
+            out.writeLong(snapshot.electionTerm);
+            out.writeObject(snapshot.electionVotedFor);
+            out.writeObject(snapshot.serverConfig);
+
+            out.writeInt(snapshot.unAppliedEntries.size());
+            for (ReplicatedLogEntry e: snapshot.unAppliedEntries) {
+                out.writeLong(e.getIndex());
+                out.writeLong(e.getTerm());
+                out.writeObject(e.getData());
+            }
+
+            out.writeObject(snapshot.state);
+        }
+
+        @Override
+        public void readExternal(final ObjectInput in) throws IOException, ClassNotFoundException {
+            long lastIndex = in.readLong();
+            long lastTerm = in.readLong();
+            long lastAppliedIndex = in.readLong();
+            long lastAppliedTerm = in.readLong();
+            long electionTerm = in.readLong();
+            String electionVotedFor = (String) in.readObject();
+            ServerConfigurationPayload serverConfig = (ServerConfigurationPayload) in.readObject();
+
+            int size = in.readInt();
+            List<ReplicatedLogEntry> unAppliedEntries = new ArrayList<>(size);
+            for (int i = 0; i < size; i++) {
+                unAppliedEntries.add(new SimpleReplicatedLogEntry(in.readLong(), in.readLong(),
+                        (Payload) in.readObject()));
+            }
+
+            State state = (State) in.readObject();
+
+            snapshot = Snapshot.create(state, unAppliedEntries, lastIndex, lastTerm, lastAppliedIndex, lastAppliedTerm,
+                    electionTerm, electionVotedFor, serverConfig);
+        }
+
+        private Object readResolve() {
+            return snapshot;
+        }
+    }
+
+    private static final long serialVersionUID = 1L;
+
+    private final State state;
+    private final List<ReplicatedLogEntry> unAppliedEntries;
+    private final long lastIndex;
+    private final long lastTerm;
+    private final long lastAppliedIndex;
+    private final long lastAppliedTerm;
+    private final long electionTerm;
+    private final String electionVotedFor;
+    private final ServerConfigurationPayload serverConfig;
+
+    private Snapshot(State state, List<ReplicatedLogEntry> unAppliedEntries, long lastIndex, long lastTerm,
+            long lastAppliedIndex, long lastAppliedTerm, long electionTerm, String electionVotedFor,
+            ServerConfigurationPayload serverConfig) {
+        this.state = state;
+        this.unAppliedEntries = unAppliedEntries;
+        this.lastIndex = lastIndex;
+        this.lastTerm = lastTerm;
+        this.lastAppliedIndex = lastAppliedIndex;
+        this.lastAppliedTerm = lastAppliedTerm;
+        this.electionTerm = electionTerm;
+        this.electionVotedFor = electionVotedFor;
+        this.serverConfig = serverConfig;
+    }
+
+    public static Snapshot create(State state, List<ReplicatedLogEntry> entries, long lastIndex, long lastTerm,
+            long lastAppliedIndex, long lastAppliedTerm, long electionTerm, String electionVotedFor,
+            ServerConfigurationPayload serverConfig) {
+        return new Snapshot(state, entries, lastIndex, lastTerm, lastAppliedIndex, lastAppliedTerm,
+                electionTerm, electionVotedFor, serverConfig);
+    }
+
+    public State getState() {
+        return state;
+    }
+
+    public List<ReplicatedLogEntry> getUnAppliedEntries() {
+        return unAppliedEntries;
+    }
+
+    public long getLastTerm() {
+        return lastTerm;
+    }
+
+    public long getLastAppliedIndex() {
+        return lastAppliedIndex;
+    }
+
+    public long getLastAppliedTerm() {
+        return lastAppliedTerm;
+    }
+
+    public long getLastIndex() {
+        return this.lastIndex;
+    }
+
+    public long getElectionTerm() {
+        return electionTerm;
+    }
+
+    public String getElectionVotedFor() {
+        return electionVotedFor;
+    }
+
+    public ServerConfigurationPayload getServerConfiguration() {
+        return serverConfig;
+    }
+
+    @SuppressWarnings("static-method")
+    private Object writeReplace() {
+        return new Proxy(this);
+    }
+
+    @Override
+    public String toString() {
+        return "Snapshot [lastIndex=" + lastIndex + ", lastTerm=" + lastTerm + ", lastAppliedIndex=" + lastAppliedIndex
+                + ", lastAppliedTerm=" + lastAppliedTerm + ", unAppliedEntries size=" + unAppliedEntries.size()
+                + ", state=" + state + ", electionTerm=" + electionTerm + ", electionVotedFor="
+                + electionVotedFor + ", ServerConfigPayload="  + serverConfig + "]";
+    }
+}
index 4a053137a5a7d387eaf3210f11a55d47850a7a57..eb5c9e5e91974e7b042e44867a9fbc69ec18bd47 100644 (file)
@@ -20,18 +20,21 @@ import akka.testkit.JavaTestKit;
 import akka.testkit.TestActorRef;
 import akka.util.Timeout;
 import com.google.common.base.Stopwatch;
-import com.google.common.base.Throwables;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.util.concurrent.Uninterruptibles;
+import java.io.OutputStream;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Consumer;
 import java.util.function.Predicate;
+import org.apache.commons.lang3.SerializationUtils;
 import org.junit.After;
+import org.opendaylight.controller.cluster.raft.MockRaftActor.MockSnapshotState;
 import org.opendaylight.controller.cluster.raft.MockRaftActorContext.MockPayload;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
@@ -41,6 +44,7 @@ import org.opendaylight.controller.cluster.raft.client.messages.GetOnDemandRaftS
 import org.opendaylight.controller.cluster.raft.client.messages.OnDemandRaftState;
 import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries;
 import org.opendaylight.controller.cluster.raft.persisted.ServerConfigurationPayload;
+import org.opendaylight.controller.cluster.raft.persisted.Snapshot;
 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
 import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
@@ -149,12 +153,13 @@ public abstract class AbstractRaftActorIntegrationTest extends AbstractActorTest
 
         @Override
         @SuppressWarnings("checkstyle:IllegalCatch")
-        public void createSnapshot(ActorRef actorRef) {
-            try {
-                actorRef.tell(new CaptureSnapshotReply(RaftActorTest.fromObject(getState()).toByteArray()), actorRef);
-            } catch (Exception e) {
-                Throwables.propagate(e);
+        public void createSnapshot(ActorRef actorRef, Optional<OutputStream> installSnapshotStream) {
+            MockSnapshotState snapshotState = new MockSnapshotState(new ArrayList<>(getState()));
+            if (installSnapshotStream.isPresent()) {
+                SerializationUtils.serialize(snapshotState, installSnapshotStream.get());
             }
+
+            actorRef.tell(new CaptureSnapshotReply(snapshotState, installSnapshotStream), actorRef);
         }
 
         public ActorRef collectorActor() {
@@ -291,7 +296,7 @@ public abstract class AbstractRaftActorIntegrationTest extends AbstractActorTest
         assertEquals(prefix + " Snapshot getLastTerm", lastTerm, snapshot.getLastTerm());
         assertEquals(prefix + " Snapshot getLastIndex", lastIndex, snapshot.getLastIndex());
 
-        List<Object> actualState = (List<Object>)MockRaftActor.toObject(snapshot.getState());
+        List<Object> actualState = ((MockSnapshotState)snapshot.getState()).getState();
         assertEquals(String.format("%s Snapshot getState size. Expected %s: . Actual: %s", prefix, expSnapshotState,
                 actualState), expSnapshotState.size(), actualState.size());
         for (int i = 0; i < expSnapshotState.size(); i++) {
index 9ce89093fd6ef36403d09dc539814a65f94d6feb..460dd4a445a8306731499e205b1207c563e1e110 100644 (file)
@@ -15,20 +15,30 @@ import akka.testkit.JavaTestKit;
 import akka.testkit.TestActorRef;
 import com.google.common.base.Optional;
 import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Sets;
+import com.google.common.io.ByteSource;
 import com.google.common.util.concurrent.Uninterruptibles;
+import java.io.OutputStream;
+import java.io.Serializable;
 import java.util.Arrays;
 import java.util.HashSet;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Consumer;
+import org.apache.commons.lang3.SerializationUtils;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
+import org.opendaylight.controller.cluster.raft.MockRaftActor.MockSnapshotState;
+import org.opendaylight.controller.cluster.raft.MockRaftActorContext.MockPayload;
 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
 import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries;
+import org.opendaylight.controller.cluster.raft.persisted.ByteState;
 import org.opendaylight.controller.cluster.raft.persisted.ServerConfigurationPayload;
 import org.opendaylight.controller.cluster.raft.persisted.ServerInfo;
 import org.opendaylight.controller.cluster.raft.persisted.SimpleReplicatedLogEntry;
+import org.opendaylight.controller.cluster.raft.persisted.Snapshot;
+import org.opendaylight.controller.cluster.raft.persisted.Snapshot.State;
 import org.opendaylight.controller.cluster.raft.persisted.UpdateElectionTerm;
 import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy;
 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
@@ -103,7 +113,7 @@ public class MigratedMessagesTest extends AbstractActorTest {
         doTestSnapshotAfterStartupWithMigratedMessage(persistenceId, true, snapshot -> {
             assertEquals("getElectionVotedFor", persistenceId, snapshot.getElectionVotedFor());
             assertEquals("getElectionTerm", 5, snapshot.getElectionTerm());
-        });
+        }, ByteState.empty());
 
         TEST_LOG.info("testSnapshotAfterStartupWithMigratedUpdateElectionTermAndPersistenceEnabled ending");
     }
@@ -122,7 +132,7 @@ public class MigratedMessagesTest extends AbstractActorTest {
         doTestSnapshotAfterStartupWithMigratedMessage(persistenceId, false, snapshot -> {
             assertEquals("getElectionVotedFor", persistenceId, snapshot.getElectionVotedFor());
             assertEquals("getElectionTerm", 5, snapshot.getElectionTerm());
-        });
+        }, ByteState.empty());
 
         TEST_LOG.info("testSnapshotAfterStartupWithMigratedUpdateElectionTermAndPersistenceDisabled ending");
     }
@@ -145,7 +155,7 @@ public class MigratedMessagesTest extends AbstractActorTest {
             assertEquals("getLastAppliedTerm", 1, snapshot.getLastAppliedTerm());
             assertEquals("getLastIndex", 0, snapshot.getLastIndex());
             assertEquals("getLastTerm", 1, snapshot.getLastTerm());
-        });
+        }, ByteState.empty());
 
         TEST_LOG.info("testSnapshotAfterStartupWithMigratedApplyJournalEntries ending");
     }
@@ -164,12 +174,17 @@ public class MigratedMessagesTest extends AbstractActorTest {
 
         RaftActorSnapshotCohort snapshotCohort = new RaftActorSnapshotCohort() {
             @Override
-            public void createSnapshot(ActorRef actorRef) {
-                actorRef.tell(new CaptureSnapshotReply(new byte[0]), actorRef);
+            public void createSnapshot(ActorRef actorRef, java.util.Optional<OutputStream> installSnapshotStream) {
+                actorRef.tell(new CaptureSnapshotReply(ByteState.empty(), installSnapshotStream), actorRef);
             }
 
             @Override
-            public void applySnapshot(byte[] snapshotBytes) {
+            public void applySnapshot(Snapshot.State snapshotState) {
+            }
+
+            @Override
+            public State deserializeSnapshot(ByteSource snapshotBytes) {
+                throw new UnsupportedOperationException();
             }
         };
 
@@ -204,7 +219,7 @@ public class MigratedMessagesTest extends AbstractActorTest {
             assertEquals("Unapplied entry term", 1, snapshot.getUnAppliedEntries().get(0).getTerm());
             assertEquals("Unapplied entry index", 0, snapshot.getUnAppliedEntries().get(0).getIndex());
             assertEquals("Unapplied entry data", expPayload, snapshot.getUnAppliedEntries().get(0).getData());
-        });
+        }, ByteState.empty());
 
         TEST_LOG.info("testSnapshotAfterStartupWithMigratedReplicatedLogEntry ending");
     }
@@ -231,14 +246,53 @@ public class MigratedMessagesTest extends AbstractActorTest {
                 assertEquals("getElectionTerm", 1, snapshot.getElectionTerm());
                 assertEquals("getServerConfiguration", new HashSet<>(expectedServerConfig.getServerConfig()),
                         new HashSet<>(snapshot.getServerConfiguration().getServerConfig()));
-            });
+            }, ByteState.empty());
 
         return actor;
     }
 
+    @Test
+    public void testSnapshotAfterStartupWithMigratedSnapshot() throws Exception {
+        TEST_LOG.info("testSnapshotAfterStartupWithMigratedSnapshot starting");
+
+        String persistenceId = factory.generateActorId("test-actor-");
+
+        List<Object> snapshotData = Arrays.asList(new MockPayload("1"));
+        final MockSnapshotState snapshotState = new MockSnapshotState(snapshotData);
+
+        org.opendaylight.controller.cluster.raft.Snapshot legacy = org.opendaylight.controller.cluster.raft.Snapshot
+            .create(SerializationUtils.serialize((Serializable) snapshotData),
+                Arrays.asList(new SimpleReplicatedLogEntry(6, 2, new MockPayload("payload"))),
+                6, 2, 5, 1, 3, "member-1", new ServerConfigurationPayload(Arrays.asList(
+                        new ServerInfo(persistenceId, true), new ServerInfo("2", false))));
+        InMemorySnapshotStore.addSnapshot(persistenceId, legacy);
+
+        doTestSnapshotAfterStartupWithMigratedMessage(persistenceId, true, snapshot -> {
+            assertEquals("getLastIndex", legacy.getLastIndex(), snapshot.getLastIndex());
+            assertEquals("getLastTerm", legacy.getLastTerm(), snapshot.getLastTerm());
+            assertEquals("getLastAppliedIndex", legacy.getLastAppliedIndex(), snapshot.getLastAppliedIndex());
+            assertEquals("getLastAppliedTerm", legacy.getLastAppliedTerm(), snapshot.getLastAppliedTerm());
+            assertEquals("getState", snapshotState, snapshot.getState());
+            assertEquals("Unapplied entries size", legacy.getUnAppliedEntries().size(),
+                    snapshot.getUnAppliedEntries().size());
+            assertEquals("Unapplied entry term", legacy.getUnAppliedEntries().get(0).getTerm(),
+                    snapshot.getUnAppliedEntries().get(0).getTerm());
+            assertEquals("Unapplied entry index", legacy.getUnAppliedEntries().get(0).getIndex(),
+                    snapshot.getUnAppliedEntries().get(0).getIndex());
+            assertEquals("Unapplied entry data", legacy.getUnAppliedEntries().get(0).getData(),
+                    snapshot.getUnAppliedEntries().get(0).getData());
+            assertEquals("getElectionVotedFor", legacy.getElectionVotedFor(), snapshot.getElectionVotedFor());
+            assertEquals("getElectionTerm", legacy.getElectionTerm(), snapshot.getElectionTerm());
+            assertEquals("getServerConfiguration", Sets.newHashSet(legacy.getServerConfiguration().getServerConfig()),
+                    Sets.newHashSet(snapshot.getServerConfiguration().getServerConfig()));
+        }, snapshotState);
+
+        TEST_LOG.info("testSnapshotAfterStartupWithMigratedSnapshot ending");
+    }
+
     @SuppressWarnings("checkstyle:IllegalCatch")
     private TestActorRef<MockRaftActor> doTestSnapshotAfterStartupWithMigratedMessage(String id, boolean persistent,
-            Consumer<Snapshot> snapshotVerifier) {
+            Consumer<Snapshot> snapshotVerifier, final State snapshotState) {
         InMemorySnapshotStore.addSnapshotSavedLatch(id);
         InMemoryJournal.addDeleteMessagesCompleteLatch(id);
         DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
@@ -246,12 +300,17 @@ public class MigratedMessagesTest extends AbstractActorTest {
 
         RaftActorSnapshotCohort snapshotCohort = new RaftActorSnapshotCohort() {
             @Override
-            public void createSnapshot(ActorRef actorRef) {
-                actorRef.tell(new CaptureSnapshotReply(new byte[0]), actorRef);
+            public void createSnapshot(ActorRef actorRef, java.util.Optional<OutputStream> installSnapshotStream) {
+                actorRef.tell(new CaptureSnapshotReply(snapshotState, installSnapshotStream), actorRef);
+            }
+
+            @Override
+            public void applySnapshot(State newState) {
             }
 
             @Override
-            public void applySnapshot(byte[] snapshotBytes) {
+            public State deserializeSnapshot(ByteSource snapshotBytes) {
+                throw new UnsupportedOperationException();
             }
         };
 
index dbced5f4cb6b1c9372e662144e0d60f0f8d8517f..20adaf12beff56f0125441c265d75c2533b3c769 100644 (file)
@@ -16,10 +16,10 @@ import akka.actor.Props;
 import com.google.common.base.Function;
 import com.google.common.base.Optional;
 import com.google.common.base.Throwables;
+import com.google.common.io.ByteSource;
 import com.google.common.util.concurrent.Uninterruptibles;
-import java.io.ByteArrayInputStream;
 import java.io.IOException;
-import java.io.ObjectInputStream;
+import java.io.OutputStream;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
@@ -27,8 +27,10 @@ import java.util.Map;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import javax.annotation.Nonnull;
+import org.apache.commons.lang3.SerializationUtils;
 import org.opendaylight.controller.cluster.DataPersistenceProvider;
 import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
+import org.opendaylight.controller.cluster.raft.persisted.Snapshot;
 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
 import org.opendaylight.yangtools.concepts.Identifier;
 
@@ -169,38 +171,43 @@ public class MockRaftActor extends RaftActor implements RaftActorRecoveryCohort,
     }
 
     @Override
-    public void applyRecoverySnapshot(byte[] bytes) {
-        recoveryCohortDelegate.applyRecoverySnapshot(bytes);
-        applySnapshotBytes(bytes);
+    public void applyRecoverySnapshot(Snapshot.State newState) {
+        recoveryCohortDelegate.applyRecoverySnapshot(newState);
+        applySnapshotState(newState);
     }
 
-    private void applySnapshotBytes(byte[] bytes) {
-        if (bytes.length == 0) {
-            return;
-        }
-
-        try {
-            Object data = toObject(bytes);
-            if (data instanceof List) {
-                state.clear();
-                state.addAll((List<?>) data);
-            }
-        } catch (ClassNotFoundException | IOException e) {
-            Throwables.propagate(e);
+    private void applySnapshotState(Snapshot.State newState) {
+        if (newState instanceof MockSnapshotState) {
+            state.clear();
+            state.addAll(((MockSnapshotState)newState).getState());
         }
     }
 
     @Override
-    public void createSnapshot(ActorRef actorRef) {
+    public void createSnapshot(ActorRef actorRef, java.util.Optional<OutputStream> installSnapshotStream) {
         LOG.info("{}: createSnapshot called", persistenceId());
-        snapshotCohortDelegate.createSnapshot(actorRef);
+        snapshotCohortDelegate.createSnapshot(actorRef, installSnapshotStream);
     }
 
     @Override
-    public void applySnapshot(byte [] snapshot) {
+    public void applySnapshot(Snapshot.State newState) {
         LOG.info("{}: applySnapshot called", persistenceId());
-        applySnapshotBytes(snapshot);
-        snapshotCohortDelegate.applySnapshot(snapshot);
+        applySnapshotState(newState);
+        snapshotCohortDelegate.applySnapshot(newState);
+    }
+
+    @Override
+    public Snapshot.State deserializeSnapshot(ByteSource snapshotBytes) {
+        try {
+            return (Snapshot.State) SerializationUtils.deserialize(snapshotBytes.read());
+        } catch (IOException e) {
+            throw new RuntimeException("Error deserializing state", e);
+        }
+    }
+
+    @Override
+    public Snapshot.State deserializePreCarbonSnapshot(byte[] from) {
+        return new MockSnapshotState(SerializationUtils.deserialize(from));
     }
 
     @Override
@@ -243,23 +250,12 @@ public class MockRaftActor extends RaftActor implements RaftActorRecoveryCohort,
         }
     }
 
-    public static Object toObject(byte[] bs) throws ClassNotFoundException, IOException {
-        Object obj = null;
-        ByteArrayInputStream bis = null;
-        ObjectInputStream ois = null;
-        try {
-            bis = new ByteArrayInputStream(bs);
-            ois = new ObjectInputStream(bis);
-            obj = ois.readObject();
-        } finally {
-            if (bis != null) {
-                bis.close();
-            }
-            if (ois != null) {
-                ois.close();
-            }
+    public static List<Object> fromState(Snapshot.State from) {
+        if (from instanceof MockSnapshotState) {
+            return ((MockSnapshotState)from).getState();
         }
-        return obj;
+
+        throw new IllegalStateException("Unexpected snapshot State: " + from);
     }
 
     public ReplicatedLog getReplicatedLog() {
@@ -367,4 +363,53 @@ public class MockRaftActor extends RaftActor implements RaftActorRecoveryCohort,
             super(MockRaftActor.class);
         }
     }
+
+    public static class MockSnapshotState implements Snapshot.State {
+        private static final long serialVersionUID = 1L;
+
+        private final List<Object> state;
+
+        public MockSnapshotState(List<Object> state) {
+            this.state = state;
+        }
+
+        public List<Object> getState() {
+            return state;
+        }
+
+        @Override
+        public int hashCode() {
+            final int prime = 31;
+            int result = 1;
+            result = prime * result + (state == null ? 0 : state.hashCode());
+            return result;
+        }
+
+        @Override
+        public boolean equals(Object obj) {
+            if (this == obj) {
+                return true;
+            }
+            if (obj == null) {
+                return false;
+            }
+            if (getClass() != obj.getClass()) {
+                return false;
+            }
+            MockSnapshotState other = (MockSnapshotState) obj;
+            if (state == null) {
+                if (other.state != null) {
+                    return false;
+                }
+            } else if (!state.equals(other.state)) {
+                return false;
+            }
+            return true;
+        }
+
+        @Override
+        public String toString() {
+            return "MockSnapshotState [state=" + state + "]";
+        }
+    }
 }
index cdcaadf8151999f8df9a5a9961285b36944daaa6..d92f0729f207c077f7b38a1427d62a535e49cd0f 100644 (file)
@@ -14,12 +14,17 @@ import akka.actor.ActorSystem;
 import akka.actor.Props;
 import akka.japi.Procedure;
 import com.google.common.base.Throwables;
+import com.google.common.io.ByteSource;
+import java.io.IOException;
+import java.io.OutputStream;
 import java.io.Serializable;
 import java.util.HashMap;
 import java.util.Map;
 import org.opendaylight.controller.cluster.NonPersistentDataProvider;
 import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
+import org.opendaylight.controller.cluster.raft.persisted.ByteState;
 import org.opendaylight.controller.cluster.raft.persisted.SimpleReplicatedLogEntry;
+import org.opendaylight.controller.cluster.raft.persisted.Snapshot.State;
 import org.opendaylight.controller.cluster.raft.policy.RaftPolicy;
 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
 import org.slf4j.Logger;
@@ -120,7 +125,23 @@ public class MockRaftActorContext extends RaftActorContextImpl {
     @Override
     public SnapshotManager getSnapshotManager() {
         SnapshotManager snapshotManager = super.getSnapshotManager();
-        snapshotManager.setCreateSnapshotRunnable(() -> { });
+        snapshotManager.setCreateSnapshotConsumer(out -> { });
+
+        snapshotManager.setSnapshotCohort(new RaftActorSnapshotCohort() {
+            @Override
+            public State deserializeSnapshot(ByteSource snapshotBytes) throws IOException {
+                return ByteState.of(snapshotBytes.read());
+            }
+
+            @Override
+            public void createSnapshot(ActorRef actorRef, java.util.Optional<OutputStream> installSnapshotStream) {
+            }
+
+            @Override
+            public void applySnapshot(State snapshotState) {
+            }
+        });
+
         return snapshotManager;
     }
 
index e0abd1726ffa54cbb7ba13ce0628d9b0181a06df..828e3eb89e09c3a087576bf2e9e3e338bb93caa4 100644 (file)
@@ -10,9 +10,10 @@ package org.opendaylight.controller.cluster.raft;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
-import static org.mockito.AdditionalMatchers.aryEq;
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.anyInt;
+import static org.mockito.Matchers.anyObject;
+import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.never;
@@ -23,8 +24,11 @@ import akka.persistence.RecoveryCompleted;
 import akka.persistence.SnapshotMetadata;
 import akka.persistence.SnapshotOffer;
 import com.google.common.collect.Sets;
+import java.io.Serializable;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.List;
+import org.apache.commons.lang3.SerializationUtils;
 import org.hamcrest.Description;
 import org.junit.Before;
 import org.junit.Test;
@@ -36,11 +40,14 @@ import org.mockito.Mockito;
 import org.mockito.MockitoAnnotations;
 import org.opendaylight.controller.cluster.DataPersistenceProvider;
 import org.opendaylight.controller.cluster.PersistentDataProvider;
+import org.opendaylight.controller.cluster.raft.MockRaftActor.MockSnapshotState;
+import org.opendaylight.controller.cluster.raft.MockRaftActorContext.MockPayload;
 import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries;
 import org.opendaylight.controller.cluster.raft.persisted.DeleteEntries;
 import org.opendaylight.controller.cluster.raft.persisted.ServerConfigurationPayload;
 import org.opendaylight.controller.cluster.raft.persisted.ServerInfo;
 import org.opendaylight.controller.cluster.raft.persisted.SimpleReplicatedLogEntry;
+import org.opendaylight.controller.cluster.raft.persisted.Snapshot;
 import org.opendaylight.controller.cluster.raft.persisted.UpdateElectionTerm;
 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
 import org.slf4j.Logger;
@@ -62,6 +69,9 @@ public class RaftActorRecoverySupportTest {
     @Mock
     private RaftActorRecoveryCohort mockCohort;
 
+    @Mock
+    private RaftActorSnapshotCohort mockSnapshotCohort;
+
     @Mock
     PersistentDataProvider mockPersistentProvider;
 
@@ -163,7 +173,44 @@ public class RaftActorRecoverySupportTest {
         replicatedLog.append(new SimpleReplicatedLogEntry(2, 1, new MockRaftActorContext.MockPayload("2")));
         replicatedLog.append(new SimpleReplicatedLogEntry(3, 1, new MockRaftActorContext.MockPayload("3")));
 
-        byte[] snapshotBytes = {1,2,3,4,5};
+        ReplicatedLogEntry unAppliedEntry1 = new SimpleReplicatedLogEntry(4, 1,
+                new MockRaftActorContext.MockPayload("4", 4));
+
+        ReplicatedLogEntry unAppliedEntry2 = new SimpleReplicatedLogEntry(5, 1,
+                new MockRaftActorContext.MockPayload("5", 5));
+
+        long lastAppliedDuringSnapshotCapture = 3;
+        long lastIndexDuringSnapshotCapture = 5;
+        long electionTerm = 2;
+        String electionVotedFor = "member-2";
+
+        MockSnapshotState snapshotState = new MockSnapshotState(Arrays.asList(new MockPayload("1")));
+        Snapshot snapshot = Snapshot.create(snapshotState,
+                Arrays.asList(unAppliedEntry1, unAppliedEntry2), lastIndexDuringSnapshotCapture, 1,
+                lastAppliedDuringSnapshotCapture, 1, electionTerm, electionVotedFor, null);
+
+        SnapshotMetadata metadata = new SnapshotMetadata("test", 6, 12345);
+        SnapshotOffer snapshotOffer = new SnapshotOffer(metadata , snapshot);
+
+        sendMessageToSupport(snapshotOffer);
+
+        assertEquals("Journal log size", 2, context.getReplicatedLog().size());
+        assertEquals("Journal data size", 9, context.getReplicatedLog().dataSize());
+        assertEquals("Last index", lastIndexDuringSnapshotCapture, context.getReplicatedLog().lastIndex());
+        assertEquals("Last applied", lastAppliedDuringSnapshotCapture, context.getLastApplied());
+        assertEquals("Commit index", lastAppliedDuringSnapshotCapture, context.getCommitIndex());
+        assertEquals("Snapshot term", 1, context.getReplicatedLog().getSnapshotTerm());
+        assertEquals("Snapshot index", lastAppliedDuringSnapshotCapture, context.getReplicatedLog().getSnapshotIndex());
+        assertEquals("Election term", electionTerm, context.getTermInformation().getCurrentTerm());
+        assertEquals("Election votedFor", electionVotedFor, context.getTermInformation().getVotedFor());
+        assertFalse("Dynamic server configuration", context.isDynamicServerConfigurationInUse());
+
+        verify(mockCohort).applyRecoverySnapshot(snapshotState);
+    }
+
+    @Deprecated
+    @Test
+    public void testOnSnapshotOfferWithPreCarbonSnapshot() {
 
         ReplicatedLogEntry unAppliedEntry1 = new SimpleReplicatedLogEntry(4, 1,
                 new MockRaftActorContext.MockPayload("4", 4));
@@ -176,12 +223,21 @@ public class RaftActorRecoverySupportTest {
         long electionTerm = 2;
         String electionVotedFor = "member-2";
 
-        Snapshot snapshot = Snapshot.create(snapshotBytes, Arrays.asList(unAppliedEntry1, unAppliedEntry2),
-                lastIndexDuringSnapshotCapture, 1, lastAppliedDuringSnapshotCapture, 1, electionTerm, electionVotedFor);
+        List<Object> snapshotData = Arrays.asList(new MockPayload("1"));
+        final MockSnapshotState snapshotState = new MockSnapshotState(snapshotData);
+
+        org.opendaylight.controller.cluster.raft.Snapshot snapshot = org.opendaylight.controller.cluster.raft.Snapshot
+            .create(SerializationUtils.serialize((Serializable) snapshotData),
+                Arrays.asList(unAppliedEntry1, unAppliedEntry2), lastIndexDuringSnapshotCapture, 1,
+                lastAppliedDuringSnapshotCapture, 1, electionTerm, electionVotedFor, null);
 
         SnapshotMetadata metadata = new SnapshotMetadata("test", 6, 12345);
         SnapshotOffer snapshotOffer = new SnapshotOffer(metadata , snapshot);
 
+        doAnswer(invocation -> new MockSnapshotState(SerializationUtils.deserialize(
+            invocation.getArgumentAt(0, byte[].class))))
+                .when(mockCohort).deserializePreCarbonSnapshot(any(byte[].class));
+
         sendMessageToSupport(snapshotOffer);
 
         assertEquals("Journal log size", 2, context.getReplicatedLog().size());
@@ -195,7 +251,7 @@ public class RaftActorRecoverySupportTest {
         assertEquals("Election votedFor", electionVotedFor, context.getTermInformation().getVotedFor());
         assertFalse("Dynamic server configuration", context.isDynamicServerConfigurationInUse());
 
-        verify(mockCohort).applyRecoverySnapshot(snapshotBytes);
+        verify(mockCohort).applyRecoverySnapshot(snapshotState);
     }
 
     @Test
@@ -255,11 +311,12 @@ public class RaftActorRecoverySupportTest {
 
     @Test
     public void testDataRecoveredWithPersistenceDisabled() {
-        doNothing().when(mockCohort).applyRecoverySnapshot(aryEq(new byte[0]));
+        doNothing().when(mockCohort).applyRecoverySnapshot(anyObject());
         doReturn(false).when(mockPersistence).isRecoveryApplicable();
         doReturn(10L).when(mockPersistentProvider).getLastSequenceNumber();
 
-        Snapshot snapshot = Snapshot.create(new byte[]{1}, Collections.<ReplicatedLogEntry>emptyList(), 3, 1, 3, 1);
+        Snapshot snapshot = Snapshot.create(new MockSnapshotState(Arrays.asList(new MockPayload("1"))),
+                Collections.<ReplicatedLogEntry>emptyList(), 3, 1, 3, 1, -1, null, null);
         SnapshotOffer snapshotOffer = new SnapshotOffer(new SnapshotMetadata("test", 6, 12345), snapshot);
 
         sendMessageToSupport(snapshotOffer);
@@ -285,7 +342,7 @@ public class RaftActorRecoverySupportTest {
 
         sendMessageToSupport(RecoveryCompleted.getInstance(), true);
 
-        verify(mockCohort).applyRecoverySnapshot(aryEq(new byte[0]));
+        verify(mockCohort, never()).applyRecoverySnapshot(anyObject());
         verify(mockCohort, never()).getRestoreFromSnapshot();
         verifyNoMoreInteractions(mockCohort);
 
@@ -389,7 +446,8 @@ public class RaftActorRecoverySupportTest {
                                                         new ServerInfo("follower1", true),
                                                         new ServerInfo("follower2", true)));
 
-        Snapshot snapshot = Snapshot.create(new byte[]{1}, Collections.<ReplicatedLogEntry>emptyList(),
+        MockSnapshotState snapshotState = new MockSnapshotState(Arrays.asList(new MockPayload("1")));
+        Snapshot snapshot = Snapshot.create(snapshotState, Collections.<ReplicatedLogEntry>emptyList(),
                 -1, -1, -1, -1, electionTerm, electionVotedFor, serverPayload);
 
         SnapshotMetadata metadata = new SnapshotMetadata("test", 6, 12345);
index a19281ffd8cbedd86062c008edd65cc6c2bbb8f4..71412f3e052394e506cab3df3b0a8585a5aa2370 100644 (file)
@@ -26,11 +26,15 @@ import com.google.common.base.Stopwatch;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
+import com.google.common.io.ByteSource;
+import java.io.OutputStream;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
+import org.apache.commons.lang3.SerializationUtils;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -58,6 +62,7 @@ import org.opendaylight.controller.cluster.raft.messages.ServerChangeStatus;
 import org.opendaylight.controller.cluster.raft.messages.ServerRemoved;
 import org.opendaylight.controller.cluster.raft.messages.UnInitializedFollowerSnapshotReply;
 import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries;
+import org.opendaylight.controller.cluster.raft.persisted.ByteState;
 import org.opendaylight.controller.cluster.raft.persisted.ServerConfigurationPayload;
 import org.opendaylight.controller.cluster.raft.persisted.ServerInfo;
 import org.opendaylight.controller.cluster.raft.persisted.SimpleReplicatedLogEntry;
@@ -167,8 +172,7 @@ public class RaftActorServerConfigurationSupportTest extends AbstractActorTest {
         // Leader should install snapshot - capture and verify ApplySnapshot contents
 
         ApplySnapshot applySnapshot = expectFirstMatching(newFollowerCollectorActor, ApplySnapshot.class);
-        @SuppressWarnings("unchecked")
-        List<Object> snapshotState = (List<Object>) MockRaftActor.toObject(applySnapshot.getSnapshot().getState());
+        List<Object> snapshotState = MockRaftActor.fromState(applySnapshot.getSnapshot().getState());
         assertEquals("Snapshot state", snapshotState, leaderRaftActor.getState());
 
         AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
@@ -248,8 +252,7 @@ public class RaftActorServerConfigurationSupportTest extends AbstractActorTest {
         // Leader should install snapshot - capture and verify ApplySnapshot contents
 
         ApplySnapshot applySnapshot = expectFirstMatching(newFollowerCollectorActor, ApplySnapshot.class);
-        @SuppressWarnings("unchecked")
-        List<Object> snapshotState = (List<Object>) MockRaftActor.toObject(applySnapshot.getSnapshot().getState());
+        List<Object> snapshotState = MockRaftActor.fromState(applySnapshot.getSnapshot().getState());
         assertEquals("Snapshot state", snapshotState, leaderRaftActor.getState());
 
         AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
@@ -1518,12 +1521,19 @@ public class RaftActorServerConfigurationSupportTest extends AbstractActorTest {
             super(id, peerAddresses, config, persistent, collectorActor);
             snapshotCohortDelegate = new RaftActorSnapshotCohort() {
                 @Override
-                public void createSnapshot(ActorRef actorRef) {
-                    actorRef.tell(new CaptureSnapshotReply(new byte[0]), actorRef);
+                public void createSnapshot(ActorRef actorRef, java.util.Optional<OutputStream> installSnapshotStream) {
+                    actorRef.tell(new CaptureSnapshotReply(ByteState.empty(), installSnapshotStream), actorRef);
                 }
 
                 @Override
-                public void applySnapshot(byte[] snapshotBytes) {
+                public void applySnapshot(
+                        org.opendaylight.controller.cluster.raft.persisted.Snapshot.State snapshotState) {
+                }
+
+                @Override
+                public org.opendaylight.controller.cluster.raft.persisted.Snapshot.State deserializeSnapshot(
+                        ByteSource snapshotBytes) {
+                    throw new UnsupportedOperationException();
                 }
             };
         }
@@ -1564,12 +1574,13 @@ public class RaftActorServerConfigurationSupportTest extends AbstractActorTest {
 
         @Override
         @SuppressWarnings("checkstyle:IllegalCatch")
-        public void createSnapshot(ActorRef actorRef) {
-            try {
-                actorRef.tell(new CaptureSnapshotReply(RaftActorTest.fromObject(getState()).toByteArray()), actorRef);
-            } catch (Exception e) {
-                LOG.error("createSnapshot failed", e);
+        public void createSnapshot(ActorRef actorRef, java.util.Optional<OutputStream> installSnapshotStream) {
+            MockSnapshotState snapshotState = new MockSnapshotState(new ArrayList<>(getState()));
+            if (installSnapshotStream.isPresent()) {
+                SerializationUtils.serialize(snapshotState, installSnapshotStream.get());
             }
+
+            actorRef.tell(new CaptureSnapshotReply(snapshotState, installSnapshotStream), actorRef);
         }
 
         static Props props(Map<String, String> peerAddresses, RaftActorContext fromContext) {
index 45b89b7e01c42ff93ca4bc972b9084450d84ccda..495cc6d6af490df19ae44919af6f9b7a2bb8dacd 100644 (file)
@@ -10,15 +10,17 @@ package org.opendaylight.controller.cluster.raft;
 import static org.junit.Assert.assertEquals;
 import static org.mockito.Matchers.anyLong;
 import static org.mockito.Matchers.eq;
-import static org.mockito.Matchers.same;
 import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.verify;
 
 import akka.actor.ActorRef;
 import akka.persistence.SaveSnapshotFailure;
 import akka.persistence.SaveSnapshotSuccess;
 import akka.persistence.SnapshotMetadata;
+import java.io.OutputStream;
 import java.util.Collections;
+import java.util.Optional;
 import org.junit.Before;
 import org.junit.Test;
 import org.mockito.Mock;
@@ -27,6 +29,8 @@ import org.opendaylight.controller.cluster.DataPersistenceProvider;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
 import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
+import org.opendaylight.controller.cluster.raft.persisted.ByteState;
+import org.opendaylight.controller.cluster.raft.persisted.Snapshot;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -95,8 +99,8 @@ public class RaftActorSnapshotMessageSupportTest {
         long lastIndexDuringSnapshotCapture = 2;
         byte[] snapshotBytes = {1,2,3,4,5};
 
-        Snapshot snapshot = Snapshot.create(snapshotBytes, Collections.<ReplicatedLogEntry>emptyList(),
-                lastIndexDuringSnapshotCapture, 1, lastAppliedDuringSnapshotCapture, 1);
+        Snapshot snapshot = Snapshot.create(ByteState.of(snapshotBytes), Collections.<ReplicatedLogEntry>emptyList(),
+                lastIndexDuringSnapshotCapture, 1, lastAppliedDuringSnapshotCapture, 1, -1, null, null);
 
         ApplySnapshot applySnapshot = new ApplySnapshot(snapshot);
         sendMessageToSupport(applySnapshot);
@@ -106,11 +110,11 @@ public class RaftActorSnapshotMessageSupportTest {
 
     @Test
     public void testOnCaptureSnapshotReply() {
+        ByteState state = ByteState.of(new byte[]{1,2,3,4,5});
+        Optional<OutputStream> optionalStream = Optional.of(mock(OutputStream.class));
+        sendMessageToSupport(new CaptureSnapshotReply(state, optionalStream));
 
-        byte[] snapshot = {1,2,3,4,5};
-        sendMessageToSupport(new CaptureSnapshotReply(snapshot));
-
-        verify(mockSnapshotManager).persist(same(snapshot), anyLong());
+        verify(mockSnapshotManager).persist(eq(state), eq(optionalStream), anyLong());
     }
 
     @Test
index edc51092d66b8ec7cc776cbe4b337cdbc95f7909..d674afb6205aa7f8357a1f0f185e35aec5df8966 100644 (file)
@@ -8,7 +8,6 @@
 
 package org.opendaylight.controller.cluster.raft;
 
-import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNotSame;
@@ -63,10 +62,10 @@ import org.opendaylight.controller.cluster.NonPersistentDataProvider;
 import org.opendaylight.controller.cluster.PersistentDataProvider;
 import org.opendaylight.controller.cluster.notifications.LeaderStateChanged;
 import org.opendaylight.controller.cluster.notifications.RoleChanged;
+import org.opendaylight.controller.cluster.raft.MockRaftActor.MockSnapshotState;
 import org.opendaylight.controller.cluster.raft.MockRaftActorContext.MockPayload;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
-import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
 import org.opendaylight.controller.cluster.raft.base.messages.LeaderTransitioning;
 import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat;
@@ -79,10 +78,13 @@ import org.opendaylight.controller.cluster.raft.client.messages.GetSnapshotReply
 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
 import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries;
+import org.opendaylight.controller.cluster.raft.persisted.ByteState;
 import org.opendaylight.controller.cluster.raft.persisted.DeleteEntries;
+import org.opendaylight.controller.cluster.raft.persisted.EmptyState;
 import org.opendaylight.controller.cluster.raft.persisted.ServerConfigurationPayload;
 import org.opendaylight.controller.cluster.raft.persisted.ServerInfo;
 import org.opendaylight.controller.cluster.raft.persisted.SimpleReplicatedLogEntry;
+import org.opendaylight.controller.cluster.raft.persisted.Snapshot;
 import org.opendaylight.controller.cluster.raft.persisted.UpdateElectionTerm;
 import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy;
 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
@@ -153,15 +155,14 @@ public class RaftActorTest extends AbstractActorTest {
         int lastIndexDuringSnapshotCapture = 4;
 
         // 4 messages as part of snapshot, which are applied to state
-        ByteString snapshotBytes = fromObject(Arrays.asList(
+        MockSnapshotState snapshotState = new MockSnapshotState(Arrays.asList(
                 new MockRaftActorContext.MockPayload("A"),
                 new MockRaftActorContext.MockPayload("B"),
                 new MockRaftActorContext.MockPayload("C"),
                 new MockRaftActorContext.MockPayload("D")));
 
-        Snapshot snapshot = Snapshot.create(snapshotBytes.toByteArray(),
-                snapshotUnappliedEntries, lastIndexDuringSnapshotCapture, 1,
-                lastAppliedDuringSnapshotCapture, 1);
+        Snapshot snapshot = Snapshot.create(snapshotState, snapshotUnappliedEntries, lastIndexDuringSnapshotCapture, 1,
+                lastAppliedDuringSnapshotCapture, 1, -1, null, null);
         InMemorySnapshotStore.addSnapshot(persistenceId, snapshot);
 
         // add more entries after snapshot is taken
@@ -292,7 +293,8 @@ public class RaftActorTest extends AbstractActorTest {
         RaftActorRecoverySupport mockSupport = mock(RaftActorRecoverySupport.class);
         mockRaftActor.setRaftActorRecoverySupport(mockSupport );
 
-        Snapshot snapshot = Snapshot.create(new byte[]{1}, Collections.<ReplicatedLogEntry>emptyList(), 3, 1, 3, 1);
+        Snapshot snapshot = Snapshot.create(ByteState.of(new byte[]{1}),
+                Collections.<ReplicatedLogEntry>emptyList(), 3, 1, 3, 1, -1, null, null);
         SnapshotOffer snapshotOffer = new SnapshotOffer(new SnapshotMetadata("test", 6, 12345), snapshot);
         mockRaftActor.handleRecover(snapshotOffer);
 
@@ -337,11 +339,8 @@ public class RaftActorTest extends AbstractActorTest {
         doReturn(true).when(mockSupport).handleSnapshotMessage(same(applySnapshot), any(ActorRef.class));
         mockRaftActor.handleCommand(applySnapshot);
 
-        CaptureSnapshot captureSnapshot = new CaptureSnapshot(1, 1, 1, 1, 0, 1, null);
-        doReturn(true).when(mockSupport).handleSnapshotMessage(same(captureSnapshot), any(ActorRef.class));
-        mockRaftActor.handleCommand(captureSnapshot);
-
-        CaptureSnapshotReply captureSnapshotReply = new CaptureSnapshotReply(new byte[0]);
+        CaptureSnapshotReply captureSnapshotReply = new CaptureSnapshotReply(ByteState.empty(),
+                java.util.Optional.empty());
         doReturn(true).when(mockSupport).handleSnapshotMessage(same(captureSnapshotReply), any(ActorRef.class));
         mockRaftActor.handleCommand(captureSnapshotReply);
 
@@ -362,7 +361,6 @@ public class RaftActorTest extends AbstractActorTest {
         mockRaftActor.handleCommand(GetSnapshot.INSTANCE);
 
         verify(mockSupport).handleSnapshotMessage(same(applySnapshot), any(ActorRef.class));
-        verify(mockSupport).handleSnapshotMessage(same(captureSnapshot), any(ActorRef.class));
         verify(mockSupport).handleSnapshotMessage(same(captureSnapshotReply), any(ActorRef.class));
         verify(mockSupport).handleSnapshotMessage(same(saveSnapshotSuccess), any(ActorRef.class));
         verify(mockSupport).handleSnapshotMessage(same(saveSnapshotFailure), any(ActorRef.class));
@@ -592,7 +590,7 @@ public class RaftActorTest extends AbstractActorTest {
         leaderActor.getRaftActorContext().getSnapshotManager().capture(
                 new SimpleReplicatedLogEntry(6, 1, new MockRaftActorContext.MockPayload("x")), 4);
 
-        verify(leaderActor.snapshotCohortDelegate).createSnapshot(any(ActorRef.class));
+        verify(leaderActor.snapshotCohortDelegate).createSnapshot(anyObject(), anyObject());
 
         assertEquals(8, leaderActor.getReplicatedLog().size());
 
@@ -611,14 +609,14 @@ public class RaftActorTest extends AbstractActorTest {
 
         assertEquals(8, leaderActor.getReplicatedLog().size());
 
-        ByteString snapshotBytes = fromObject(Arrays.asList(
+        MockSnapshotState snapshotState = new MockSnapshotState(Arrays.asList(
                 new MockRaftActorContext.MockPayload("foo-0"),
                 new MockRaftActorContext.MockPayload("foo-1"),
                 new MockRaftActorContext.MockPayload("foo-2"),
                 new MockRaftActorContext.MockPayload("foo-3"),
                 new MockRaftActorContext.MockPayload("foo-4")));
 
-        leaderActor.getRaftActorContext().getSnapshotManager().persist(snapshotBytes.toByteArray(),
+        leaderActor.getRaftActorContext().getSnapshotManager().persist(snapshotState, java.util.Optional.empty(),
                 Runtime.getRuntime().totalMemory());
 
         assertTrue(leaderActor.getRaftActorContext().getSnapshotManager().isCapturing());
@@ -684,7 +682,7 @@ public class RaftActorTest extends AbstractActorTest {
         followerActor.getRaftActorContext().getSnapshotManager().capture(
                 new SimpleReplicatedLogEntry(5, 1, new MockRaftActorContext.MockPayload("D")), 4);
 
-        verify(followerActor.snapshotCohortDelegate).createSnapshot(any(ActorRef.class));
+        verify(followerActor.snapshotCohortDelegate).createSnapshot(anyObject(), anyObject());
 
         assertEquals(6, followerActor.getReplicatedLog().size());
 
@@ -711,7 +709,8 @@ public class RaftActorTest extends AbstractActorTest {
                 new MockRaftActorContext.MockPayload("foo-2"),
                 new MockRaftActorContext.MockPayload("foo-3"),
                 new MockRaftActorContext.MockPayload("foo-4")));
-        followerActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes.toByteArray()));
+        followerActor.onReceiveCommand(new CaptureSnapshotReply(ByteState.of(snapshotBytes.toByteArray()),
+                java.util.Optional.empty()));
         assertTrue(followerActor.getRaftActorContext().getSnapshotManager().isCapturing());
 
         // The commit is needed to complete the snapshot creation process
@@ -802,7 +801,8 @@ public class RaftActorTest extends AbstractActorTest {
                 new MockRaftActorContext.MockPayload("foo-2"),
                 new MockRaftActorContext.MockPayload("foo-3"),
                 new MockRaftActorContext.MockPayload("foo-4")));
-        leaderActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes.toByteArray()));
+        leaderActor.onReceiveCommand(new CaptureSnapshotReply(ByteState.of(snapshotBytes.toByteArray()),
+                java.util.Optional.empty()));
         assertTrue(leaderActor.getRaftActorContext().getSnapshotManager().isCapturing());
 
         assertEquals("Real snapshot didn't clear the log till replicatedToAllIndex", 0,
@@ -848,7 +848,8 @@ public class RaftActorTest extends AbstractActorTest {
                 leaderActor.getReplicatedLog().last(), -1, "member1");
 
         // Now send a CaptureSnapshotReply
-        mockActorRef.tell(new CaptureSnapshotReply(fromObject("foo").toByteArray()), mockActorRef);
+        mockActorRef.tell(new CaptureSnapshotReply(ByteState.of(fromObject("foo").toByteArray()),
+                java.util.Optional.empty()), mockActorRef);
 
         // Trimming log in this scenario is a no-op
         assertEquals(-1, leaderActor.getReplicatedLog().getSnapshotIndex());
@@ -888,7 +889,8 @@ public class RaftActorTest extends AbstractActorTest {
                 new MockRaftActorContext.MockPayload("duh"), false);
 
         // Now send a CaptureSnapshotReply
-        mockActorRef.tell(new CaptureSnapshotReply(fromObject("foo").toByteArray()), mockActorRef);
+        mockActorRef.tell(new CaptureSnapshotReply(ByteState.of(fromObject("foo").toByteArray()),
+                java.util.Optional.empty()), mockActorRef);
 
         // Trimming log in this scenario is a no-op
         assertEquals(3, leaderActor.getReplicatedLog().getSnapshotIndex());
@@ -1038,10 +1040,12 @@ public class RaftActorTest extends AbstractActorTest {
         raftActorRef.tell(GetSnapshot.INSTANCE, kit.getRef());
 
         ArgumentCaptor<ActorRef> replyActor = ArgumentCaptor.forClass(ActorRef.class);
-        verify(mockRaftActor.snapshotCohortDelegate, timeout(5000)).createSnapshot(replyActor.capture());
+        verify(mockRaftActor.snapshotCohortDelegate, timeout(5000)).createSnapshot(replyActor.capture(),
+                eq(java.util.Optional.empty()));
 
         byte[] stateSnapshot = new byte[]{1,2,3};
-        replyActor.getValue().tell(new CaptureSnapshotReply(stateSnapshot), ActorRef.noSender());
+        replyActor.getValue().tell(new CaptureSnapshotReply(ByteState.of(stateSnapshot), java.util.Optional.empty()),
+                ActorRef.noSender());
 
         GetSnapshotReply reply = kit.expectMsgClass(GetSnapshotReply.class);
 
@@ -1053,7 +1057,7 @@ public class RaftActorTest extends AbstractActorTest {
         assertEquals("getLastAppliedTerm", term, replySnapshot.getLastAppliedTerm());
         assertEquals("getLastIndex", 2L, replySnapshot.getLastIndex());
         assertEquals("getLastTerm", term, replySnapshot.getLastTerm());
-        assertArrayEquals("getState", stateSnapshot, replySnapshot.getState());
+        assertEquals("getState", ByteState.of(stateSnapshot), replySnapshot.getState());
         assertEquals("getUnAppliedEntries size", 1, replySnapshot.getUnAppliedEntries().size());
         assertEquals("UnApplied entry index ", 2L, replySnapshot.getUnAppliedEntries().get(0).getIndex());
 
@@ -1076,7 +1080,7 @@ public class RaftActorTest extends AbstractActorTest {
 
         raftActorRef.tell(GetSnapshot.INSTANCE, kit.getRef());
         reply = kit.expectMsgClass(GetSnapshotReply.class);
-        verify(mockRaftActor.snapshotCohortDelegate, never()).createSnapshot(any(ActorRef.class));
+        verify(mockRaftActor.snapshotCohortDelegate, never()).createSnapshot(anyObject(), anyObject());
 
         assertEquals("getId", persistenceId, reply.getId());
         replySnapshot = SerializationUtils.deserialize(reply.getSnapshot());
@@ -1086,7 +1090,7 @@ public class RaftActorTest extends AbstractActorTest {
         assertEquals("getLastAppliedTerm", -1L, replySnapshot.getLastAppliedTerm());
         assertEquals("getLastIndex", -1L, replySnapshot.getLastIndex());
         assertEquals("getLastTerm", -1L, replySnapshot.getLastTerm());
-        assertEquals("getState length", 0, replySnapshot.getState().length);
+        assertEquals("getState type", EmptyState.INSTANCE, replySnapshot.getState());
         assertEquals("getUnAppliedEntries size", 0, replySnapshot.getUnAppliedEntries().size());
 
         TEST_LOG.info("testGetSnapshot ending");
@@ -1106,15 +1110,14 @@ public class RaftActorTest extends AbstractActorTest {
         int snapshotLastApplied = 3;
         int snapshotLastIndex = 4;
 
-        List<MockPayload> state = Arrays.asList(
+        MockSnapshotState snapshotState = new MockSnapshotState(Arrays.asList(
                 new MockRaftActorContext.MockPayload("A"),
                 new MockRaftActorContext.MockPayload("B"),
                 new MockRaftActorContext.MockPayload("C"),
-                new MockRaftActorContext.MockPayload("D"));
-        ByteString stateBytes = fromObject(state);
+                new MockRaftActorContext.MockPayload("D")));
 
-        Snapshot snapshot = Snapshot.create(stateBytes.toByteArray(), snapshotUnappliedEntries,
-                snapshotLastIndex, 1, snapshotLastApplied, 1, 1, "member-1");
+        Snapshot snapshot = Snapshot.create(snapshotState, snapshotUnappliedEntries,
+                snapshotLastIndex, 1, snapshotLastApplied, 1, 1, "member-1", null);
 
         InMemorySnapshotStore.addSnapshotSavedLatch(persistenceId);
 
@@ -1132,24 +1135,24 @@ public class RaftActorTest extends AbstractActorTest {
         assertEquals("getLastAppliedTerm", snapshot.getLastAppliedTerm(), savedSnapshot.getLastAppliedTerm());
         assertEquals("getLastIndex", snapshot.getLastIndex(), savedSnapshot.getLastIndex());
         assertEquals("getLastTerm", snapshot.getLastTerm(), savedSnapshot.getLastTerm());
-        assertArrayEquals("getState", snapshot.getState(), savedSnapshot.getState());
+        assertEquals("getState", snapshot.getState(), savedSnapshot.getState());
         assertEquals("getUnAppliedEntries", snapshot.getUnAppliedEntries(), savedSnapshot.getUnAppliedEntries());
 
-        verify(mockRaftActor.snapshotCohortDelegate, timeout(5000)).applySnapshot(any(byte[].class));
+        verify(mockRaftActor.snapshotCohortDelegate, timeout(5000)).applySnapshot(any(Snapshot.State.class));
 
         RaftActorContext context = mockRaftActor.getRaftActorContext();
         assertEquals("Journal log size", 1, context.getReplicatedLog().size());
         assertEquals("Last index", snapshotLastIndex, context.getReplicatedLog().lastIndex());
         assertEquals("Last applied", snapshotLastApplied, context.getLastApplied());
         assertEquals("Commit index", snapshotLastApplied, context.getCommitIndex());
-        assertEquals("Recovered state", state, mockRaftActor.getState());
+        assertEquals("Recovered state", snapshotState.getState(), mockRaftActor.getState());
         assertEquals("Current term", 1L, context.getTermInformation().getCurrentTerm());
         assertEquals("Voted for", "member-1", context.getTermInformation().getVotedFor());
 
         // Test with data persistence disabled
 
-        snapshot = Snapshot.create(new byte[0], Collections.<ReplicatedLogEntry>emptyList(),
-                -1, -1, -1, -1, 5, "member-1");
+        snapshot = Snapshot.create(EmptyState.INSTANCE, Collections.<ReplicatedLogEntry>emptyList(),
+                -1, -1, -1, -1, 5, "member-1", null);
 
         persistenceId = factory.generateActorId("test-actor-");
 
@@ -1179,8 +1182,8 @@ public class RaftActorTest extends AbstractActorTest {
         config.setCustomRaftPolicyImplementationClass(DisableElectionsRaftPolicy.class.getName());
 
         List<MockPayload> state = Arrays.asList(new MockRaftActorContext.MockPayload("A"));
-        Snapshot snapshot = Snapshot.create(fromObject(state).toByteArray(), Arrays.<ReplicatedLogEntry>asList(),
-                5, 2, 5, 2, 2, "member-1");
+        Snapshot snapshot = Snapshot.create(ByteState.of(fromObject(state).toByteArray()),
+                Arrays.<ReplicatedLogEntry>asList(), 5, 2, 5, 2, 2, "member-1", null);
 
         InMemoryJournal.addEntry(persistenceId, 1, new SimpleReplicatedLogEntry(0, 1,
                 new MockRaftActorContext.MockPayload("B")));
@@ -1193,7 +1196,7 @@ public class RaftActorTest extends AbstractActorTest {
         mockRaftActor.waitForRecoveryComplete();
 
         Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
-        verify(mockRaftActor.snapshotCohortDelegate, never()).applySnapshot(any(byte[].class));
+        verify(mockRaftActor.snapshotCohortDelegate, never()).applySnapshot(any(Snapshot.State.class));
 
         RaftActorContext context = mockRaftActor.getRaftActorContext();
         assertEquals("Journal log size", 1, context.getReplicatedLog().size());
index 657905e8223075cad7b8a43c04de9360d467f2b4..2a3a6c1a7a25ce2b45f3b0051b82993b16c8a5a9 100644 (file)
@@ -18,6 +18,7 @@ import java.util.List;
 import org.junit.Before;
 import org.junit.Test;
 import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries;
+import org.opendaylight.controller.cluster.raft.persisted.Snapshot;
 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
 import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
@@ -82,8 +83,7 @@ public class RecoveryIntegrationSingleNodeTest extends AbstractRaftActorIntegrat
         List<Snapshot> persistedSnapshots = InMemorySnapshotStore.getSnapshots(persistenceId, Snapshot.class);
         assertEquals(1, persistedSnapshots.size());
 
-        @SuppressWarnings("unchecked")
-        List<Object> snapshottedState = (List<Object>)MockRaftActor.toObject(persistedSnapshots.get(0).getState());
+        List<Object> snapshottedState = MockRaftActor.fromState(persistedSnapshots.get(0).getState());
         assertEquals("Incorrect Snapshot", Lists.newArrayList(payload0, payload1, payload2, payload3),
                 snapshottedState);
 
index 0254c6db6ec211b0513861a3b3d0cc458066796d..e53dfe671494e6f01f675c29eb31d12cac1a1355 100644 (file)
@@ -20,6 +20,7 @@ import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
 import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries;
 import org.opendaylight.controller.cluster.raft.persisted.SimpleReplicatedLogEntry;
+import org.opendaylight.controller.cluster.raft.persisted.Snapshot;
 import org.opendaylight.controller.cluster.raft.persisted.UpdateElectionTerm;
 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
 import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
index 8a915495e4a90af1ccc0421cfc32a6fcce3c75e9..0dbcb1fbd0943ec0e26328910f97060826ce13a1 100644 (file)
@@ -21,6 +21,7 @@ import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import javax.annotation.Nullable;
+import org.apache.commons.lang3.SerializationUtils;
 import org.junit.Assert;
 import org.junit.Test;
 import org.opendaylight.controller.cluster.raft.MockRaftActorContext.MockPayload;
@@ -37,6 +38,7 @@ import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries;
 import org.opendaylight.controller.cluster.raft.persisted.ServerConfigurationPayload;
 import org.opendaylight.controller.cluster.raft.persisted.ServerInfo;
 import org.opendaylight.controller.cluster.raft.persisted.SimpleReplicatedLogEntry;
+import org.opendaylight.controller.cluster.raft.persisted.Snapshot;
 import org.opendaylight.controller.cluster.raft.persisted.UpdateElectionTerm;
 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
 import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
@@ -553,7 +555,7 @@ public class ReplicationAndSnapshotsWithLaggingFollowerIntegrationTest extends A
         List<ReplicatedLogEntry> unAppliedEntry = persistedSnapshot.getUnAppliedEntries();
         assertEquals("Persisted Snapshot getUnAppliedEntries size", 0, unAppliedEntry.size());
 
-        int snapshotSize = persistedSnapshot.getState().length;
+        int snapshotSize = SerializationUtils.serialize(persistedSnapshot.getState()).length;
         final int expTotalChunks = snapshotSize / SNAPSHOT_CHUNK_SIZE
                 + (snapshotSize % SNAPSHOT_CHUNK_SIZE > 0 ? 1 : 0);
 
index fab250bd31e9a8fef7199b8f412f4fa7e57ee8d2..7591c2f7ec9f0d85cf3d85225703e4c6cc7f5cc4 100644 (file)
@@ -14,6 +14,8 @@ import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.anyLong;
+import static org.mockito.Matchers.anyObject;
+import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.mock;
@@ -25,7 +27,10 @@ import static org.mockito.Mockito.verify;
 import akka.actor.ActorRef;
 import akka.persistence.SnapshotSelectionCriteria;
 import akka.testkit.TestActorRef;
+import java.io.OutputStream;
 import java.util.Arrays;
+import java.util.Optional;
+import java.util.function.Consumer;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -38,7 +43,9 @@ import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
 import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot;
 import org.opendaylight.controller.cluster.raft.base.messages.SnapshotComplete;
 import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
+import org.opendaylight.controller.cluster.raft.persisted.ByteState;
 import org.opendaylight.controller.cluster.raft.persisted.SimpleReplicatedLogEntry;
+import org.opendaylight.controller.cluster.raft.persisted.Snapshot;
 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
 import org.slf4j.LoggerFactory;
 
@@ -60,7 +67,7 @@ public class SnapshotManagerTest extends AbstractActorTest {
     private RaftActorBehavior mockRaftActorBehavior;
 
     @Mock
-    private Runnable mockProcedure;
+    private Consumer<Optional<OutputStream>> mockProcedure;
 
     @Mock
     private ElectionTerm mockElectionTerm;
@@ -95,7 +102,7 @@ public class SnapshotManagerTest extends AbstractActorTest {
         actorRef = factory.createTestActor(MessageCollectorActor.props(), factory.generateActorId("test-"));
         doReturn(actorRef).when(mockRaftActorContext).getActor();
 
-        snapshotManager.setCreateSnapshotRunnable(mockProcedure);
+        snapshotManager.setCreateSnapshotConsumer(mockProcedure);
     }
 
     @After
@@ -108,6 +115,7 @@ public class SnapshotManagerTest extends AbstractActorTest {
         assertEquals(false, snapshotManager.isCapturing());
     }
 
+    @SuppressWarnings({ "unchecked", "rawtypes" })
     @Test
     public void testCaptureToInstall() throws Exception {
 
@@ -117,7 +125,9 @@ public class SnapshotManagerTest extends AbstractActorTest {
 
         assertEquals(true, snapshotManager.isCapturing());
 
-        verify(mockProcedure).run();
+        ArgumentCaptor<Optional> outputStream = ArgumentCaptor.forClass(Optional.class);
+        verify(mockProcedure).accept(outputStream.capture());
+        assertEquals("isPresent", true, outputStream.getValue().isPresent());
 
         CaptureSnapshot captureSnapshot = snapshotManager.getCaptureSnapshot();
 
@@ -135,6 +145,7 @@ public class SnapshotManagerTest extends AbstractActorTest {
         actorRef.underlyingActor().clear();
     }
 
+    @SuppressWarnings({ "rawtypes", "unchecked" })
     @Test
     public void testCapture() throws Exception {
         boolean capture = snapshotManager.capture(new SimpleReplicatedLogEntry(9, 1,
@@ -144,7 +155,9 @@ public class SnapshotManagerTest extends AbstractActorTest {
 
         assertEquals(true, snapshotManager.isCapturing());
 
-        verify(mockProcedure).run();
+        ArgumentCaptor<Optional> outputStream = ArgumentCaptor.forClass(Optional.class);
+        verify(mockProcedure).accept(outputStream.capture());
+        assertEquals("isPresent", false, outputStream.getValue().isPresent());
 
         CaptureSnapshot captureSnapshot = snapshotManager.getCaptureSnapshot();
 
@@ -164,6 +177,7 @@ public class SnapshotManagerTest extends AbstractActorTest {
 
     }
 
+    @SuppressWarnings({ "unchecked", "rawtypes" })
     @Test
     public void testCaptureWithNullLastLogEntry() throws Exception {
         boolean capture = snapshotManager.capture(null, 1);
@@ -172,7 +186,9 @@ public class SnapshotManagerTest extends AbstractActorTest {
 
         assertEquals(true, snapshotManager.isCapturing());
 
-        verify(mockProcedure).run();
+        ArgumentCaptor<Optional> outputStream = ArgumentCaptor.forClass(Optional.class);
+        verify(mockProcedure).accept(outputStream.capture());
+        assertEquals("isPresent", false, outputStream.getValue().isPresent());
 
         CaptureSnapshot captureSnapshot = snapshotManager.getCaptureSnapshot();
 
@@ -193,7 +209,7 @@ public class SnapshotManagerTest extends AbstractActorTest {
 
     @Test
     public void testCaptureWithCreateProcedureError() throws Exception {
-        doThrow(new RuntimeException("mock")).when(mockProcedure).run();
+        doThrow(new RuntimeException("mock")).when(mockProcedure).accept(anyObject());
 
         boolean capture = snapshotManager.capture(new SimpleReplicatedLogEntry(9, 1,
                 new MockRaftActorContext.MockPayload()), 9);
@@ -202,9 +218,10 @@ public class SnapshotManagerTest extends AbstractActorTest {
 
         assertEquals(false, snapshotManager.isCapturing());
 
-        verify(mockProcedure).run();
+        verify(mockProcedure).accept(anyObject());
     }
 
+    @SuppressWarnings("unchecked")
     @Test
     public void testIllegalCapture() throws Exception {
         boolean capture = snapshotManager.capture(new SimpleReplicatedLogEntry(9, 1,
@@ -212,7 +229,7 @@ public class SnapshotManagerTest extends AbstractActorTest {
 
         assertTrue(capture);
 
-        verify(mockProcedure).run();
+        verify(mockProcedure).accept(anyObject());
 
         reset(mockProcedure);
 
@@ -222,11 +239,11 @@ public class SnapshotManagerTest extends AbstractActorTest {
 
         assertFalse(capture);
 
-        verify(mockProcedure, never()).run();
+        verify(mockProcedure, never()).accept(anyObject());
     }
 
     @Test
-    public void testPersistWhenReplicatedToAllIndexMinusOne() {
+    public void testPersistWhenReplicatedToAllIndexMinusOne() throws Exception {
         doReturn(7L).when(mockReplicatedLog).getSnapshotIndex();
         doReturn(1L).when(mockReplicatedLog).getSnapshotTerm();
 
@@ -245,8 +262,8 @@ public class SnapshotManagerTest extends AbstractActorTest {
         // when replicatedToAllIndex = -1
         snapshotManager.capture(lastLogEntry, -1);
 
-        byte[] bytes = new byte[] {1,2,3,4,5,6,7,8,9,10};
-        snapshotManager.persist(bytes, Runtime.getRuntime().totalMemory());
+        ByteState snapshotState = ByteState.of(new byte[] {1,2,3,4,5,6,7,8,9,10});
+        snapshotManager.persist(snapshotState, Optional.empty(), Runtime.getRuntime().totalMemory());
 
         ArgumentCaptor<Snapshot> snapshotArgumentCaptor = ArgumentCaptor.forClass(Snapshot.class);
         verify(mockDataPersistenceProvider).saveSnapshot(snapshotArgumentCaptor.capture());
@@ -257,7 +274,7 @@ public class SnapshotManagerTest extends AbstractActorTest {
         assertEquals("getLastIndex", 9L, snapshot.getLastIndex());
         assertEquals("getLastAppliedTerm", 2L, snapshot.getLastAppliedTerm());
         assertEquals("getLastAppliedIndex", 8L, snapshot.getLastAppliedIndex());
-        assertArrayEquals("getState", bytes, snapshot.getState());
+        assertEquals("getState", snapshotState, snapshot.getState());
         assertEquals("getUnAppliedEntries", Arrays.asList(lastLogEntry), snapshot.getUnAppliedEntries());
         assertEquals("electionTerm", mockElectionTerm.getCurrentTerm(), snapshot.getElectionTerm());
         assertEquals("electionVotedFor", mockElectionTerm.getVotedFor(), snapshot.getElectionVotedFor());
@@ -266,7 +283,7 @@ public class SnapshotManagerTest extends AbstractActorTest {
     }
 
     @Test
-    public void testPersistWhenReplicatedToAllIndexNotMinus() {
+    public void testPersistWhenReplicatedToAllIndexNotMinus() throws Exception {
         doReturn(45L).when(mockReplicatedLog).getSnapshotIndex();
         doReturn(6L).when(mockReplicatedLog).getSnapshotTerm();
         ReplicatedLogEntry replicatedLogEntry = mock(ReplicatedLogEntry.class);
@@ -277,8 +294,8 @@ public class SnapshotManagerTest extends AbstractActorTest {
         // when replicatedToAllIndex != -1
         snapshotManager.capture(new SimpleReplicatedLogEntry(9, 6, new MockRaftActorContext.MockPayload()), 9);
 
-        byte[] bytes = new byte[] {1,2,3,4,5,6,7,8,9,10};
-        snapshotManager.persist(bytes, Runtime.getRuntime().totalMemory());
+        ByteState snapshotState = ByteState.of(new byte[] {1,2,3,4,5,6,7,8,9,10});
+        snapshotManager.persist(snapshotState, Optional.empty(), Runtime.getRuntime().totalMemory());
 
         ArgumentCaptor<Snapshot> snapshotArgumentCaptor = ArgumentCaptor.forClass(Snapshot.class);
         verify(mockDataPersistenceProvider).saveSnapshot(snapshotArgumentCaptor.capture());
@@ -289,7 +306,7 @@ public class SnapshotManagerTest extends AbstractActorTest {
         assertEquals("getLastIndex", 9L, snapshot.getLastIndex());
         assertEquals("getLastAppliedTerm", 6L, snapshot.getLastAppliedTerm());
         assertEquals("getLastAppliedIndex", 9L, snapshot.getLastAppliedIndex());
-        assertArrayEquals("getState", bytes, snapshot.getState());
+        assertEquals("getState", snapshotState, snapshot.getState());
         assertEquals("getUnAppliedEntries size", 0, snapshot.getUnAppliedEntries().size());
 
         verify(mockReplicatedLog).snapshotPreCommit(9L, 6L);
@@ -304,7 +321,7 @@ public class SnapshotManagerTest extends AbstractActorTest {
         // when replicatedToAllIndex = -1
         snapshotManager.capture(new SimpleReplicatedLogEntry(9, 6, new MockRaftActorContext.MockPayload()), -1);
 
-        snapshotManager.persist(new byte[]{}, Runtime.getRuntime().totalMemory());
+        snapshotManager.persist(ByteState.empty(), Optional.empty(), Runtime.getRuntime().totalMemory());
 
         verify(mockDataPersistenceProvider).saveSnapshot(any(Snapshot.class));
 
@@ -330,7 +347,7 @@ public class SnapshotManagerTest extends AbstractActorTest {
         snapshotManager.capture(new SimpleReplicatedLogEntry(9, 6,
                 new MockRaftActorContext.MockPayload()), replicatedToAllIndex);
 
-        snapshotManager.persist(new byte[]{}, 2000000L);
+        snapshotManager.persist(ByteState.empty(), Optional.empty(), 2000000L);
 
         verify(mockDataPersistenceProvider).saveSnapshot(any(Snapshot.class));
 
@@ -339,9 +356,11 @@ public class SnapshotManagerTest extends AbstractActorTest {
         verify(mockRaftActorBehavior).setReplicatedToAllIndex(replicatedToAllIndex);
     }
 
+    @SuppressWarnings({ "rawtypes", "unchecked" })
     @Test
-    public void testPersistSendInstallSnapshot() {
+    public void testPersistSendInstallSnapshot() throws Exception {
         doReturn(Integer.MAX_VALUE).when(mockReplicatedLog).dataSize();
+        doNothing().when(mockProcedure).accept(anyObject());
 
         // when replicatedToAllIndex = -1
         boolean capture = snapshotManager.captureToInstall(new SimpleReplicatedLogEntry(9, 6,
@@ -349,9 +368,17 @@ public class SnapshotManagerTest extends AbstractActorTest {
 
         assertTrue(capture);
 
-        byte[] bytes = new byte[] {1,2,3,4,5,6,7,8,9,10};
+        ByteState snapshotState = ByteState.of(new byte[] {1,2,3,4,5,6,7,8,9,10});
 
-        snapshotManager.persist(bytes, Runtime.getRuntime().totalMemory());
+        ArgumentCaptor<Optional> installSnapshotStreamCapture = ArgumentCaptor.forClass(Optional.class);
+        verify(mockProcedure).accept(installSnapshotStreamCapture.capture());
+
+        Optional<OutputStream> installSnapshotStream = installSnapshotStreamCapture.getValue();
+        assertEquals("isPresent", true, installSnapshotStream.isPresent());
+
+        installSnapshotStream.get().write(snapshotState.getBytes());
+
+        snapshotManager.persist(snapshotState, installSnapshotStream, Runtime.getRuntime().totalMemory());
 
         assertEquals(true, snapshotManager.isCapturing());
 
@@ -366,12 +393,13 @@ public class SnapshotManagerTest extends AbstractActorTest {
 
         SendInstallSnapshot sendInstallSnapshot = sendInstallSnapshotArgumentCaptor.getValue();
 
-        assertTrue(Arrays.equals(bytes, sendInstallSnapshot.getSnapshot().getState()));
+        assertEquals("state", snapshotState, sendInstallSnapshot.getSnapshot().getState());
+        assertArrayEquals("state", snapshotState.getBytes(), sendInstallSnapshot.getSnapshotBytes().read());
     }
 
     @Test
     public void testCallingPersistWithoutCaptureWillDoNothing() {
-        snapshotManager.persist(new byte[]{}, Runtime.getRuntime().totalMemory());
+        snapshotManager.persist(ByteState.empty(), Optional.empty(), Runtime.getRuntime().totalMemory());
 
         verify(mockDataPersistenceProvider, never()).saveSnapshot(any(Snapshot.class));
 
@@ -385,18 +413,15 @@ public class SnapshotManagerTest extends AbstractActorTest {
         doReturn(Integer.MAX_VALUE).when(mockReplicatedLog).dataSize();
 
         // when replicatedToAllIndex = -1
-        snapshotManager.captureToInstall(new SimpleReplicatedLogEntry(9, 6,
-                new MockRaftActorContext.MockPayload()), -1, "follower-1");
+        snapshotManager.capture(new SimpleReplicatedLogEntry(9, 6, new MockRaftActorContext.MockPayload()), -1);
 
-        snapshotManager.persist(new byte[]{}, Runtime.getRuntime().totalMemory());
+        snapshotManager.persist(ByteState.empty(), Optional.empty(), Runtime.getRuntime().totalMemory());
 
-        snapshotManager.persist(new byte[]{}, Runtime.getRuntime().totalMemory());
+        snapshotManager.persist(ByteState.empty(), Optional.empty(), Runtime.getRuntime().totalMemory());
 
         verify(mockDataPersistenceProvider).saveSnapshot(any(Snapshot.class));
 
         verify(mockReplicatedLog).snapshotPreCommit(9L, 6L);
-
-        verify(mockRaftActorBehavior).handleMessage(any(ActorRef.class), any(SendInstallSnapshot.class));
     }
 
     @Test
@@ -404,10 +429,9 @@ public class SnapshotManagerTest extends AbstractActorTest {
         doReturn(50L).when(mockDataPersistenceProvider).getLastSequenceNumber();
 
         // when replicatedToAllIndex = -1
-        snapshotManager.captureToInstall(new SimpleReplicatedLogEntry(9, 6,
-                new MockRaftActorContext.MockPayload()), -1, "follower-1");
+        snapshotManager.capture(new SimpleReplicatedLogEntry(9, 6, new MockRaftActorContext.MockPayload()), -1);
 
-        snapshotManager.persist(new byte[]{}, Runtime.getRuntime().totalMemory());
+        snapshotManager.persist(ByteState.empty(), Optional.empty(), Runtime.getRuntime().totalMemory());
 
         assertEquals(true, snapshotManager.isCapturing());
 
@@ -433,8 +457,7 @@ public class SnapshotManagerTest extends AbstractActorTest {
     @Test
     public void testCommitBeforePersist() {
         // when replicatedToAllIndex = -1
-        snapshotManager.captureToInstall(new SimpleReplicatedLogEntry(9, 6,
-                new MockRaftActorContext.MockPayload()), -1, "follower-1");
+        snapshotManager.capture(new SimpleReplicatedLogEntry(9, 6, new MockRaftActorContext.MockPayload()), -1);
 
         snapshotManager.commit(100L, 0);
 
@@ -463,10 +486,9 @@ public class SnapshotManagerTest extends AbstractActorTest {
         doReturn(50L).when(mockDataPersistenceProvider).getLastSequenceNumber();
 
         // when replicatedToAllIndex = -1
-        snapshotManager.captureToInstall(new SimpleReplicatedLogEntry(9, 6,
-                new MockRaftActorContext.MockPayload()), -1, "follower-1");
+        snapshotManager.capture(new SimpleReplicatedLogEntry(9, 6, new MockRaftActorContext.MockPayload()), -1);
 
-        snapshotManager.persist(new byte[]{}, Runtime.getRuntime().totalMemory());
+        snapshotManager.persist(ByteState.empty(), Optional.empty(), Runtime.getRuntime().totalMemory());
 
         snapshotManager.commit(100L, 0);
 
@@ -482,10 +504,9 @@ public class SnapshotManagerTest extends AbstractActorTest {
     @Test
     public void testRollback() {
         // when replicatedToAllIndex = -1
-        snapshotManager.captureToInstall(new SimpleReplicatedLogEntry(9, 6,
-                new MockRaftActorContext.MockPayload()), -1, "follower-1");
+        snapshotManager.capture(new SimpleReplicatedLogEntry(9, 6, new MockRaftActorContext.MockPayload()), -1);
 
-        snapshotManager.persist(new byte[]{}, Runtime.getRuntime().totalMemory());
+        snapshotManager.persist(ByteState.empty(), Optional.empty(), Runtime.getRuntime().totalMemory());
 
         snapshotManager.rollback();
 
@@ -498,8 +519,7 @@ public class SnapshotManagerTest extends AbstractActorTest {
     @Test
     public void testRollbackBeforePersist() {
         // when replicatedToAllIndex = -1
-        snapshotManager.captureToInstall(new SimpleReplicatedLogEntry(9, 6,
-                new MockRaftActorContext.MockPayload()), -1, "follower-1");
+        snapshotManager.capture(new SimpleReplicatedLogEntry(9, 6, new MockRaftActorContext.MockPayload()), -1);
 
         snapshotManager.rollback();
 
@@ -516,10 +536,9 @@ public class SnapshotManagerTest extends AbstractActorTest {
     @Test
     public void testCallingRollbackMultipleTimesCausesNoHarm() {
         // when replicatedToAllIndex = -1
-        snapshotManager.captureToInstall(new SimpleReplicatedLogEntry(9, 6,
-                new MockRaftActorContext.MockPayload()), -1, "follower-1");
+        snapshotManager.capture(new SimpleReplicatedLogEntry(9, 6, new MockRaftActorContext.MockPayload()), -1);
 
-        snapshotManager.persist(new byte[]{}, Runtime.getRuntime().totalMemory());
+        snapshotManager.persist(ByteState.empty(), Optional.empty(), Runtime.getRuntime().totalMemory());
 
         snapshotManager.rollback();
 
@@ -622,8 +641,8 @@ public class SnapshotManagerTest extends AbstractActorTest {
 
     @Test
     public void testTrimLogAfterCaptureToInstall() {
-        boolean capture = snapshotManager.captureToInstall(new SimpleReplicatedLogEntry(9, 1,
-                new MockRaftActorContext.MockPayload()), 9, "follower-1");
+        boolean capture = snapshotManager.capture(new SimpleReplicatedLogEntry(9, 1,
+                new MockRaftActorContext.MockPayload()), 9);
 
         assertTrue(capture);
 
index 9cf4015108269aaf0370f6e70ea19523832781c1..b4a8c27b7c9215e0f8ac1406c3fa7ab33ad5f110 100644 (file)
@@ -25,6 +25,7 @@ import org.opendaylight.controller.cluster.raft.MockRaftActorContext.MockPayload
  *
  * @author Thomas Pantelis
  */
+@Deprecated
 public class SnapshotTest {
 
     @Test
index 8cb914c2267b51375ef819ce26f9ff0818882347..487a4fab900eabff0070239b5fa36330dfd48c0e 100644 (file)
@@ -29,8 +29,10 @@ import com.google.common.base.Stopwatch;
 import com.google.common.base.Throwables;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
+import com.google.common.io.ByteSource;
 import com.google.common.util.concurrent.Uninterruptibles;
 import com.google.protobuf.ByteString;
+import java.io.OutputStream;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -44,12 +46,11 @@ import org.junit.Test;
 import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl;
 import org.opendaylight.controller.cluster.raft.MockRaftActor;
 import org.opendaylight.controller.cluster.raft.MockRaftActor.Builder;
+import org.opendaylight.controller.cluster.raft.MockRaftActor.MockSnapshotState;
 import org.opendaylight.controller.cluster.raft.MockRaftActorContext;
 import org.opendaylight.controller.cluster.raft.RaftActorContext;
 import org.opendaylight.controller.cluster.raft.RaftActorSnapshotCohort;
-import org.opendaylight.controller.cluster.raft.RaftActorTest;
 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
-import org.opendaylight.controller.cluster.raft.Snapshot;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
 import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
@@ -63,9 +64,12 @@ import org.opendaylight.controller.cluster.raft.messages.RaftRPC;
 import org.opendaylight.controller.cluster.raft.messages.RequestVote;
 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
 import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries;
+import org.opendaylight.controller.cluster.raft.persisted.ByteState;
 import org.opendaylight.controller.cluster.raft.persisted.ServerConfigurationPayload;
 import org.opendaylight.controller.cluster.raft.persisted.ServerInfo;
 import org.opendaylight.controller.cluster.raft.persisted.SimpleReplicatedLogEntry;
+import org.opendaylight.controller.cluster.raft.persisted.Snapshot;
+import org.opendaylight.controller.cluster.raft.persisted.Snapshot.State;
 import org.opendaylight.controller.cluster.raft.persisted.UpdateElectionTerm;
 import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy;
 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
@@ -809,7 +813,7 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest<Follower> {
 
         follower = createBehavior(context);
 
-        ByteString bsSnapshot  = createSnapshot();
+        ByteString bsSnapshot = createSnapshot();
         int offset = 0;
         int snapshotLength = bsSnapshot.size();
         int chunkSize = 50;
@@ -838,7 +842,8 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest<Follower> {
         assertEquals("getLastAppliedIndex", lastInstallSnapshot.getLastIncludedIndex(),
                 snapshot.getLastAppliedIndex());
         assertEquals("getLastTerm", lastInstallSnapshot.getLastIncludedTerm(), snapshot.getLastTerm());
-        Assert.assertArrayEquals("getState", bsSnapshot.toByteArray(), snapshot.getState());
+        assertEquals("getState type", ByteState.class, snapshot.getState().getClass());
+        Assert.assertArrayEquals("getState", bsSnapshot.toByteArray(), ((ByteState)snapshot.getState()).getBytes());
         assertEquals("getElectionTerm", 1, snapshot.getElectionTerm());
         assertEquals("getElectionVotedFor", "leader", snapshot.getElectionVotedFor());
         applySnapshot.getCallback().onSuccess();
@@ -1154,7 +1159,7 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest<Follower> {
         assertEquals("Snapshot getLastTerm", 1, snapshot.getLastTerm());
         assertEquals("Snapshot getLastIndex", 1, snapshot.getLastIndex());
         assertEquals("Snapshot state", ImmutableList.of(entries.get(0).getData(), entries.get(1).getData()),
-                MockRaftActor.toObject(snapshot.getState()));
+                MockRaftActor.fromState(snapshot.getState()));
     }
 
     @Test
@@ -1208,7 +1213,7 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest<Follower> {
         assertEquals("Snapshot getLastTerm", 1, snapshot.getLastTerm());
         assertEquals("Snapshot getLastIndex", 2, snapshot.getLastIndex());
         assertEquals("Snapshot state", ImmutableList.of(entries.get(0).getData(), entries.get(1).getData(),
-                entries.get(2).getData()), MockRaftActor.toObject(snapshot.getState()));
+                entries.get(2).getData()), MockRaftActor.fromState(snapshot.getState()));
 
         assertEquals("Journal size", 0, followerRaftActor.get().getReplicatedLog().size());
         assertEquals("Snapshot index", 2, followerRaftActor.get().getReplicatedLog().getSnapshotIndex());
@@ -1283,24 +1288,29 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest<Follower> {
         assertEquals("Snapshot getLastTerm", 1, snapshot.getLastTerm());
         assertEquals("Snapshot getLastIndex", 2, snapshot.getLastIndex());
         assertEquals("Snapshot state", ImmutableList.of(entries.get(0).getData()),
-                MockRaftActor.toObject(snapshot.getState()));
+                MockRaftActor.fromState(snapshot.getState()));
     }
 
     @SuppressWarnings("checkstyle:IllegalCatch")
     private RaftActorSnapshotCohort newRaftActorSnapshotCohort(final AtomicReference<MockRaftActor> followerRaftActor) {
         RaftActorSnapshotCohort snapshotCohort = new RaftActorSnapshotCohort() {
             @Override
-            public void createSnapshot(ActorRef actorRef) {
+            public void createSnapshot(ActorRef actorRef, java.util.Optional<OutputStream> installSnapshotStream) {
                 try {
-                    actorRef.tell(new CaptureSnapshotReply(RaftActorTest.fromObject(
-                            followerRaftActor.get().getState()).toByteArray()), actorRef);
+                    actorRef.tell(new CaptureSnapshotReply(new MockSnapshotState(followerRaftActor.get().getState()),
+                            installSnapshotStream), actorRef);
                 } catch (Exception e) {
                     Throwables.propagate(e);
                 }
             }
 
             @Override
-            public void applySnapshot(byte[] snapshotBytes) {
+            public void applySnapshot(State snapshotState) {
+            }
+
+            @Override
+            public State deserializeSnapshot(ByteSource snapshotBytes) {
+                throw new UnsupportedOperationException();
             }
         };
         return snapshotCohort;
index fbfdea0249ea684585a4de01895701b0c6fcd154..99b647b000f3851e1777396dc56e196430dba562 100644 (file)
@@ -25,6 +25,7 @@ import akka.actor.Terminated;
 import akka.testkit.JavaTestKit;
 import akka.testkit.TestActorRef;
 import com.google.common.collect.ImmutableMap;
+import com.google.common.io.ByteSource;
 import com.google.common.util.concurrent.Uninterruptibles;
 import com.google.protobuf.ByteString;
 import java.util.Arrays;
@@ -43,7 +44,6 @@ import org.opendaylight.controller.cluster.raft.RaftActorLeadershipTransferCohor
 import org.opendaylight.controller.cluster.raft.RaftState;
 import org.opendaylight.controller.cluster.raft.RaftVersions;
 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
-import org.opendaylight.controller.cluster.raft.Snapshot;
 import org.opendaylight.controller.cluster.raft.VotingState;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
@@ -52,6 +52,7 @@ import org.opendaylight.controller.cluster.raft.base.messages.Replicate;
 import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat;
 import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot;
 import org.opendaylight.controller.cluster.raft.base.messages.TimeoutNow;
+import org.opendaylight.controller.cluster.raft.behaviors.AbstractLeader.SnapshotHolder;
 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
@@ -59,7 +60,9 @@ import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply;
 import org.opendaylight.controller.cluster.raft.messages.RaftRPC;
 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
 import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries;
+import org.opendaylight.controller.cluster.raft.persisted.ByteState;
 import org.opendaylight.controller.cluster.raft.persisted.SimpleReplicatedLogEntry;
+import org.opendaylight.controller.cluster.raft.persisted.Snapshot;
 import org.opendaylight.controller.cluster.raft.policy.DefaultRaftPolicy;
 import org.opendaylight.controller.cluster.raft.policy.RaftPolicy;
 import org.opendaylight.controller.cluster.raft.utils.ForwardMessageToBehaviorActor;
@@ -577,8 +580,9 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
         leader.markFollowerActive(FOLLOWER_ID);
 
         ByteString bs = toByteString(leadersSnapshot);
-        leader.setSnapshot(Snapshot.create(bs.toByteArray(), Collections.<ReplicatedLogEntry>emptyList(),
-                commitIndex, snapshotTerm, commitIndex, snapshotTerm));
+        leader.setSnapshot(new SnapshotHolder(Snapshot.create(ByteState.of(bs.toByteArray()),
+                Collections.<ReplicatedLogEntry>emptyList(), commitIndex, snapshotTerm, commitIndex, snapshotTerm,
+                -1, null, null), ByteSource.wrap(bs.toByteArray())));
         LeaderInstallSnapshotState fts = new LeaderInstallSnapshotState(
                 actorContext.getConfigParams().getSnapshotChunkSize(), leader.logName());
         fts.setSnapshotBytes(bs);
@@ -700,7 +704,6 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
 
         CaptureSnapshot cs = actorContext.getSnapshotManager().getCaptureSnapshot();
 
-        assertTrue(cs.isInstallSnapshotInitiated());
         assertEquals(3, cs.getLastAppliedIndex());
         assertEquals(1, cs.getLastAppliedTerm());
         assertEquals(4, cs.getLastIndex());
@@ -763,7 +766,6 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
 
         CaptureSnapshot cs = actorContext.getSnapshotManager().getCaptureSnapshot();
 
-        assertTrue(cs.isInstallSnapshotInitiated());
         assertEquals(3, cs.getLastAppliedIndex());
         assertEquals(1, cs.getLastAppliedTerm());
         assertEquals(4, cs.getLastIndex());
@@ -810,11 +812,12 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
         leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
         leader.getFollower(FOLLOWER_ID).setNextIndex(0);
 
-        Snapshot snapshot = Snapshot.create(toByteString(leadersSnapshot).toByteArray(),
-                Collections.<ReplicatedLogEntry>emptyList(),
-                lastAppliedIndex, snapshotTerm, lastAppliedIndex, snapshotTerm);
+        byte[] bytes = toByteString(leadersSnapshot).toByteArray();
+        Snapshot snapshot = Snapshot.create(ByteState.of(bytes), Collections.<ReplicatedLogEntry>emptyList(),
+                lastAppliedIndex, snapshotTerm, lastAppliedIndex, snapshotTerm, -1, null, null);
 
-        RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot));
+        RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor,
+                new SendInstallSnapshot(snapshot, ByteSource.wrap(bytes)));
 
         assertTrue(raftBehavior instanceof Leader);
 
@@ -861,11 +864,12 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
         leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
         leader.getFollower(FOLLOWER_ID).setNextIndex(-1);
 
-        Snapshot snapshot = Snapshot.create(toByteString(leadersSnapshot).toByteArray(),
-                Collections.<ReplicatedLogEntry>emptyList(),
-                lastAppliedIndex, snapshotTerm, lastAppliedIndex, snapshotTerm);
+        byte[] bytes = toByteString(leadersSnapshot).toByteArray();
+        Snapshot snapshot = Snapshot.create(ByteState.of(bytes), Collections.<ReplicatedLogEntry>emptyList(),
+                lastAppliedIndex, snapshotTerm, lastAppliedIndex, snapshotTerm, -1, null, null);
 
-        RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot));
+        RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor,
+                new SendInstallSnapshot(snapshot, ByteSource.wrap(bytes)));
 
         assertTrue(raftBehavior instanceof Leader);
 
@@ -915,8 +919,9 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
         actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
 
         ByteString bs = toByteString(leadersSnapshot);
-        leader.setSnapshot(Snapshot.create(bs.toByteArray(), Collections.<ReplicatedLogEntry>emptyList(),
-                commitIndex, snapshotTerm, commitIndex, snapshotTerm));
+        leader.setSnapshot(new SnapshotHolder(Snapshot.create(ByteState.of(bs.toByteArray()),
+                Collections.<ReplicatedLogEntry>emptyList(), commitIndex, snapshotTerm, commitIndex, snapshotTerm,
+                -1, null, null), ByteSource.wrap(bs.toByteArray())));
         LeaderInstallSnapshotState fts = new LeaderInstallSnapshotState(
                 actorContext.getConfigParams().getSnapshotChunkSize(), leader.logName());
         fts.setSnapshotBytes(bs);
@@ -983,11 +988,11 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
         actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
 
         ByteString bs = toByteString(leadersSnapshot);
-        Snapshot snapshot = Snapshot.create(bs.toByteArray(), Collections.<ReplicatedLogEntry>emptyList(),
-                commitIndex, snapshotTerm, commitIndex, snapshotTerm);
-        leader.setSnapshot(snapshot);
+        Snapshot snapshot = Snapshot.create(ByteState.of(bs.toByteArray()),
+                Collections.<ReplicatedLogEntry>emptyList(), commitIndex, snapshotTerm, commitIndex, snapshotTerm,
+                -1, null, null);
 
-        leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot));
+        leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot, ByteSource.wrap(bs.toByteArray())));
 
         InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor,
                 InstallSnapshot.class);
@@ -1057,12 +1062,12 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
         actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
 
         ByteString bs = toByteString(leadersSnapshot);
-        Snapshot snapshot = Snapshot.create(bs.toByteArray(), Collections.<ReplicatedLogEntry>emptyList(),
-                commitIndex, snapshotTerm, commitIndex, snapshotTerm);
-        leader.setSnapshot(snapshot);
+        Snapshot snapshot = Snapshot.create(ByteState.of(bs.toByteArray()),
+                Collections.<ReplicatedLogEntry>emptyList(), commitIndex, snapshotTerm, commitIndex, snapshotTerm,
+                -1, null, null);
 
         Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
-        leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot));
+        leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot, ByteSource.wrap(bs.toByteArray())));
 
         InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor,
                 InstallSnapshot.class);
@@ -1122,11 +1127,11 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
         actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
 
         ByteString bs = toByteString(leadersSnapshot);
-        Snapshot snapshot = Snapshot.create(bs.toByteArray(), Collections.<ReplicatedLogEntry>emptyList(),
-                commitIndex, snapshotTerm, commitIndex, snapshotTerm);
-        leader.setSnapshot(snapshot);
+        Snapshot snapshot = Snapshot.create(ByteState.of(bs.toByteArray()),
+                Collections.<ReplicatedLogEntry>emptyList(), commitIndex, snapshotTerm, commitIndex, snapshotTerm,
+                -1, null, null);
 
-        leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot));
+        leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot, ByteSource.wrap(bs.toByteArray())));
 
         InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor,
                 InstallSnapshot.class);
diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/persisted/ByteState.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/persisted/ByteState.java
new file mode 100644 (file)
index 0000000..42411a9
--- /dev/null
@@ -0,0 +1,70 @@
+/*
+ * Copyright (c) 2017 Brocade Communications Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.raft.persisted;
+
+import com.google.common.base.Preconditions;
+import java.util.Arrays;
+import javax.annotation.Nonnull;
+
+/**
+ * Snapshot State implementation backed by a byte[].
+ *
+ * @author Thomas Pantelis
+ */
+public class ByteState implements Snapshot.State {
+    private static final long serialVersionUID = 1L;
+
+    private final byte[] bytes;
+
+    private ByteState(@Nonnull byte[] bytes) {
+        this.bytes = Preconditions.checkNotNull(bytes);
+    }
+
+    public static ByteState of(@Nonnull byte[] bytes) {
+        return new ByteState(bytes);
+    }
+
+    public static ByteState empty() {
+        return new ByteState(new byte[0]);
+    }
+
+    public byte[] getBytes() {
+        return bytes;
+    }
+
+    @Override
+    public int hashCode() {
+        final int prime = 31;
+        int result = 1;
+        result = prime * result + Arrays.hashCode(bytes);
+        return result;
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        if (this == obj) {
+            return true;
+        }
+        if (obj == null) {
+            return false;
+        }
+        if (getClass() != obj.getClass()) {
+            return false;
+        }
+        ByteState other = (ByteState) obj;
+        if (!Arrays.equals(bytes, other.bytes)) {
+            return false;
+        }
+        return true;
+    }
+
+    @Override
+    public String toString() {
+        return "ByteState [bytes=" + Arrays.toString(bytes) + "]";
+    }
+}
diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/persisted/EmptyStateTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/persisted/EmptyStateTest.java
new file mode 100644 (file)
index 0000000..963580c
--- /dev/null
@@ -0,0 +1,28 @@
+/*
+ * Copyright (c) 2017 Brocade Communications Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.raft.persisted;
+
+import static org.junit.Assert.assertSame;
+
+import org.apache.commons.lang.SerializationUtils;
+import org.junit.Test;
+
+/**
+ * Unit tests for EmptyState.
+ *
+ * @author Thomas Pantelis
+ *
+ */
+public class EmptyStateTest {
+
+    @Test
+    public void testSerialization() {
+        EmptyState cloned = (EmptyState) SerializationUtils.clone(EmptyState.INSTANCE);
+        assertSame("cloned", EmptyState.INSTANCE, cloned);
+    }
+}
diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/persisted/SnapshotTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/persisted/SnapshotTest.java
new file mode 100644 (file)
index 0000000..19f0ec1
--- /dev/null
@@ -0,0 +1,59 @@
+/*
+ * Copyright (c) 2017 Brocade Communications Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.raft.persisted;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import org.apache.commons.lang.SerializationUtils;
+import org.junit.Test;
+import org.opendaylight.controller.cluster.raft.MockRaftActorContext.MockPayload;
+import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
+
+/**
+ * Unit tests for Snapshot.
+ *
+ * @author Thomas Pantelis
+ */
+public class SnapshotTest {
+
+    @Test
+    public void testSerialization() throws Exception {
+        testSerialization(new byte[]{1, 2, 3, 4, 5, 6, 7}, Arrays.asList(
+                new SimpleReplicatedLogEntry(6, 2, new MockPayload("payload"))));
+        testSerialization(new byte[]{1, 2, 3, 4, 5, 6, 7, 8, 9}, Collections.emptyList());
+    }
+
+    private void testSerialization(byte[] state, List<ReplicatedLogEntry> unapplied) throws Exception {
+        long lastIndex = 6;
+        long lastTerm = 2;
+        long lastAppliedIndex = 5;
+        long lastAppliedTerm = 1;
+        long electionTerm = 3;
+        String electionVotedFor = "member-1";
+        ServerConfigurationPayload serverConfig = new ServerConfigurationPayload(Arrays.asList(
+                new ServerInfo("1", true), new ServerInfo("2", false)));
+
+        Snapshot expected = Snapshot.create(ByteState.of(state), unapplied, lastIndex, lastTerm, lastAppliedIndex,
+                lastAppliedTerm, electionTerm, electionVotedFor, serverConfig);
+        Snapshot cloned = (Snapshot) SerializationUtils.clone(expected);
+
+        assertEquals("lastIndex", expected.getLastIndex(), cloned.getLastIndex());
+        assertEquals("lastTerm", expected.getLastTerm(), cloned.getLastTerm());
+        assertEquals("lastAppliedIndex", expected.getLastAppliedIndex(), cloned.getLastAppliedIndex());
+        assertEquals("lastAppliedTerm", expected.getLastAppliedTerm(), cloned.getLastAppliedTerm());
+        assertEquals("unAppliedEntries", expected.getUnAppliedEntries(), cloned.getUnAppliedEntries());
+        assertEquals("electionTerm", expected.getElectionTerm(), cloned.getElectionTerm());
+        assertEquals("electionVotedFor", expected.getElectionVotedFor(), cloned.getElectionVotedFor());
+        assertEquals("state", expected.getState(), cloned.getState());
+        assertEquals("serverConfig", expected.getServerConfiguration().getServerConfig(),
+                cloned.getServerConfiguration().getServerConfig());
+    }
+}
index 5e0d3407fb99baa3d6ff39a598f0de2ecc70b975..e87f083fb1ad283190ba74aee77b48ce966bcb60 100644 (file)
@@ -10,9 +10,13 @@ package org.opendaylight.controller.cluster.datastore;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Throwables;
 import java.io.File;
+import java.io.IOException;
 import org.opendaylight.controller.cluster.datastore.persisted.ShardDataTreeSnapshot;
+import org.opendaylight.controller.cluster.datastore.persisted.ShardSnapshotState;
 import org.opendaylight.controller.cluster.datastore.utils.NormalizedNodeXMLOutput;
 import org.opendaylight.controller.cluster.raft.RaftActorRecoveryCohort;
+import org.opendaylight.controller.cluster.raft.persisted.Snapshot;
+import org.opendaylight.controller.cluster.raft.persisted.Snapshot.State;
 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 import org.slf4j.Logger;
@@ -86,24 +90,21 @@ class ShardRecoveryCoordinator implements RaftActorRecoveryCohort {
      */
     @Override
     @SuppressWarnings("checkstyle:IllegalCatch")
-    public void applyRecoverySnapshot(final byte[] snapshotBytes) {
-        log.debug("{}: Applying recovered snapshot", shardName);
-
-        final ShardDataTreeSnapshot snapshot;
-        try {
-            snapshot = ShardDataTreeSnapshot.deserialize(snapshotBytes);
-        } catch (Exception e) {
-            log.error("{}: failed to deserialize snapshot", shardName, e);
-            throw Throwables.propagate(e);
+    public void applyRecoverySnapshot(final Snapshot.State snapshotState) {
+        if (!(snapshotState instanceof ShardSnapshotState)) {
+            log.debug("{}: applyRecoverySnapshot ignoring snapshot: {}", snapshotState);
         }
 
+        log.debug("{}: Applying recovered snapshot", shardName);
+
+        ShardDataTreeSnapshot shardSnapshot = ((ShardSnapshotState)snapshotState).getSnapshot();
         try {
-            store.applyRecoverySnapshot(snapshot);
+            store.applyRecoverySnapshot(shardSnapshot);
         } catch (Exception e) {
-            final File f = writeRoot("snapshot", snapshot.getRootNode().orElse(null));
+            final File f = writeRoot("snapshot", shardSnapshot.getRootNode().orElse(null));
             throw new IllegalStateException(String.format(
                     "%s: Failed to apply recovery snapshot %s. Node data was written to file %s",
-                    shardName, snapshot, f), e);
+                    shardName, shardSnapshot, f), e);
         }
     }
 
@@ -111,4 +112,15 @@ class ShardRecoveryCoordinator implements RaftActorRecoveryCohort {
     public byte[] getRestoreFromSnapshot() {
         return restoreFromSnapshot;
     }
+
+    @Override
+    @Deprecated
+    public State deserializePreCarbonSnapshot(byte[] from) {
+        try {
+            return new ShardSnapshotState(ShardDataTreeSnapshot.deserialize(from));
+        } catch (IOException e) {
+            log.error("{}: failed to deserialize snapshot", shardName, e);
+            throw Throwables.propagate(e);
+        }
+    }
 }
index 6dc3f03081b71fe18c94e10dfb28d39c7d13d985..0627c0a9e552c1e81bf0e1c5b51177fa03517f0e 100644 (file)
@@ -10,7 +10,11 @@ package org.opendaylight.controller.cluster.datastore;
 import akka.actor.ActorContext;
 import akka.actor.ActorRef;
 import com.google.common.base.Preconditions;
+import com.google.common.io.ByteSource;
 import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.Optional;
 import org.opendaylight.controller.cluster.access.concepts.ClientIdentifier;
 import org.opendaylight.controller.cluster.access.concepts.FrontendIdentifier;
 import org.opendaylight.controller.cluster.access.concepts.FrontendType;
@@ -18,7 +22,10 @@ import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifie
 import org.opendaylight.controller.cluster.access.concepts.MemberName;
 import org.opendaylight.controller.cluster.datastore.actors.ShardSnapshotActor;
 import org.opendaylight.controller.cluster.datastore.persisted.ShardDataTreeSnapshot;
+import org.opendaylight.controller.cluster.datastore.persisted.ShardSnapshotState;
 import org.opendaylight.controller.cluster.raft.RaftActorSnapshotCohort;
+import org.opendaylight.controller.cluster.raft.persisted.Snapshot;
+import org.opendaylight.controller.cluster.raft.persisted.Snapshot.State;
 import org.slf4j.Logger;
 
 /**
@@ -56,28 +63,26 @@ class ShardSnapshotCohort implements RaftActorSnapshotCohort {
     }
 
     @Override
-    public void createSnapshot(final ActorRef actorRef) {
+    public void createSnapshot(final ActorRef actorRef, final Optional<OutputStream> installSnapshotStream) {
         // Forward the request to the snapshot actor
-        ShardSnapshotActor.requestSnapshot(snapshotActor, store.takeStateSnapshot(), actorRef);
+        ShardSnapshotActor.requestSnapshot(snapshotActor, store.takeStateSnapshot(), installSnapshotStream, actorRef);
     }
 
     @Override
     @SuppressWarnings("checkstyle:IllegalCatch")
-    public void applySnapshot(final byte[] snapshotBytes) {
+    public void applySnapshot(final Snapshot.State snapshotState) {
+        if (!(snapshotState instanceof ShardSnapshotState)) {
+            log.debug("{}: applySnapshot ignoring snapshot: {}", snapshotState);
+        }
+
+        final ShardDataTreeSnapshot snapshot = ((ShardSnapshotState)snapshotState).getSnapshot();
+
         // Since this will be done only on Recovery or when this actor is a Follower
         // we can safely commit everything in here. We not need to worry about event notifications
         // as they would have already been disabled on the follower
 
         log.info("{}: Applying snapshot", logId);
 
-        final ShardDataTreeSnapshot snapshot;
-        try {
-            snapshot = ShardDataTreeSnapshot.deserialize(snapshotBytes);
-        } catch (IOException e) {
-            log.error("{}: Failed to deserialize snapshot", logId, e);
-            return;
-        }
-
         try {
             store.applySnapshot(snapshot);
         } catch (Exception e) {
@@ -87,4 +92,11 @@ class ShardSnapshotCohort implements RaftActorSnapshotCohort {
 
         log.info("{}: Done applying snapshot", logId);
     }
+
+    @Override
+    public State deserializeSnapshot(ByteSource snapshotBytes) throws IOException {
+        try (final InputStream is = snapshotBytes.openStream()) {
+            return new ShardSnapshotState(ShardDataTreeSnapshot.deserialize(is));
+        }
+    }
 }
index 00bb424850027bd211b560edb025386da0894b00..e7529fb4bfcf3b9f1003fce0c03ae26fd943c3a6 100644 (file)
@@ -10,9 +10,15 @@ package org.opendaylight.controller.cluster.datastore.actors;
 import akka.actor.ActorRef;
 import akka.actor.Props;
 import com.google.common.base.Preconditions;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.Optional;
 import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActorWithMetering;
 import org.opendaylight.controller.cluster.datastore.persisted.ShardDataTreeSnapshot;
+import org.opendaylight.controller.cluster.datastore.persisted.ShardSnapshotState;
 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * This is an offload actor, which is given an isolated snapshot of the data tree. It performs the potentially
@@ -21,13 +27,18 @@ import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotRep
  * @author Robert Varga
  */
 public final class ShardSnapshotActor extends AbstractUntypedActorWithMetering {
+    private static final Logger LOG = LoggerFactory.getLogger(ShardSnapshotActor.class);
+
     // Internal message
     private static final class SerializeSnapshot {
         private final ShardDataTreeSnapshot snapshot;
+        private final Optional<OutputStream> installSnapshotStream;
         private final ActorRef replyTo;
 
-        SerializeSnapshot(final ShardDataTreeSnapshot snapshot, final ActorRef replyTo) {
+        SerializeSnapshot(final ShardDataTreeSnapshot snapshot, final Optional<OutputStream> installSnapshotStream,
+                final ActorRef replyTo) {
             this.snapshot = Preconditions.checkNotNull(snapshot);
+            this.installSnapshotStream = Preconditions.checkNotNull(installSnapshotStream);
             this.replyTo = Preconditions.checkNotNull(replyTo);
         }
 
@@ -35,6 +46,10 @@ public final class ShardSnapshotActor extends AbstractUntypedActorWithMetering {
             return snapshot;
         }
 
+        Optional<OutputStream> getInstallSnapshotStream() {
+            return installSnapshotStream;
+        }
+
         ActorRef getReplyTo() {
             return replyTo;
         }
@@ -50,16 +65,39 @@ public final class ShardSnapshotActor extends AbstractUntypedActorWithMetering {
     @Override
     protected void handleReceive(final Object message) throws Exception {
         if (message instanceof SerializeSnapshot) {
-            final SerializeSnapshot request = (SerializeSnapshot) message;
-            request.getReplyTo().tell(new CaptureSnapshotReply(request.getSnapshot().serialize()), ActorRef.noSender());
+            onSerializeSnapshot((SerializeSnapshot) message);
         } else {
             unknownMessage(message);
         }
     }
 
-    public static void requestSnapshot(final ActorRef snapshotActor, ShardDataTreeSnapshot snapshot,
-            final ActorRef replyTo) {
-        snapshotActor.tell(new SerializeSnapshot(snapshot, replyTo), ActorRef.noSender());
+    private void onSerializeSnapshot(SerializeSnapshot request) {
+        Optional<OutputStream> installSnapshotStream = request.getInstallSnapshotStream();
+        if (installSnapshotStream.isPresent()) {
+            try {
+                request.getSnapshot().serialize(installSnapshotStream.get());
+            } catch (IOException e) {
+                // TODO - we should communicate the failure in the CaptureSnapshotReply.
+                LOG.error("Error serializing snapshot", e);
+            }
+        }
+
+        request.getReplyTo().tell(new CaptureSnapshotReply(new ShardSnapshotState(request.getSnapshot()),
+                installSnapshotStream), ActorRef.noSender());
+    }
+
+    /**
+     * Sends a request to a ShardSnapshotActor to process a snapshot and send a CaptureSnapshotReply.
+     *
+     * @param snapshotActor the ShardSnapshotActor
+     * @param snapshot the snapshot to process
+     * @param installSnapshotStream Optional OutputStream that is present if the snapshot is to also be installed
+     *        on a follower.
+     * @param replyTo the actor to which to send the CaptureSnapshotReply
+     */
+    public static void requestSnapshot(final ActorRef snapshotActor, final ShardDataTreeSnapshot snapshot,
+            final Optional<OutputStream> installSnapshotStream, final ActorRef replyTo) {
+        snapshotActor.tell(new SerializeSnapshot(snapshot, installSnapshotStream, replyTo), ActorRef.noSender());
     }
 
     public static Props props() {
index 48d267342149e3bbcef9a4b3bc21123701c1510b..1d9b58f27049046fdee8f62d2e690e2344439821 100644 (file)
@@ -8,12 +8,12 @@
 package org.opendaylight.controller.cluster.datastore.persisted;
 
 import com.google.common.base.Verify;
-import java.io.ByteArrayOutputStream;
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.io.ObjectInputStream;
 import java.io.ObjectOutputStream;
+import java.io.OutputStream;
 import java.util.Optional;
 import javax.annotation.Nonnull;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
@@ -93,15 +93,11 @@ abstract class AbstractVersionedShardDataTreeSnapshot extends ShardDataTreeSnaps
     }
 
     @Override
-    public final byte[] serialize() throws IOException {
-        try (final ByteArrayOutputStream bos = new ByteArrayOutputStream()) {
-            try (final DataOutputStream dos = new DataOutputStream(bos)) {
-                final PayloadVersion version = version();
-                version.writeTo(dos);
-                versionedSerialize(dos, version);
-            }
-
-            return bos.toByteArray();
+    public void serialize(final OutputStream os) throws IOException {
+        try (final DataOutputStream dos = new DataOutputStream(os)) {
+            final PayloadVersion version = version();
+            version.writeTo(dos);
+            versionedSerialize(dos, version);
         }
     }
 }
index 3aca7f2314da6e469d495961c8a063a965508b30..7be0d85c7a2f00cf557fc17ff379bbab4ecdb2dd 100644 (file)
@@ -8,6 +8,9 @@
 package org.opendaylight.controller.cluster.datastore.persisted;
 
 import com.google.common.annotations.Beta;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
 import java.util.Optional;
 import javax.annotation.Nullable;
 import org.opendaylight.controller.cluster.datastore.node.utils.stream.SerializationUtils;
@@ -33,7 +36,9 @@ public final class PreBoronShardDataTreeSnapshot extends ShardDataTreeSnapshot {
     }
 
     @Override
-    public byte[] serialize() {
-        return SerializationUtils.serializeNormalizedNode(rootNode);
+    public void serialize(OutputStream os) throws IOException {
+        try (final DataOutputStream dos = new DataOutputStream(os)) {
+            SerializationUtils.serializeNormalizedNode(rootNode, dos);
+        }
     }
-}
\ No newline at end of file
+}
index 6cb047fc375552e8bd77af204ca5bc7196898e00..7b8382e9c2536bf44b6dea553e8b81f5c855aeeb 100644 (file)
@@ -12,8 +12,8 @@ import java.io.ByteArrayInputStream;
 import java.io.DataInputStream;
 import java.io.IOException;
 import java.io.InputStream;
+import java.io.OutputStream;
 import java.util.Optional;
-import javax.annotation.Nonnull;
 import org.opendaylight.controller.cluster.datastore.node.utils.stream.SerializationUtils;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 import org.slf4j.Logger;
@@ -32,6 +32,7 @@ public abstract class ShardDataTreeSnapshot {
         // Hidden to prevent subclassing from outside of this package
     }
 
+    @Deprecated
     public static ShardDataTreeSnapshot deserialize(final byte[] bytes) throws IOException {
         /**
          * Unfortunately versions prior to Boron did not include any way to evolve the snapshot format and contained
@@ -50,18 +51,7 @@ public abstract class ShardDataTreeSnapshot {
 
         try {
             try (final InputStream is = new ByteArrayInputStream(bytes)) {
-                try (final DataInputStream dis = new DataInputStream(is)) {
-                    final ShardDataTreeSnapshot ret = AbstractVersionedShardDataTreeSnapshot.deserialize(dis);
-
-                    // Make sure we consume all bytes, otherwise something went very wrong
-                    final int bytesLeft = dis.available();
-                    if (bytesLeft != 0) {
-                        throw new IOException("Deserialization left " + bytesLeft + " in the buffer");
-                    }
-
-
-                    return ret;
-                }
+                return deserialize(is);
             }
         } catch (IOException e) {
             LOG.debug("Failed to deserialize versioned stream, attempting pre-Lithium ProtoBuf", e);
@@ -69,6 +59,21 @@ public abstract class ShardDataTreeSnapshot {
         }
     }
 
+    public static ShardDataTreeSnapshot deserialize(final InputStream is) throws IOException {
+        try (final DataInputStream dis = new DataInputStream(is)) {
+            final ShardDataTreeSnapshot ret = AbstractVersionedShardDataTreeSnapshot.deserialize(dis);
+
+            // Make sure we consume all bytes, otherwise something went very wrong
+            final int bytesLeft = dis.available();
+            if (bytesLeft != 0) {
+                throw new IOException("Deserialization left " + bytesLeft + " in the buffer");
+            }
+
+
+            return ret;
+        }
+    }
+
     /**
      * Get the root data node contained in this snapshot.
      *
@@ -76,14 +81,9 @@ public abstract class ShardDataTreeSnapshot {
      */
     public abstract Optional<NormalizedNode<?, ?>> getRootNode();
 
-    /**
-     * Serialize this snapshot into a byte array for persistence.
-     *
-     * @return Serialized snapshot
-     * @throws IOException when a serialization problem occurs
-     */
-    public abstract @Nonnull byte[] serialize() throws IOException;
+    public abstract void serialize(final OutputStream os) throws IOException;
 
+    @Deprecated
     private static boolean isLegacyStream(final byte[] bytes) {
         if (bytes.length < 2) {
             // Versioned streams have at least two bytes
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/persisted/ShardSnapshotState.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/persisted/ShardSnapshotState.java
new file mode 100644 (file)
index 0000000..d4556cd
--- /dev/null
@@ -0,0 +1,104 @@
+/*
+ * Copyright (c) 2017 Brocade Communications Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore.persisted;
+
+import com.google.common.base.Preconditions;
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.io.OutputStream;
+import javax.annotation.Nonnull;
+import org.opendaylight.controller.cluster.raft.persisted.Snapshot;
+
+/**
+ * Encapsulates the snapshot State for a Shard.
+ *
+ * @author Thomas Pantelis
+ */
+public class ShardSnapshotState implements Snapshot.State {
+    private static final long serialVersionUID = 1L;
+
+    private static final class Proxy implements Externalizable {
+        private static final long serialVersionUID = 1L;
+
+        private ShardSnapshotState snapshotState;
+
+        // checkstyle flags the public modifier as redundant which really doesn't make sense since it clearly isn't
+        // redundant. It is explicitly needed for Java serialization to be able to create instances via reflection.
+        @SuppressWarnings("checkstyle:RedundantModifier")
+        public Proxy() {
+            // For Externalizable
+        }
+
+        Proxy(final ShardSnapshotState snapshotState) {
+            this.snapshotState = snapshotState;
+        }
+
+        @Override
+        public void writeExternal(final ObjectOutput out) throws IOException {
+            snapshotState.snapshot.serialize(toOutputStream(out));
+        }
+
+        private OutputStream toOutputStream(final ObjectOutput out) {
+            if (out instanceof OutputStream) {
+                return (OutputStream) out;
+            }
+
+            return new OutputStream() {
+                @Override
+                public void write(int value) throws IOException {
+                    out.write(value);
+                }
+            };
+        }
+
+        @Override
+        public void readExternal(final ObjectInput in) throws IOException, ClassNotFoundException {
+            snapshotState = new ShardSnapshotState(ShardDataTreeSnapshot.deserialize(toInputStream(in)));
+        }
+
+        private InputStream toInputStream(final ObjectInput in) {
+            if (in instanceof InputStream) {
+                return (InputStream) in;
+            }
+
+            return new InputStream() {
+                @Override
+                public int read() throws IOException {
+                    return in.read();
+                }
+            };
+        }
+
+        private Object readResolve() {
+            return snapshotState;
+        }
+    }
+
+    @SuppressFBWarnings(value = "SE_BAD_FIELD", justification = "This field is not Serializable but this class "
+            + "implements writeReplace to delegate serialization to a Proxy class and thus instances of this class "
+            + "aren't serialized. FindBugs does not recognize this.")
+    private final ShardDataTreeSnapshot snapshot;
+
+    public ShardSnapshotState(@Nonnull ShardDataTreeSnapshot snapshot) {
+        this.snapshot = Preconditions.checkNotNull(snapshot);
+    }
+
+    @Nonnull
+    public ShardDataTreeSnapshot getSnapshot() {
+        return snapshot;
+    }
+
+    @SuppressWarnings("static-method")
+    private Object writeReplace() {
+        return new Proxy(this);
+    }
+}
index 3de3e9ac222f9a8e429c94dbc16f90a9e1a1aa0a..e73fa08970cb12e1cb70afb463091e5fb692b9a4 100644 (file)
@@ -56,9 +56,10 @@ import org.opendaylight.controller.cluster.datastore.modification.MutableComposi
 import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
 import org.opendaylight.controller.cluster.datastore.persisted.CommitTransactionPayload;
 import org.opendaylight.controller.cluster.datastore.persisted.PreBoronShardDataTreeSnapshot;
+import org.opendaylight.controller.cluster.datastore.persisted.ShardSnapshotState;
 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
-import org.opendaylight.controller.cluster.raft.Snapshot;
 import org.opendaylight.controller.cluster.raft.TestActorFactory;
+import org.opendaylight.controller.cluster.raft.persisted.Snapshot;
 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
 import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
 import org.opendaylight.controller.md.cluster.datastore.model.CarsModel;
@@ -349,8 +350,8 @@ public abstract class AbstractShardTest extends AbstractActorTest {
         final NormalizedNode<?, ?> root = readStore(testStore, YangInstanceIdentifier.EMPTY);
 
         InMemorySnapshotStore.addSnapshot(shardID.toString(), Snapshot.create(
-                new PreBoronShardDataTreeSnapshot(root).serialize(),
-                Collections.<ReplicatedLogEntry>emptyList(), 0, 1, -1, -1));
+                new ShardSnapshotState(new PreBoronShardDataTreeSnapshot(root)),
+                Collections.<ReplicatedLogEntry>emptyList(), 0, 1, -1, -1, 1, null, null));
         return testStore;
     }
 
index cb2284a30c6c7b103d86fea1c12d0881d37e1e86..1b885c467f2d9f428fd393d399b80b94fa73c478 100644 (file)
@@ -30,6 +30,7 @@ import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.MoreExecutors;
 import com.google.common.util.concurrent.Uninterruptibles;
 import com.typesafe.config.ConfigFactory;
+import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.math.BigInteger;
 import java.util.ArrayList;
@@ -52,10 +53,12 @@ import org.opendaylight.controller.cluster.datastore.messages.DatastoreSnapshot;
 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
 import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
 import org.opendaylight.controller.cluster.datastore.persisted.MetadataShardDataTreeSnapshot;
+import org.opendaylight.controller.cluster.datastore.persisted.ShardSnapshotState;
 import org.opendaylight.controller.cluster.datastore.utils.MockDataChangeListener;
 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
-import org.opendaylight.controller.cluster.raft.Snapshot;
+import org.opendaylight.controller.cluster.raft.persisted.Snapshot;
 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
+import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
 import org.opendaylight.controller.md.cluster.datastore.model.CarsModel;
 import org.opendaylight.controller.md.cluster.datastore.model.PeopleModel;
 import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
@@ -180,7 +183,6 @@ public class DistributedDataStoreIntegrationTest {
 
     @Test
     public void testReadWriteTransactionWithSingleShard() throws Exception {
-        System.setProperty("shard.persistent", "true");
         new IntegrationTestKit(getSystem(), datastoreContextBuilder) {
             {
                 try (AbstractDataStore dataStore = setupDistributedDataStore(
@@ -1255,8 +1257,9 @@ public class DistributedDataStoreIntegrationTest {
                 AbstractShardTest.writeToStore(dataTree, CarsModel.BASE_PATH, carsNode);
                 NormalizedNode<?, ?> root = AbstractShardTest.readStore(dataTree, YangInstanceIdentifier.EMPTY);
 
-                final Snapshot carsSnapshot = Snapshot.create(new MetadataShardDataTreeSnapshot(root).serialize(),
-                        Collections.<ReplicatedLogEntry>emptyList(), 2, 1, 2, 1, 1, "member-1");
+                final Snapshot carsSnapshot = Snapshot.create(
+                        new ShardSnapshotState(new MetadataShardDataTreeSnapshot(root)),
+                        Collections.<ReplicatedLogEntry>emptyList(), 2, 1, 2, 1, 1, "member-1", null);
 
                 NormalizedNode<?, ?> peopleNode = PeopleModel.create();
                 dataTree = InMemoryDataTreeFactory.getInstance().create(TreeType.OPERATIONAL);
@@ -1264,8 +1267,9 @@ public class DistributedDataStoreIntegrationTest {
                 AbstractShardTest.writeToStore(dataTree, PeopleModel.BASE_PATH, peopleNode);
                 root = AbstractShardTest.readStore(dataTree, YangInstanceIdentifier.EMPTY);
 
-                Snapshot peopleSnapshot = Snapshot.create(new MetadataShardDataTreeSnapshot(root).serialize(),
-                        Collections.<ReplicatedLogEntry>emptyList(), 2, 1, 2, 1, 1, "member-1");
+                Snapshot peopleSnapshot = Snapshot.create(
+                        new ShardSnapshotState(new MetadataShardDataTreeSnapshot(root)),
+                        Collections.<ReplicatedLogEntry>emptyList(), 2, 1, 2, 1, 1, "member-1", null);
 
                 restoreFromSnapshot = new DatastoreSnapshot(name, null,
                         Arrays.asList(
@@ -1290,4 +1294,41 @@ public class DistributedDataStoreIntegrationTest {
             }
         };
     }
+
+    @Test
+    @Deprecated
+    public void testRecoveryFromPreCarbonSnapshot() throws Exception {
+        new IntegrationTestKit(getSystem(), datastoreContextBuilder) {
+            {
+                final String name = "testRecoveryFromPreCarbonSnapshot";
+
+                ContainerNode carsNode = CarsModel.newCarsNode(
+                        CarsModel.newCarsMapNode(CarsModel.newCarEntry("optima", BigInteger.valueOf(20000L)),
+                                CarsModel.newCarEntry("sportage", BigInteger.valueOf(30000L))));
+
+                DataTree dataTree = InMemoryDataTreeFactory.getInstance().create(TreeType.OPERATIONAL);
+                dataTree.setSchemaContext(SchemaContextHelper.full());
+                AbstractShardTest.writeToStore(dataTree, CarsModel.BASE_PATH, carsNode);
+                NormalizedNode<?, ?> root = AbstractShardTest.readStore(dataTree, YangInstanceIdentifier.EMPTY);
+
+                final ByteArrayOutputStream bos = new ByteArrayOutputStream();
+                new MetadataShardDataTreeSnapshot(root).serialize(bos);
+                final org.opendaylight.controller.cluster.raft.Snapshot snapshot =
+                    org.opendaylight.controller.cluster.raft.Snapshot.create(bos.toByteArray(),
+                            Collections.<ReplicatedLogEntry>emptyList(), 2, 1, 2, 1, 1, "member-1", null);
+
+                InMemorySnapshotStore.addSnapshot("member-1-shard-cars-" + name, snapshot);
+
+                try (AbstractDataStore dataStore = setupDistributedDataStore(name, "module-shards-member1.conf",
+                        true, "cars")) {
+
+                    DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction();
+
+                    Optional<NormalizedNode<?, ?>> optional = readTx.read(CarsModel.BASE_PATH).get(5, TimeUnit.SECONDS);
+                    assertEquals("isPresent", true, optional.isPresent());
+                    assertEquals("Data node", carsNode, optional.get());
+                }
+            }
+        };
+    }
 }
index ff44f01402f33f3f57f0bab01d169e78d3eeab1d..501da5c7ebbb6d7be496601b5a4846b1b00b435e 100644 (file)
@@ -36,6 +36,7 @@ import com.google.common.util.concurrent.Uninterruptibles;
 import com.typesafe.config.ConfigFactory;
 import java.math.BigInteger;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.LinkedList;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
@@ -56,10 +57,14 @@ import org.opendaylight.controller.cluster.datastore.messages.ReadyLocalTransact
 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
 import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
 import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
+import org.opendaylight.controller.cluster.datastore.persisted.MetadataShardDataTreeSnapshot;
+import org.opendaylight.controller.cluster.datastore.persisted.ShardSnapshotState;
 import org.opendaylight.controller.cluster.raft.client.messages.Shutdown;
 import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries;
+import org.opendaylight.controller.cluster.raft.persisted.Snapshot;
 import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy;
 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
+import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
 import org.opendaylight.controller.md.cluster.datastore.model.CarsModel;
 import org.opendaylight.controller.md.cluster.datastore.model.PeopleModel;
 import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
@@ -131,6 +136,9 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest {
 
     @Before
     public void setUp() {
+        InMemoryJournal.clear();
+        InMemorySnapshotStore.clear();
+
         leaderSystem = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member1"));
         Cluster.get(leaderSystem).join(MEMBER_1_ADDRESS);
 
@@ -153,6 +161,9 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest {
         JavaTestKit.shutdownActorSystem(leaderSystem);
         JavaTestKit.shutdownActorSystem(followerSystem);
         JavaTestKit.shutdownActorSystem(follower2System);
+
+        InMemoryJournal.clear();
+        InMemorySnapshotStore.clear();
     }
 
     private void initDatastoresWithCars(final String type) {
@@ -1012,6 +1023,56 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest {
         }
     }
 
+    @Test
+    public void testInstallSnapshot() throws Exception {
+        final String testName = "testInstallSnapshot";
+        final String leaderCarShardName = "member-1-shard-cars-" + testName;
+        final String followerCarShardName = "member-2-shard-cars-" + testName;
+
+        // Setup a saved snapshot on the leader. The follower will startup with no data and the leader should
+        // install a snapshot to sync the follower.
+
+        TipProducingDataTree tree = InMemoryDataTreeFactory.getInstance().create(TreeType.CONFIGURATION);
+        tree.setSchemaContext(SchemaContextHelper.full());
+
+        ContainerNode carsNode = CarsModel.newCarsNode(
+                CarsModel.newCarsMapNode(CarsModel.newCarEntry("optima", BigInteger.valueOf(20000))));
+        AbstractShardTest.writeToStore(tree, CarsModel.BASE_PATH, carsNode);
+
+        NormalizedNode<?, ?> snapshotRoot = AbstractShardTest.readStore(tree, YangInstanceIdentifier.EMPTY);
+        Snapshot initialSnapshot = Snapshot.create(
+                new ShardSnapshotState(new MetadataShardDataTreeSnapshot(snapshotRoot)),
+                Collections.emptyList(), 5, 1, 5, 1, 1, null, null);
+        InMemorySnapshotStore.addSnapshot(leaderCarShardName, initialSnapshot);
+
+        InMemorySnapshotStore.addSnapshotSavedLatch(leaderCarShardName);
+        InMemorySnapshotStore.addSnapshotSavedLatch(followerCarShardName);
+
+        initDatastoresWithCars(testName);
+
+        Optional<NormalizedNode<?, ?>> readOptional = leaderDistributedDataStore.newReadOnlyTransaction().read(
+                CarsModel.BASE_PATH).checkedGet(5, TimeUnit.SECONDS);
+        assertEquals("isPresent", true, readOptional.isPresent());
+        assertEquals("Node", carsNode, readOptional.get());
+
+        verifySnapshot(InMemorySnapshotStore.waitForSavedSnapshot(leaderCarShardName, Snapshot.class),
+                initialSnapshot, snapshotRoot);
+
+        verifySnapshot(InMemorySnapshotStore.waitForSavedSnapshot(followerCarShardName, Snapshot.class),
+                initialSnapshot, snapshotRoot);
+    }
+
+    private static void verifySnapshot(Snapshot actual, Snapshot expected, NormalizedNode<?, ?> expRoot) {
+        assertEquals("Snapshot getLastAppliedTerm", expected.getLastAppliedTerm(), actual.getLastAppliedTerm());
+        assertEquals("Snapshot getLastAppliedIndex", expected.getLastAppliedIndex(), actual.getLastAppliedIndex());
+        assertEquals("Snapshot getLastTerm", expected.getLastTerm(), actual.getLastTerm());
+        assertEquals("Snapshot getLastIndex", expected.getLastIndex(), actual.getLastIndex());
+        assertEquals("Snapshot state type", ShardSnapshotState.class, actual.getState().getClass());
+        MetadataShardDataTreeSnapshot shardSnapshot =
+                (MetadataShardDataTreeSnapshot) ((ShardSnapshotState)actual.getState()).getSnapshot();
+        assertEquals("Snapshot root node", expRoot, shardSnapshot.getRootNode().get());
+    }
+
     private static void sendDatastoreContextUpdate(final AbstractDataStore dataStore, final Builder builder) {
         final Builder newBuilder = DatastoreContext.newBuilderFrom(builder.build());
         DatastoreContextFactory mockContextFactory = Mockito.mock(DatastoreContextFactory.class);
index 262758f4aacdf9c4239659f0014b2cafb9ea580f..736e29170364bf0af6a6f3ecdffcdcd69125311d 100644 (file)
@@ -17,7 +17,8 @@ import org.junit.Before;
 import org.junit.Test;
 import org.mockito.Mockito;
 import org.opendaylight.controller.cluster.datastore.persisted.CommitTransactionPayload;
-import org.opendaylight.controller.cluster.datastore.persisted.PreBoronShardDataTreeSnapshot;
+import org.opendaylight.controller.cluster.datastore.persisted.MetadataShardDataTreeSnapshot;
+import org.opendaylight.controller.cluster.datastore.persisted.ShardSnapshotState;
 import org.opendaylight.controller.md.cluster.datastore.model.CarsModel;
 import org.opendaylight.controller.md.cluster.datastore.model.PeopleModel;
 import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
@@ -133,7 +134,7 @@ public class ShardRecoveryCoordinatorTest extends AbstractTest {
         return shardDataTree.readNode(PeopleModel.BASE_PATH);
     }
 
-    private static byte[] createSnapshot() {
+    private static ShardSnapshotState createSnapshot() {
         final TipProducingDataTree dataTree = InMemoryDataTreeFactory.getInstance().create(TreeType.OPERATIONAL);
         dataTree.setSchemaContext(SchemaContextHelper.select(SchemaContextHelper.CARS_YANG,
                 SchemaContextHelper.PEOPLE_YANG));
@@ -147,7 +148,7 @@ public class ShardRecoveryCoordinatorTest extends AbstractTest {
         modification.ready();
         dataTree.commit(dataTree.prepare(modification));
 
-        return new PreBoronShardDataTreeSnapshot(dataTree.takeSnapshot().readNode(YangInstanceIdentifier.EMPTY).get())
-                .serialize();
+        return new ShardSnapshotState(new MetadataShardDataTreeSnapshot(dataTree.takeSnapshot().readNode(
+                YangInstanceIdentifier.EMPTY).get()));
     }
 }
index ab07b667b2d405f566b9902aa377e3022a6634fa..2edbff2380b5ba96b7622a97a5a453cb30976898 100644 (file)
@@ -77,14 +77,13 @@ import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContex
 import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
 import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
 import org.opendaylight.controller.cluster.datastore.persisted.MetadataShardDataTreeSnapshot;
-import org.opendaylight.controller.cluster.datastore.persisted.ShardDataTreeSnapshot;
+import org.opendaylight.controller.cluster.datastore.persisted.ShardSnapshotState;
 import org.opendaylight.controller.cluster.datastore.utils.MockDataChangeListener;
 import org.opendaylight.controller.cluster.datastore.utils.MockDataTreeChangeListener;
 import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener;
 import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListenerReply;
 import org.opendaylight.controller.cluster.raft.RaftActorContext;
 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
-import org.opendaylight.controller.cluster.raft.Snapshot;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
 import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
 import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
@@ -97,6 +96,7 @@ import org.opendaylight.controller.cluster.raft.messages.RequestVote;
 import org.opendaylight.controller.cluster.raft.messages.ServerRemoved;
 import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries;
 import org.opendaylight.controller.cluster.raft.persisted.SimpleReplicatedLogEntry;
+import org.opendaylight.controller.cluster.raft.persisted.Snapshot;
 import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy;
 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
@@ -442,8 +442,9 @@ public class ShardTest extends AbstractShardTest {
         final YangInstanceIdentifier root = YangInstanceIdentifier.EMPTY;
         final NormalizedNode<?,?> expected = readStore(store, root);
 
-        final Snapshot snapshot = Snapshot.create(new MetadataShardDataTreeSnapshot(expected).serialize(),
-                Collections.<ReplicatedLogEntry>emptyList(), 1, 2, 3, 4);
+        final Snapshot snapshot = Snapshot.create(
+                new ShardSnapshotState(new MetadataShardDataTreeSnapshot(expected)),
+                Collections.<ReplicatedLogEntry>emptyList(), 1, 2, 3, 4, -1, null, null);
 
         shard.tell(new ApplySnapshot(snapshot), ActorRef.noSender());
 
@@ -1975,8 +1976,8 @@ public class ShardTest extends AbstractShardTest {
 
             private void verifySnapshot(final Snapshot snapshot, final NormalizedNode<?, ?> expectedRoot)
                     throws IOException {
-                final NormalizedNode<?, ?> actual = ShardDataTreeSnapshot.deserialize(snapshot.getState()).getRootNode()
-                        .get();
+                final NormalizedNode<?, ?> actual = ((ShardSnapshotState)snapshot.getState()).getSnapshot()
+                        .getRootNode().get();
                 assertEquals("Root node", expectedRoot, actual);
             }
         };
index 65a8ac9ce8605c1c19493fab000c865696ec8725..5cf81834d8d40c4ec2c5ca9cac78cb987492d025 100644 (file)
@@ -9,16 +9,17 @@ package org.opendaylight.controller.cluster.datastore.actors;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
 
 import akka.actor.ActorRef;
 import akka.testkit.JavaTestKit;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
 import java.util.Optional;
 import org.junit.Test;
 import org.opendaylight.controller.cluster.datastore.AbstractActorTest;
 import org.opendaylight.controller.cluster.datastore.persisted.MetadataShardDataTreeSnapshot;
-import org.opendaylight.controller.cluster.datastore.persisted.PreBoronShardDataTreeSnapshot;
 import org.opendaylight.controller.cluster.datastore.persisted.ShardDataTreeSnapshot;
+import org.opendaylight.controller.cluster.datastore.persisted.ShardSnapshotState;
 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
@@ -27,41 +28,42 @@ import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
 public class ShardSnapshotActorTest extends AbstractActorTest {
     private static final NormalizedNode<?, ?> DATA = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
 
-    private static void testSerializeSnapshot(final String testName, final ShardDataTreeSnapshot snapshot)
-            throws Exception {
+    private static void testSerializeSnapshot(final String testName, final ShardDataTreeSnapshot snapshot,
+            final boolean withInstallSnapshot) throws Exception {
         new JavaTestKit(getSystem()) {
             {
-
                 final ActorRef snapshotActor = getSystem().actorOf(ShardSnapshotActor.props(), testName);
                 watch(snapshotActor);
 
                 final NormalizedNode<?, ?> expectedRoot = snapshot.getRootNode().get();
 
-                ShardSnapshotActor.requestSnapshot(snapshotActor, snapshot, getRef());
+                ByteArrayOutputStream installSnapshotStream = withInstallSnapshot ? new ByteArrayOutputStream() : null;
+                ShardSnapshotActor.requestSnapshot(snapshotActor, snapshot,
+                        Optional.ofNullable(installSnapshotStream), getRef());
 
                 final CaptureSnapshotReply reply = expectMsgClass(duration("3 seconds"), CaptureSnapshotReply.class);
-                assertNotNull("getSnapshot is null", reply.getSnapshot());
-
-                final ShardDataTreeSnapshot actual = ShardDataTreeSnapshot.deserialize(reply.getSnapshot());
-                assertNotNull(actual);
-                assertEquals(snapshot.getClass(), actual.getClass());
+                assertNotNull("getSnapshotState is null", reply.getSnapshotState());
+                assertEquals("SnapshotState type", ShardSnapshotState.class, reply.getSnapshotState().getClass());
+                assertEquals("Snapshot", snapshot, ((ShardSnapshotState)reply.getSnapshotState()).getSnapshot());
 
-                final Optional<NormalizedNode<?, ?>> maybeNode = actual.getRootNode();
-                assertTrue(maybeNode.isPresent());
+                if (installSnapshotStream != null) {
+                    final ShardDataTreeSnapshot deserialized = ShardDataTreeSnapshot.deserialize(
+                            new ByteArrayInputStream(installSnapshotStream.toByteArray()));
+                    assertEquals("Deserialized snapshot type", snapshot.getClass(), deserialized.getClass());
 
-                assertEquals("Root node", expectedRoot, maybeNode.get());
+                    final Optional<NormalizedNode<?, ?>> maybeNode = deserialized.getRootNode();
+                    assertEquals("isPresent", true, maybeNode.isPresent());
+                    assertEquals("Root node", expectedRoot, maybeNode.get());
+                }
             }
         };
     }
 
     @Test
     public void testSerializeBoronSnapshot() throws Exception {
-        testSerializeSnapshot("testSerializeBoronSnapshot", new MetadataShardDataTreeSnapshot(DATA));
-    }
-
-    @Deprecated
-    @Test
-    public void testSerializeLegacySnapshot() throws Exception {
-        testSerializeSnapshot("testSerializeLegacySnapshot", new PreBoronShardDataTreeSnapshot(DATA));
+        testSerializeSnapshot("testSerializeBoronSnapshotWithInstallSnapshot",
+                new MetadataShardDataTreeSnapshot(DATA), true);
+        testSerializeSnapshot("testSerializeBoronSnapshotWithoutInstallSnapshot",
+                new MetadataShardDataTreeSnapshot(DATA), false);
     }
 }
index 7be91313acbea3785cd32e3bc99c3ccc7b256514..e909e79a79cace27bc657465ee10cbb5bb7637a2 100644 (file)
@@ -10,6 +10,8 @@ package org.opendaylight.controller.cluster.datastore.persisted;
 import static org.junit.Assert.assertEquals;
 
 import com.google.common.collect.ImmutableMap;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
 import java.io.Externalizable;
 import java.io.IOException;
 import java.io.ObjectInput;
@@ -37,9 +39,11 @@ public class ShardDataTreeSnapshotTest {
                 .withChild(ImmutableNodes.leafNode(TestModel.DESC_QNAME, "foo")).build();
 
         MetadataShardDataTreeSnapshot snapshot = new MetadataShardDataTreeSnapshot(expectedNode);
-        byte[] serialized = snapshot.serialize();
+        ByteArrayOutputStream bos = new ByteArrayOutputStream();
+        snapshot.serialize(bos);
 
-        ShardDataTreeSnapshot deserialized = ShardDataTreeSnapshot.deserialize(serialized);
+        ShardDataTreeSnapshot deserialized = ShardDataTreeSnapshot.deserialize(
+                new ByteArrayInputStream(bos.toByteArray()));
 
         Optional<NormalizedNode<?, ?>> actualNode = deserialized.getRootNode();
         assertEquals("rootNode present", true, actualNode.isPresent());
@@ -57,9 +61,11 @@ public class ShardDataTreeSnapshotTest {
         Map<Class<? extends ShardDataTreeSnapshotMetadata<?>>, ShardDataTreeSnapshotMetadata<?>> expMetadata =
                 ImmutableMap.of(TestShardDataTreeSnapshotMetadata.class, new TestShardDataTreeSnapshotMetadata("test"));
         MetadataShardDataTreeSnapshot snapshot = new MetadataShardDataTreeSnapshot(expectedNode, expMetadata);
-        byte[] serialized = snapshot.serialize();
+        ByteArrayOutputStream bos = new ByteArrayOutputStream();
+        snapshot.serialize(bos);
 
-        ShardDataTreeSnapshot deserialized = ShardDataTreeSnapshot.deserialize(serialized);
+        ShardDataTreeSnapshot deserialized = ShardDataTreeSnapshot.deserialize(
+                new ByteArrayInputStream(bos.toByteArray()));
 
         Optional<NormalizedNode<?, ?>> actualNode = deserialized.getRootNode();
         assertEquals("rootNode present", true, actualNode.isPresent());
@@ -69,15 +75,17 @@ public class ShardDataTreeSnapshotTest {
     }
 
     @Test
+    @Deprecated
     public void testPreBoronShardDataTreeSnapshot() throws Exception {
         NormalizedNode<?, ?> expectedNode = ImmutableContainerNodeBuilder.create()
                 .withNodeIdentifier(new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME))
                 .withChild(ImmutableNodes.leafNode(TestModel.DESC_QNAME, "foo")).build();
 
         PreBoronShardDataTreeSnapshot snapshot = new PreBoronShardDataTreeSnapshot(expectedNode);
-        byte[] serialized = snapshot.serialize();
+        ByteArrayOutputStream bos = new ByteArrayOutputStream();
+        snapshot.serialize(bos);
 
-        ShardDataTreeSnapshot deserialized = ShardDataTreeSnapshot.deserialize(serialized);
+        ShardDataTreeSnapshot deserialized = ShardDataTreeSnapshot.deserialize(bos.toByteArray());
 
         Optional<NormalizedNode<?, ?>> actualNode = deserialized.getRootNode();
         assertEquals("rootNode present", true, actualNode.isPresent());
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/persisted/ShardSnapshotStateTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/persisted/ShardSnapshotStateTest.java
new file mode 100644 (file)
index 0000000..82b9f45
--- /dev/null
@@ -0,0 +1,42 @@
+/*
+ * Copyright (c) 2017 Brocade Communications Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore.persisted;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+import org.apache.commons.lang.SerializationUtils;
+import org.junit.Test;
+import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
+import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder;
+
+/**
+ * Unit tests for ShardSnapshotState.
+ *
+ * @author Thomas Pantelis
+ */
+public class ShardSnapshotStateTest {
+
+    @Test
+    public void testSerialization() {
+        NormalizedNode<?, ?> expectedNode = ImmutableContainerNodeBuilder.create()
+                .withNodeIdentifier(new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME))
+                .withChild(ImmutableNodes.leafNode(TestModel.DESC_QNAME, "foo")).build();
+
+        ShardSnapshotState expected = new ShardSnapshotState(new MetadataShardDataTreeSnapshot(expectedNode));
+        ShardSnapshotState cloned = (ShardSnapshotState) SerializationUtils.clone(expected);
+
+        assertNotNull("getSnapshot is null", cloned.getSnapshot());
+        assertEquals("getSnapshot type", MetadataShardDataTreeSnapshot.class, cloned.getSnapshot().getClass());
+        assertEquals("getRootNode", expectedNode,
+                ((MetadataShardDataTreeSnapshot)cloned.getSnapshot()).getRootNode().get());
+    }
+}