Bug 4564: Implement GetSnapshot message in RaftActor 85/29085/10
authorTom Pantelis <tpanteli@brocade.com>
Fri, 30 Oct 2015 21:42:39 +0000 (17:42 -0400)
committerTom Pantelis <tpanteli@brocade.com>
Mon, 9 Nov 2015 03:22:07 +0000 (22:22 -0500)
Added a new client message, GetSnapshot, to return a serialized Snapshot
instance. The implementation just captures the snapshot for return and does
not persist it. If data persistence isn't enabled, it does not initiate a
capture and returns a serialized Snapshot instance containing just the
persistable state, eg election term info.

Change-Id: I9ea7fc8e0e60c4d6874f5eb0188543e1d9b51243
Signed-off-by: Tom Pantelis <tpanteli@brocade.com>
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/GetSnapshotReplyActor.java [new file with mode: 0644]
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ImmutableElectionTerm.java [new file with mode: 0644]
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorSnapshotMessageSupport.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/SnapshotManager.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/client/messages/GetSnapshot.java [new file with mode: 0644]
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/client/messages/GetSnapshotReply.java [new file with mode: 0644]
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MockRaftActor.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorSnapshotMessageSupportTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java

diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/GetSnapshotReplyActor.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/GetSnapshotReplyActor.java
new file mode 100644 (file)
index 0000000..ca09823
--- /dev/null
@@ -0,0 +1,87 @@
+/*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.raft;
+
+import akka.actor.ActorRef;
+import akka.actor.PoisonPill;
+import akka.actor.Props;
+import akka.actor.ReceiveTimeout;
+import akka.actor.UntypedActor;
+import com.google.common.base.Preconditions;
+import java.util.concurrent.TimeoutException;
+import org.apache.commons.lang3.SerializationUtils;
+import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
+import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
+import org.opendaylight.controller.cluster.raft.client.messages.GetSnapshotReply;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.concurrent.duration.Duration;
+
+/**
+ * Temporary actor used to receive a CaptureSnapshotReply message and return a GetSnapshotReply instance.
+ *
+ * @author Thomas Pantelis
+ */
+class GetSnapshotReplyActor extends UntypedActor {
+    private static final Logger LOG = LoggerFactory.getLogger(GetSnapshotReplyActor.class);
+
+    private final Params params;
+
+    private GetSnapshotReplyActor(Params params) {
+        this.params = params;
+
+        getContext().setReceiveTimeout(params.receiveTimeout);
+    }
+
+    @Override
+    public void onReceive(Object message) {
+        if(message instanceof CaptureSnapshotReply) {
+            Snapshot snapshot = Snapshot.create(((CaptureSnapshotReply)message).getSnapshot(),
+                    params.captureSnapshot.getUnAppliedEntries(),
+                    params.captureSnapshot.getLastIndex(), params.captureSnapshot.getLastTerm(),
+                    params.captureSnapshot.getLastAppliedIndex(), params.captureSnapshot.getLastAppliedTerm(),
+                    params.electionTerm.getCurrentTerm(), params.electionTerm.getVotedFor());
+
+            LOG.debug("{}: Received CaptureSnapshotReply, sending {}", params.id, snapshot);
+
+            params.replyToActor.tell(new GetSnapshotReply(params.id, SerializationUtils.serialize(snapshot)), getSelf());
+            getSelf().tell(PoisonPill.getInstance(), getSelf());
+        } else if (message instanceof ReceiveTimeout) {
+            LOG.warn("{}: Got ReceiveTimeout for inactivity - did not receive CaptureSnapshotReply within {} ms",
+                    params.id, params.receiveTimeout.toMillis());
+
+            params.replyToActor.tell(new akka.actor.Status.Failure(new TimeoutException(String.format(
+                    "Timed out after %d ms while waiting for CaptureSnapshotReply",
+                        params.receiveTimeout.toMillis()))), getSelf());
+            getSelf().tell(PoisonPill.getInstance(), getSelf());
+        }
+    }
+
+    public static Props props(CaptureSnapshot captureSnapshot, ElectionTerm electionTerm, ActorRef replyToActor,
+            Duration receiveTimeout, String id) {
+        return Props.create(GetSnapshotReplyActor.class, new Params(captureSnapshot, electionTerm, replyToActor,
+                receiveTimeout, id));
+    }
+
+    private static final class Params {
+        final CaptureSnapshot captureSnapshot;
+        final ActorRef replyToActor;
+        final ElectionTerm electionTerm;
+        final Duration receiveTimeout;
+        final String id;
+
+        Params(CaptureSnapshot captureSnapshot, ElectionTerm electionTerm, ActorRef replyToActor,
+                Duration receiveTimeout, String id) {
+            this.captureSnapshot = Preconditions.checkNotNull(captureSnapshot);
+            this.electionTerm = Preconditions.checkNotNull(electionTerm);
+            this.replyToActor = Preconditions.checkNotNull(replyToActor);
+            this.receiveTimeout = Preconditions.checkNotNull(receiveTimeout);
+            this.id = Preconditions.checkNotNull(id);
+        }
+    }
+}
diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ImmutableElectionTerm.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ImmutableElectionTerm.java
new file mode 100644 (file)
index 0000000..2760b48
--- /dev/null
@@ -0,0 +1,52 @@
+/*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.raft;
+
+/**
+ * Immutable implementation of ElectionTerm.
+ *
+ * @author Thomas Pantelis
+ */
+public class ImmutableElectionTerm implements ElectionTerm {
+    private final long currentTerm;
+    private final String votedFor;
+
+    private ImmutableElectionTerm(long currentTerm, String votedFor) {
+        this.currentTerm = currentTerm;
+        this.votedFor = votedFor;
+    }
+
+    @Override
+    public long getCurrentTerm() {
+        return currentTerm;
+    }
+
+    @Override
+    public String getVotedFor() {
+        return votedFor;
+    }
+
+    @Override
+    public void update(long currentTerm, String votedFor) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void updateAndPersist(long currentTerm, String votedFor) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public String toString() {
+        return "ImmutableElectionTerm [currentTerm=" + currentTerm + ", votedFor=" + votedFor + "]";
+    }
+
+    public static ElectionTerm copyOf(ElectionTerm from) {
+        return new ImmutableElectionTerm(from.getCurrentTerm(), from.getVotedFor());
+    }
+}
index 4eb9ad8..5f6f3ec 100644 (file)
@@ -250,7 +250,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
             captureSnapshot();
         } else if(message instanceof SwitchBehavior){
             switchBehavior(((SwitchBehavior) message));
-        } else if(!snapshotSupport.handleSnapshotMessage(message)) {
+        } else if(!snapshotSupport.handleSnapshotMessage(message, getSender())) {
             switchBehavior(reusableSwitchBehaviorSupplier.handleMessage(getSender(), message));
         }
     }
index bf0fc10..ec46f30 100644 (file)
@@ -7,13 +7,22 @@
  */
 package org.opendaylight.controller.cluster.raft;
 
+import akka.actor.ActorRef;
 import akka.japi.Procedure;
 import akka.persistence.SaveSnapshotFailure;
 import akka.persistence.SaveSnapshotSuccess;
+import com.google.common.annotations.VisibleForTesting;
+import java.util.Collections;
+import java.util.concurrent.TimeUnit;
+import org.apache.commons.lang3.SerializationUtils;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
+import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
 import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
+import org.opendaylight.controller.cluster.raft.client.messages.GetSnapshot;
+import org.opendaylight.controller.cluster.raft.client.messages.GetSnapshotReply;
 import org.slf4j.Logger;
+import scala.concurrent.duration.Duration;
 
 /**
  * Handles snapshot related messages for a RaftActor.
@@ -42,6 +51,8 @@ class RaftActorSnapshotMessageSupport {
         }
     };
 
+    private Duration snapshotReplyActorTimeout = Duration.create(30, TimeUnit.SECONDS);
+
     RaftActorSnapshotMessageSupport(RaftActorContext context, RaftActorBehavior currentBehavior,
             RaftActorSnapshotCohort cohort) {
         this.context = context;
@@ -53,7 +64,7 @@ class RaftActorSnapshotMessageSupport {
         context.getSnapshotManager().setApplySnapshotProcedure(applySnapshotProcedure);
     }
 
-    boolean handleSnapshotMessage(Object message) {
+    boolean handleSnapshotMessage(Object message, ActorRef sender) {
         if(message instanceof ApplySnapshot ) {
             onApplySnapshot((ApplySnapshot) message);
             return true;
@@ -69,6 +80,9 @@ class RaftActorSnapshotMessageSupport {
         } else if (message.equals(COMMIT_SNAPSHOT)) {
             context.getSnapshotManager().commit(-1, currentBehavior);
             return true;
+        } else if (message instanceof GetSnapshot) {
+            onGetSnapshot(sender);
+            return true;
         } else {
             return false;
         }
@@ -101,4 +115,30 @@ class RaftActorSnapshotMessageSupport {
 
         context.getSnapshotManager().apply(message);
     }
+
+    private void onGetSnapshot(ActorRef sender) {
+        log.debug("{}: onGetSnapshot", context.getId());
+
+        if(context.getPersistenceProvider().isRecoveryApplicable()) {
+            CaptureSnapshot captureSnapshot = context.getSnapshotManager().newCaptureSnapshot(
+                    context.getReplicatedLog().last(), -1, false);
+
+            ActorRef snapshotReplyActor = context.actorOf(GetSnapshotReplyActor.props(captureSnapshot,
+                    ImmutableElectionTerm.copyOf(context.getTermInformation()), sender,
+                    snapshotReplyActorTimeout, context.getId()));
+
+            cohort.createSnapshot(snapshotReplyActor);
+        } else {
+            Snapshot snapshot = Snapshot.create(new byte[0], Collections.<ReplicatedLogEntry>emptyList(), -1, -1, -1, -1,
+                    context.getTermInformation().getCurrentTerm(), context.getTermInformation().getVotedFor());
+
+            sender.tell(new GetSnapshotReply(context.getId(), SerializationUtils.serialize(snapshot)),
+                    context.getActor());
+        }
+    }
+
+    @VisibleForTesting
+    void setSnapshotReplyActorTimeout(Duration snapshotReplyActorTimeout) {
+        this.snapshotReplyActorTimeout = snapshotReplyActorTimeout;
+    }
 }
index 4dbe9ee..9571173 100644 (file)
@@ -116,6 +116,37 @@ public class SnapshotManager implements SnapshotState {
         return context.getId();
     }
 
+    public CaptureSnapshot newCaptureSnapshot(ReplicatedLogEntry lastLogEntry, long replicatedToAllIndex,
+            boolean installSnapshotInitiated) {
+        TermInformationReader lastAppliedTermInfoReader =
+                lastAppliedTermInformationReader.init(context.getReplicatedLog(), context.getLastApplied(),
+                        lastLogEntry, hasFollowers());
+
+        long lastAppliedIndex = lastAppliedTermInfoReader.getIndex();
+        long lastAppliedTerm = lastAppliedTermInfoReader.getTerm();
+
+        TermInformationReader replicatedToAllTermInfoReader =
+                replicatedToAllTermInformationReader.init(context.getReplicatedLog(), replicatedToAllIndex);
+
+        long newReplicatedToAllIndex = replicatedToAllTermInfoReader.getIndex();
+        long newReplicatedToAllTerm = replicatedToAllTermInfoReader.getTerm();
+
+        List<ReplicatedLogEntry> unAppliedEntries = context.getReplicatedLog().getFrom(lastAppliedIndex + 1);
+
+        long lastLogEntryIndex = lastAppliedIndex;
+        long lastLogEntryTerm = lastAppliedTerm;
+        if(lastLogEntry != null) {
+            lastLogEntryIndex = lastLogEntry.getIndex();
+            lastLogEntryTerm = lastLogEntry.getTerm();
+        } else {
+            LOG.warn("Capturing Snapshot : lastLogEntry is null. Using lastAppliedIndex {} and lastAppliedTerm {} instead.",
+                lastAppliedIndex, lastAppliedTerm);
+        }
+
+        return new CaptureSnapshot(lastLogEntryIndex, lastLogEntryTerm, lastAppliedIndex, lastAppliedTerm,
+                newReplicatedToAllIndex, newReplicatedToAllTerm, unAppliedEntries, installSnapshotInitiated);
+    }
+
     private class AbstractSnapshotState implements SnapshotState {
 
         @Override
@@ -200,36 +231,7 @@ public class SnapshotManager implements SnapshotState {
         }
 
         private boolean capture(ReplicatedLogEntry lastLogEntry, long replicatedToAllIndex, String targetFollower) {
-            TermInformationReader lastAppliedTermInfoReader =
-                    lastAppliedTermInformationReader.init(context.getReplicatedLog(), context.getLastApplied(),
-                            lastLogEntry, hasFollowers());
-
-            long lastAppliedIndex = lastAppliedTermInfoReader.getIndex();
-            long lastAppliedTerm = lastAppliedTermInfoReader.getTerm();
-
-            TermInformationReader replicatedToAllTermInfoReader =
-                    replicatedToAllTermInformationReader.init(context.getReplicatedLog(), replicatedToAllIndex);
-
-            long newReplicatedToAllIndex = replicatedToAllTermInfoReader.getIndex();
-            long newReplicatedToAllTerm = replicatedToAllTermInfoReader.getTerm();
-
-            // send a CaptureSnapshot to self to make the expensive operation async.
-
-            List<ReplicatedLogEntry> unAppliedEntries = context.getReplicatedLog().getFrom(lastAppliedIndex + 1);
-
-            long lastLogEntryIndex = lastAppliedIndex;
-            long lastLogEntryTerm = lastAppliedTerm;
-            if(lastLogEntry != null) {
-                lastLogEntryIndex = lastLogEntry.getIndex();
-                lastLogEntryTerm = lastLogEntry.getTerm();
-            } else {
-                LOG.warn("Capturing Snapshot : lastLogEntry is null. Using lastAppliedIndex {} and lastAppliedTerm {} instead.",
-                    lastAppliedIndex, lastAppliedTerm);
-            }
-
-            captureSnapshot = new CaptureSnapshot(lastLogEntryIndex,
-                lastLogEntryTerm, lastAppliedIndex, lastAppliedTerm,
-                    newReplicatedToAllIndex, newReplicatedToAllTerm, unAppliedEntries, targetFollower != null);
+            captureSnapshot = newCaptureSnapshot(lastLogEntry, replicatedToAllIndex, targetFollower != null);
 
             if(captureSnapshot.isInstallSnapshotInitiated()) {
                 LOG.info("{}: Initiating snapshot capture {} to install on {}",
diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/client/messages/GetSnapshot.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/client/messages/GetSnapshot.java
new file mode 100644 (file)
index 0000000..f1d05bf
--- /dev/null
@@ -0,0 +1,21 @@
+/*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.raft.client.messages;
+
+/**
+ * Internal client message to get a snapshot of the current state based on whether or not persistence is
+ * enabled. Returns a GetSnapshotReply instance.
+ *
+ * @author Thomas Pantelis
+ */
+public class GetSnapshot {
+    public static final GetSnapshot INSTANCE = new GetSnapshot();
+
+    private GetSnapshot() {
+    }
+}
diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/client/messages/GetSnapshotReply.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/client/messages/GetSnapshotReply.java
new file mode 100644 (file)
index 0000000..f3521de
--- /dev/null
@@ -0,0 +1,41 @@
+/*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.raft.client.messages;
+
+import com.google.common.base.Preconditions;
+import javax.annotation.Nonnull;
+
+/**
+ * Reply to GetSnapshot that returns a serialized Snapshot instance.
+ *
+ * @author Thomas Pantelis
+ */
+public class GetSnapshotReply {
+    private final String id;
+    private final byte[] snapshot;
+
+    public GetSnapshotReply(@Nonnull String id, @Nonnull byte[] snapshot) {
+        this.id = Preconditions.checkNotNull(id);
+        this.snapshot = Preconditions.checkNotNull(snapshot);
+    }
+
+    @Nonnull
+    public String getId() {
+        return id;
+    }
+
+    @Nonnull
+    public byte[] getSnapshot() {
+        return snapshot;
+    }
+
+    @Override
+    public String toString() {
+        return "GetSnapshotReply [id=" + id + ", snapshot.length=" + snapshot.length + "]";
+    }
+}
index bb39ed9..f56638b 100644 (file)
@@ -34,7 +34,7 @@ public class MockRaftActor extends RaftActor implements RaftActorRecoveryCohort,
 
     final RaftActor actorDelegate;
     final RaftActorRecoveryCohort recoveryCohortDelegate;
-    final RaftActorSnapshotCohort snapshotCohortDelegate;
+    volatile RaftActorSnapshotCohort snapshotCohortDelegate;
     private final CountDownLatch recoveryComplete = new CountDownLatch(1);
     private final List<Object> state;
     private ActorRef roleChangeNotifier;
@@ -96,7 +96,12 @@ public class MockRaftActor extends RaftActor implements RaftActorRecoveryCohort,
 
     @Override
     protected RaftActorSnapshotMessageSupport newRaftActorSnapshotMessageSupport() {
-        return snapshotMessageSupport != null ? snapshotMessageSupport : super.newRaftActorSnapshotMessageSupport();
+        return snapshotMessageSupport != null ? snapshotMessageSupport :
+            (snapshotMessageSupport = super.newRaftActorSnapshotMessageSupport());
+    }
+
+    public RaftActorSnapshotMessageSupport getSnapshotMessageSupport() {
+        return snapshotMessageSupport;
     }
 
     public void waitForRecoveryComplete() {
index d79a483..94000d0 100644 (file)
@@ -83,7 +83,7 @@ public class RaftActorSnapshotMessageSupportTest {
     }
 
     private void sendMessageToSupport(Object message, boolean expHandled) {
-        boolean handled = support.handleSnapshotMessage(message);
+        boolean handled = support.handleSnapshotMessage(message, mockRaftActorRef);
         assertEquals("complete", expHandled, handled);
     }
 
index 4b43095..941deb5 100644 (file)
@@ -8,6 +8,7 @@
 
 package org.opendaylight.controller.cluster.raft;
 
+import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNotSame;
@@ -20,11 +21,14 @@ import static org.mockito.Matchers.eq;
 import static org.mockito.Matchers.same;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.reset;
 import static org.mockito.Mockito.timeout;
+import static org.mockito.Mockito.verify;
 import akka.actor.ActorRef;
 import akka.actor.PoisonPill;
 import akka.actor.Props;
+import akka.actor.Status.Failure;
 import akka.actor.Terminated;
 import akka.dispatch.Dispatchers;
 import akka.japi.Procedure;
@@ -47,9 +51,12 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import org.apache.commons.lang3.SerializationUtils;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
+import org.mockito.ArgumentCaptor;
 import org.opendaylight.controller.cluster.DataPersistenceProvider;
 import org.opendaylight.controller.cluster.NonPersistentDataProvider;
 import org.opendaylight.controller.cluster.PersistentDataProvider;
@@ -68,13 +75,17 @@ import org.opendaylight.controller.cluster.raft.base.messages.UpdateElectionTerm
 import org.opendaylight.controller.cluster.raft.behaviors.Follower;
 import org.opendaylight.controller.cluster.raft.behaviors.Leader;
 import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
+import org.opendaylight.controller.cluster.raft.client.messages.GetSnapshot;
+import org.opendaylight.controller.cluster.raft.client.messages.GetSnapshotReply;
 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
+import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy;
 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
 import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import scala.concurrent.duration.Duration;
 import scala.concurrent.duration.FiniteDuration;
 
 public class RaftActorTest extends AbstractActorTest {
@@ -340,34 +351,40 @@ public class RaftActorTest extends AbstractActorTest {
         mockRaftActor.waitForRecoveryComplete();
 
         ApplySnapshot applySnapshot = new ApplySnapshot(mock(Snapshot.class));
-        doReturn(true).when(mockSupport).handleSnapshotMessage(same(applySnapshot));
+        doReturn(true).when(mockSupport).handleSnapshotMessage(same(applySnapshot), any(ActorRef.class));
         mockRaftActor.handleCommand(applySnapshot);
 
         CaptureSnapshot captureSnapshot = new CaptureSnapshot(1, 1, 1, 1, 0, 1, null);
-        doReturn(true).when(mockSupport).handleSnapshotMessage(same(captureSnapshot));
+        doReturn(true).when(mockSupport).handleSnapshotMessage(same(captureSnapshot), any(ActorRef.class));
         mockRaftActor.handleCommand(captureSnapshot);
 
         CaptureSnapshotReply captureSnapshotReply = new CaptureSnapshotReply(new byte[0]);
-        doReturn(true).when(mockSupport).handleSnapshotMessage(same(captureSnapshotReply));
+        doReturn(true).when(mockSupport).handleSnapshotMessage(same(captureSnapshotReply), any(ActorRef.class));
         mockRaftActor.handleCommand(captureSnapshotReply);
 
         SaveSnapshotSuccess saveSnapshotSuccess = new SaveSnapshotSuccess(mock(SnapshotMetadata.class));
-        doReturn(true).when(mockSupport).handleSnapshotMessage(same(saveSnapshotSuccess));
+        doReturn(true).when(mockSupport).handleSnapshotMessage(same(saveSnapshotSuccess), any(ActorRef.class));
         mockRaftActor.handleCommand(saveSnapshotSuccess);
 
         SaveSnapshotFailure saveSnapshotFailure = new SaveSnapshotFailure(mock(SnapshotMetadata.class), new Throwable());
-        doReturn(true).when(mockSupport).handleSnapshotMessage(same(saveSnapshotFailure));
+        doReturn(true).when(mockSupport).handleSnapshotMessage(same(saveSnapshotFailure), any(ActorRef.class));
         mockRaftActor.handleCommand(saveSnapshotFailure);
 
-        doReturn(true).when(mockSupport).handleSnapshotMessage(same(RaftActorSnapshotMessageSupport.COMMIT_SNAPSHOT));
+        doReturn(true).when(mockSupport).handleSnapshotMessage(same(RaftActorSnapshotMessageSupport.COMMIT_SNAPSHOT),
+                any(ActorRef.class));
         mockRaftActor.handleCommand(RaftActorSnapshotMessageSupport.COMMIT_SNAPSHOT);
 
-        verify(mockSupport).handleSnapshotMessage(same(applySnapshot));
-        verify(mockSupport).handleSnapshotMessage(same(captureSnapshot));
-        verify(mockSupport).handleSnapshotMessage(same(captureSnapshotReply));
-        verify(mockSupport).handleSnapshotMessage(same(saveSnapshotSuccess));
-        verify(mockSupport).handleSnapshotMessage(same(saveSnapshotFailure));
-        verify(mockSupport).handleSnapshotMessage(same(RaftActorSnapshotMessageSupport.COMMIT_SNAPSHOT));
+        doReturn(true).when(mockSupport).handleSnapshotMessage(same(GetSnapshot.INSTANCE), any(ActorRef.class));
+        mockRaftActor.handleCommand(GetSnapshot.INSTANCE);
+
+        verify(mockSupport).handleSnapshotMessage(same(applySnapshot), any(ActorRef.class));
+        verify(mockSupport).handleSnapshotMessage(same(captureSnapshot), any(ActorRef.class));
+        verify(mockSupport).handleSnapshotMessage(same(captureSnapshotReply), any(ActorRef.class));
+        verify(mockSupport).handleSnapshotMessage(same(saveSnapshotSuccess), any(ActorRef.class));
+        verify(mockSupport).handleSnapshotMessage(same(saveSnapshotFailure), any(ActorRef.class));
+        verify(mockSupport).handleSnapshotMessage(same(RaftActorSnapshotMessageSupport.COMMIT_SNAPSHOT),
+                any(ActorRef.class));
+        verify(mockSupport).handleSnapshotMessage(same(GetSnapshot.INSTANCE), any(ActorRef.class));
     }
 
     @Test
@@ -1089,4 +1106,93 @@ public class RaftActorTest extends AbstractActorTest {
         assertEquals("Behavior State", RaftState.Follower,
             mockRaftActor.getCurrentBehavior().state());
     }
+
+    @Test
+    public void testGetSnapshot() throws Exception {
+        TEST_LOG.info("testGetSnapshot starting");
+
+        JavaTestKit kit = new JavaTestKit(getSystem());
+
+        String persistenceId = factory.generateActorId("test-actor-");
+        DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
+        config.setCustomRaftPolicyImplementationClass(DisableElectionsRaftPolicy.class.getName());
+
+        long term = 3;
+        long seqN = 1;
+        InMemoryJournal.addEntry(persistenceId, seqN++, new UpdateElectionTerm(term, "member-1"));
+        InMemoryJournal.addEntry(persistenceId, seqN++, new MockRaftActorContext.MockReplicatedLogEntry(term, 0,
+                new MockRaftActorContext.MockPayload("A")));
+        InMemoryJournal.addEntry(persistenceId, seqN++, new MockRaftActorContext.MockReplicatedLogEntry(term, 1,
+                new MockRaftActorContext.MockPayload("B")));
+        InMemoryJournal.addEntry(persistenceId, seqN++, new ApplyJournalEntries(1));
+        InMemoryJournal.addEntry(persistenceId, seqN++, new MockRaftActorContext.MockReplicatedLogEntry(term, 2,
+                new MockRaftActorContext.MockPayload("C")));
+
+        TestActorRef<MockRaftActor> raftActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
+                ImmutableMap.<String, String>builder().put("member1", "address").build(), Optional.<ConfigParams>of(config)).
+                    withDispatcher(Dispatchers.DefaultDispatcherId()), persistenceId);
+        MockRaftActor mockRaftActor = raftActorRef.underlyingActor();
+
+        mockRaftActor.waitForRecoveryComplete();
+
+        // Wait for snapshot after recovery
+        verify(mockRaftActor.snapshotCohortDelegate, timeout(5000)).createSnapshot(any(ActorRef.class));
+
+        mockRaftActor.snapshotCohortDelegate = mock(RaftActorSnapshotCohort.class);
+
+        raftActorRef.tell(GetSnapshot.INSTANCE, kit.getRef());
+
+        ArgumentCaptor<ActorRef> replyActor = ArgumentCaptor.forClass(ActorRef.class);
+        verify(mockRaftActor.snapshotCohortDelegate, timeout(5000)).createSnapshot(replyActor.capture());
+
+        byte[] stateSnapshot = new byte[]{1,2,3};
+        replyActor.getValue().tell(new CaptureSnapshotReply(stateSnapshot), ActorRef.noSender());
+
+        GetSnapshotReply reply = kit.expectMsgClass(GetSnapshotReply.class);
+
+        assertEquals("getId", persistenceId, reply.getId());
+        Snapshot replySnapshot = SerializationUtils.deserialize(reply.getSnapshot());
+        assertEquals("getElectionTerm", term, replySnapshot.getElectionTerm());
+        assertEquals("getElectionVotedFor", "member-1", replySnapshot.getElectionVotedFor());
+        assertEquals("getLastAppliedIndex", 1L, replySnapshot.getLastAppliedIndex());
+        assertEquals("getLastAppliedTerm", term, replySnapshot.getLastAppliedTerm());
+        assertEquals("getLastIndex", 2L, replySnapshot.getLastIndex());
+        assertEquals("getLastTerm", term, replySnapshot.getLastTerm());
+        assertArrayEquals("getState", stateSnapshot, replySnapshot.getState());
+        assertEquals("getUnAppliedEntries size", 1, replySnapshot.getUnAppliedEntries().size());
+        assertEquals("UnApplied entry index ", 2L, replySnapshot.getUnAppliedEntries().get(0).getIndex());
+
+        // Test with timeout
+
+        mockRaftActor.getSnapshotMessageSupport().setSnapshotReplyActorTimeout(Duration.create(200, TimeUnit.MILLISECONDS));
+        reset(mockRaftActor.snapshotCohortDelegate);
+
+        raftActorRef.tell(GetSnapshot.INSTANCE, kit.getRef());
+        Failure failure = kit.expectMsgClass(akka.actor.Status.Failure.class);
+        assertEquals("Failure cause type", TimeoutException.class, failure.cause().getClass());
+
+        mockRaftActor.getSnapshotMessageSupport().setSnapshotReplyActorTimeout(Duration.create(30, TimeUnit.SECONDS));
+
+        // Test with persistence disabled.
+
+        mockRaftActor.setPersistence(false);
+        reset(mockRaftActor.snapshotCohortDelegate);
+
+        raftActorRef.tell(GetSnapshot.INSTANCE, kit.getRef());
+        reply = kit.expectMsgClass(GetSnapshotReply.class);
+        verify(mockRaftActor.snapshotCohortDelegate, never()).createSnapshot(any(ActorRef.class));
+
+        assertEquals("getId", persistenceId, reply.getId());
+        replySnapshot = SerializationUtils.deserialize(reply.getSnapshot());
+        assertEquals("getElectionTerm", term, replySnapshot.getElectionTerm());
+        assertEquals("getElectionVotedFor", "member-1", replySnapshot.getElectionVotedFor());
+        assertEquals("getLastAppliedIndex", -1L, replySnapshot.getLastAppliedIndex());
+        assertEquals("getLastAppliedTerm", -1L, replySnapshot.getLastAppliedTerm());
+        assertEquals("getLastIndex", -1L, replySnapshot.getLastIndex());
+        assertEquals("getLastTerm", -1L, replySnapshot.getLastTerm());
+        assertEquals("getState length", 0, replySnapshot.getState().length);
+        assertEquals("getUnAppliedEntries size", 0, replySnapshot.getUnAppliedEntries().size());
+
+        TEST_LOG.info("testGetSnapshot ending");
+    }
 }

©2013 OpenDaylight, A Linux Foundation Collaborative Project. All Rights Reserved.
OpenDaylight is a registered trademark of The OpenDaylight Project, Inc.
Linux Foundation and OpenDaylight are registered trademarks of the Linux Foundation.
Linux is a registered trademark of Linus Torvalds.