--- /dev/null
+/*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.raft;
+
+import akka.actor.ActorRef;
+import akka.actor.PoisonPill;
+import akka.actor.Props;
+import akka.actor.ReceiveTimeout;
+import akka.actor.UntypedActor;
+import com.google.common.base.Preconditions;
+import java.util.concurrent.TimeoutException;
+import org.apache.commons.lang3.SerializationUtils;
+import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
+import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
+import org.opendaylight.controller.cluster.raft.client.messages.GetSnapshotReply;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.concurrent.duration.Duration;
+
+/**
+ * Temporary actor used to receive a CaptureSnapshotReply message and return a GetSnapshotReply instance.
+ *
+ * @author Thomas Pantelis
+ */
+class GetSnapshotReplyActor extends UntypedActor {
+ private static final Logger LOG = LoggerFactory.getLogger(GetSnapshotReplyActor.class);
+
+ private final Params params;
+
+ private GetSnapshotReplyActor(Params params) {
+ this.params = params;
+
+ getContext().setReceiveTimeout(params.receiveTimeout);
+ }
+
+ @Override
+ public void onReceive(Object message) {
+ if(message instanceof CaptureSnapshotReply) {
+ Snapshot snapshot = Snapshot.create(((CaptureSnapshotReply)message).getSnapshot(),
+ params.captureSnapshot.getUnAppliedEntries(),
+ params.captureSnapshot.getLastIndex(), params.captureSnapshot.getLastTerm(),
+ params.captureSnapshot.getLastAppliedIndex(), params.captureSnapshot.getLastAppliedTerm(),
+ params.electionTerm.getCurrentTerm(), params.electionTerm.getVotedFor());
+
+ LOG.debug("{}: Received CaptureSnapshotReply, sending {}", params.id, snapshot);
+
+ params.replyToActor.tell(new GetSnapshotReply(params.id, SerializationUtils.serialize(snapshot)), getSelf());
+ getSelf().tell(PoisonPill.getInstance(), getSelf());
+ } else if (message instanceof ReceiveTimeout) {
+ LOG.warn("{}: Got ReceiveTimeout for inactivity - did not receive CaptureSnapshotReply within {} ms",
+ params.id, params.receiveTimeout.toMillis());
+
+ params.replyToActor.tell(new akka.actor.Status.Failure(new TimeoutException(String.format(
+ "Timed out after %d ms while waiting for CaptureSnapshotReply",
+ params.receiveTimeout.toMillis()))), getSelf());
+ getSelf().tell(PoisonPill.getInstance(), getSelf());
+ }
+ }
+
+ public static Props props(CaptureSnapshot captureSnapshot, ElectionTerm electionTerm, ActorRef replyToActor,
+ Duration receiveTimeout, String id) {
+ return Props.create(GetSnapshotReplyActor.class, new Params(captureSnapshot, electionTerm, replyToActor,
+ receiveTimeout, id));
+ }
+
+ private static final class Params {
+ final CaptureSnapshot captureSnapshot;
+ final ActorRef replyToActor;
+ final ElectionTerm electionTerm;
+ final Duration receiveTimeout;
+ final String id;
+
+ Params(CaptureSnapshot captureSnapshot, ElectionTerm electionTerm, ActorRef replyToActor,
+ Duration receiveTimeout, String id) {
+ this.captureSnapshot = Preconditions.checkNotNull(captureSnapshot);
+ this.electionTerm = Preconditions.checkNotNull(electionTerm);
+ this.replyToActor = Preconditions.checkNotNull(replyToActor);
+ this.receiveTimeout = Preconditions.checkNotNull(receiveTimeout);
+ this.id = Preconditions.checkNotNull(id);
+ }
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.raft;
+
+/**
+ * Immutable implementation of ElectionTerm.
+ *
+ * @author Thomas Pantelis
+ */
+public class ImmutableElectionTerm implements ElectionTerm {
+ private final long currentTerm;
+ private final String votedFor;
+
+ private ImmutableElectionTerm(long currentTerm, String votedFor) {
+ this.currentTerm = currentTerm;
+ this.votedFor = votedFor;
+ }
+
+ @Override
+ public long getCurrentTerm() {
+ return currentTerm;
+ }
+
+ @Override
+ public String getVotedFor() {
+ return votedFor;
+ }
+
+ @Override
+ public void update(long currentTerm, String votedFor) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void updateAndPersist(long currentTerm, String votedFor) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public String toString() {
+ return "ImmutableElectionTerm [currentTerm=" + currentTerm + ", votedFor=" + votedFor + "]";
+ }
+
+ public static ElectionTerm copyOf(ElectionTerm from) {
+ return new ImmutableElectionTerm(from.getCurrentTerm(), from.getVotedFor());
+ }
+}
captureSnapshot();
} else if(message instanceof SwitchBehavior){
switchBehavior(((SwitchBehavior) message));
- } else if(!snapshotSupport.handleSnapshotMessage(message)) {
+ } else if(!snapshotSupport.handleSnapshotMessage(message, getSender())) {
switchBehavior(reusableSwitchBehaviorSupplier.handleMessage(getSender(), message));
}
}
*/
package org.opendaylight.controller.cluster.raft;
+import akka.actor.ActorRef;
import akka.japi.Procedure;
import akka.persistence.SaveSnapshotFailure;
import akka.persistence.SaveSnapshotSuccess;
+import com.google.common.annotations.VisibleForTesting;
+import java.util.Collections;
+import java.util.concurrent.TimeUnit;
+import org.apache.commons.lang3.SerializationUtils;
import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
+import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
+import org.opendaylight.controller.cluster.raft.client.messages.GetSnapshot;
+import org.opendaylight.controller.cluster.raft.client.messages.GetSnapshotReply;
import org.slf4j.Logger;
+import scala.concurrent.duration.Duration;
/**
* Handles snapshot related messages for a RaftActor.
}
};
+ private Duration snapshotReplyActorTimeout = Duration.create(30, TimeUnit.SECONDS);
+
RaftActorSnapshotMessageSupport(RaftActorContext context, RaftActorBehavior currentBehavior,
RaftActorSnapshotCohort cohort) {
this.context = context;
context.getSnapshotManager().setApplySnapshotProcedure(applySnapshotProcedure);
}
- boolean handleSnapshotMessage(Object message) {
+ boolean handleSnapshotMessage(Object message, ActorRef sender) {
if(message instanceof ApplySnapshot ) {
onApplySnapshot((ApplySnapshot) message);
return true;
} else if (message.equals(COMMIT_SNAPSHOT)) {
context.getSnapshotManager().commit(-1, currentBehavior);
return true;
+ } else if (message instanceof GetSnapshot) {
+ onGetSnapshot(sender);
+ return true;
} else {
return false;
}
context.getSnapshotManager().apply(message);
}
+
+ private void onGetSnapshot(ActorRef sender) {
+ log.debug("{}: onGetSnapshot", context.getId());
+
+ if(context.getPersistenceProvider().isRecoveryApplicable()) {
+ CaptureSnapshot captureSnapshot = context.getSnapshotManager().newCaptureSnapshot(
+ context.getReplicatedLog().last(), -1, false);
+
+ ActorRef snapshotReplyActor = context.actorOf(GetSnapshotReplyActor.props(captureSnapshot,
+ ImmutableElectionTerm.copyOf(context.getTermInformation()), sender,
+ snapshotReplyActorTimeout, context.getId()));
+
+ cohort.createSnapshot(snapshotReplyActor);
+ } else {
+ Snapshot snapshot = Snapshot.create(new byte[0], Collections.<ReplicatedLogEntry>emptyList(), -1, -1, -1, -1,
+ context.getTermInformation().getCurrentTerm(), context.getTermInformation().getVotedFor());
+
+ sender.tell(new GetSnapshotReply(context.getId(), SerializationUtils.serialize(snapshot)),
+ context.getActor());
+ }
+ }
+
+ @VisibleForTesting
+ void setSnapshotReplyActorTimeout(Duration snapshotReplyActorTimeout) {
+ this.snapshotReplyActorTimeout = snapshotReplyActorTimeout;
+ }
}
return context.getId();
}
+ public CaptureSnapshot newCaptureSnapshot(ReplicatedLogEntry lastLogEntry, long replicatedToAllIndex,
+ boolean installSnapshotInitiated) {
+ TermInformationReader lastAppliedTermInfoReader =
+ lastAppliedTermInformationReader.init(context.getReplicatedLog(), context.getLastApplied(),
+ lastLogEntry, hasFollowers());
+
+ long lastAppliedIndex = lastAppliedTermInfoReader.getIndex();
+ long lastAppliedTerm = lastAppliedTermInfoReader.getTerm();
+
+ TermInformationReader replicatedToAllTermInfoReader =
+ replicatedToAllTermInformationReader.init(context.getReplicatedLog(), replicatedToAllIndex);
+
+ long newReplicatedToAllIndex = replicatedToAllTermInfoReader.getIndex();
+ long newReplicatedToAllTerm = replicatedToAllTermInfoReader.getTerm();
+
+ List<ReplicatedLogEntry> unAppliedEntries = context.getReplicatedLog().getFrom(lastAppliedIndex + 1);
+
+ long lastLogEntryIndex = lastAppliedIndex;
+ long lastLogEntryTerm = lastAppliedTerm;
+ if(lastLogEntry != null) {
+ lastLogEntryIndex = lastLogEntry.getIndex();
+ lastLogEntryTerm = lastLogEntry.getTerm();
+ } else {
+ LOG.warn("Capturing Snapshot : lastLogEntry is null. Using lastAppliedIndex {} and lastAppliedTerm {} instead.",
+ lastAppliedIndex, lastAppliedTerm);
+ }
+
+ return new CaptureSnapshot(lastLogEntryIndex, lastLogEntryTerm, lastAppliedIndex, lastAppliedTerm,
+ newReplicatedToAllIndex, newReplicatedToAllTerm, unAppliedEntries, installSnapshotInitiated);
+ }
+
private class AbstractSnapshotState implements SnapshotState {
@Override
}
private boolean capture(ReplicatedLogEntry lastLogEntry, long replicatedToAllIndex, String targetFollower) {
- TermInformationReader lastAppliedTermInfoReader =
- lastAppliedTermInformationReader.init(context.getReplicatedLog(), context.getLastApplied(),
- lastLogEntry, hasFollowers());
-
- long lastAppliedIndex = lastAppliedTermInfoReader.getIndex();
- long lastAppliedTerm = lastAppliedTermInfoReader.getTerm();
-
- TermInformationReader replicatedToAllTermInfoReader =
- replicatedToAllTermInformationReader.init(context.getReplicatedLog(), replicatedToAllIndex);
-
- long newReplicatedToAllIndex = replicatedToAllTermInfoReader.getIndex();
- long newReplicatedToAllTerm = replicatedToAllTermInfoReader.getTerm();
-
- // send a CaptureSnapshot to self to make the expensive operation async.
-
- List<ReplicatedLogEntry> unAppliedEntries = context.getReplicatedLog().getFrom(lastAppliedIndex + 1);
-
- long lastLogEntryIndex = lastAppliedIndex;
- long lastLogEntryTerm = lastAppliedTerm;
- if(lastLogEntry != null) {
- lastLogEntryIndex = lastLogEntry.getIndex();
- lastLogEntryTerm = lastLogEntry.getTerm();
- } else {
- LOG.warn("Capturing Snapshot : lastLogEntry is null. Using lastAppliedIndex {} and lastAppliedTerm {} instead.",
- lastAppliedIndex, lastAppliedTerm);
- }
-
- captureSnapshot = new CaptureSnapshot(lastLogEntryIndex,
- lastLogEntryTerm, lastAppliedIndex, lastAppliedTerm,
- newReplicatedToAllIndex, newReplicatedToAllTerm, unAppliedEntries, targetFollower != null);
+ captureSnapshot = newCaptureSnapshot(lastLogEntry, replicatedToAllIndex, targetFollower != null);
if(captureSnapshot.isInstallSnapshotInitiated()) {
LOG.info("{}: Initiating snapshot capture {} to install on {}",
--- /dev/null
+/*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.raft.client.messages;
+
+/**
+ * Internal client message to get a snapshot of the current state based on whether or not persistence is
+ * enabled. Returns a GetSnapshotReply instance.
+ *
+ * @author Thomas Pantelis
+ */
+public class GetSnapshot {
+ public static final GetSnapshot INSTANCE = new GetSnapshot();
+
+ private GetSnapshot() {
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.raft.client.messages;
+
+import com.google.common.base.Preconditions;
+import javax.annotation.Nonnull;
+
+/**
+ * Reply to GetSnapshot that returns a serialized Snapshot instance.
+ *
+ * @author Thomas Pantelis
+ */
+public class GetSnapshotReply {
+ private final String id;
+ private final byte[] snapshot;
+
+ public GetSnapshotReply(@Nonnull String id, @Nonnull byte[] snapshot) {
+ this.id = Preconditions.checkNotNull(id);
+ this.snapshot = Preconditions.checkNotNull(snapshot);
+ }
+
+ @Nonnull
+ public String getId() {
+ return id;
+ }
+
+ @Nonnull
+ public byte[] getSnapshot() {
+ return snapshot;
+ }
+
+ @Override
+ public String toString() {
+ return "GetSnapshotReply [id=" + id + ", snapshot.length=" + snapshot.length + "]";
+ }
+}
final RaftActor actorDelegate;
final RaftActorRecoveryCohort recoveryCohortDelegate;
- final RaftActorSnapshotCohort snapshotCohortDelegate;
+ volatile RaftActorSnapshotCohort snapshotCohortDelegate;
private final CountDownLatch recoveryComplete = new CountDownLatch(1);
private final List<Object> state;
private ActorRef roleChangeNotifier;
@Override
protected RaftActorSnapshotMessageSupport newRaftActorSnapshotMessageSupport() {
- return snapshotMessageSupport != null ? snapshotMessageSupport : super.newRaftActorSnapshotMessageSupport();
+ return snapshotMessageSupport != null ? snapshotMessageSupport :
+ (snapshotMessageSupport = super.newRaftActorSnapshotMessageSupport());
+ }
+
+ public RaftActorSnapshotMessageSupport getSnapshotMessageSupport() {
+ return snapshotMessageSupport;
}
public void waitForRecoveryComplete() {
}
private void sendMessageToSupport(Object message, boolean expHandled) {
- boolean handled = support.handleSnapshotMessage(message);
+ boolean handled = support.handleSnapshotMessage(message, mockRaftActorRef);
assertEquals("complete", expHandled, handled);
}
package org.opendaylight.controller.cluster.raft;
+import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNotSame;
import static org.mockito.Matchers.same;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.timeout;
+import static org.mockito.Mockito.verify;
import akka.actor.ActorRef;
import akka.actor.PoisonPill;
import akka.actor.Props;
+import akka.actor.Status.Failure;
import akka.actor.Terminated;
import akka.dispatch.Dispatchers;
import akka.japi.Procedure;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import org.apache.commons.lang3.SerializationUtils;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
+import org.mockito.ArgumentCaptor;
import org.opendaylight.controller.cluster.DataPersistenceProvider;
import org.opendaylight.controller.cluster.NonPersistentDataProvider;
import org.opendaylight.controller.cluster.PersistentDataProvider;
import org.opendaylight.controller.cluster.raft.behaviors.Follower;
import org.opendaylight.controller.cluster.raft.behaviors.Leader;
import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
+import org.opendaylight.controller.cluster.raft.client.messages.GetSnapshot;
+import org.opendaylight.controller.cluster.raft.client.messages.GetSnapshotReply;
import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
+import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy;
import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import scala.concurrent.duration.Duration;
import scala.concurrent.duration.FiniteDuration;
public class RaftActorTest extends AbstractActorTest {
mockRaftActor.waitForRecoveryComplete();
ApplySnapshot applySnapshot = new ApplySnapshot(mock(Snapshot.class));
- doReturn(true).when(mockSupport).handleSnapshotMessage(same(applySnapshot));
+ doReturn(true).when(mockSupport).handleSnapshotMessage(same(applySnapshot), any(ActorRef.class));
mockRaftActor.handleCommand(applySnapshot);
CaptureSnapshot captureSnapshot = new CaptureSnapshot(1, 1, 1, 1, 0, 1, null);
- doReturn(true).when(mockSupport).handleSnapshotMessage(same(captureSnapshot));
+ doReturn(true).when(mockSupport).handleSnapshotMessage(same(captureSnapshot), any(ActorRef.class));
mockRaftActor.handleCommand(captureSnapshot);
CaptureSnapshotReply captureSnapshotReply = new CaptureSnapshotReply(new byte[0]);
- doReturn(true).when(mockSupport).handleSnapshotMessage(same(captureSnapshotReply));
+ doReturn(true).when(mockSupport).handleSnapshotMessage(same(captureSnapshotReply), any(ActorRef.class));
mockRaftActor.handleCommand(captureSnapshotReply);
SaveSnapshotSuccess saveSnapshotSuccess = new SaveSnapshotSuccess(mock(SnapshotMetadata.class));
- doReturn(true).when(mockSupport).handleSnapshotMessage(same(saveSnapshotSuccess));
+ doReturn(true).when(mockSupport).handleSnapshotMessage(same(saveSnapshotSuccess), any(ActorRef.class));
mockRaftActor.handleCommand(saveSnapshotSuccess);
SaveSnapshotFailure saveSnapshotFailure = new SaveSnapshotFailure(mock(SnapshotMetadata.class), new Throwable());
- doReturn(true).when(mockSupport).handleSnapshotMessage(same(saveSnapshotFailure));
+ doReturn(true).when(mockSupport).handleSnapshotMessage(same(saveSnapshotFailure), any(ActorRef.class));
mockRaftActor.handleCommand(saveSnapshotFailure);
- doReturn(true).when(mockSupport).handleSnapshotMessage(same(RaftActorSnapshotMessageSupport.COMMIT_SNAPSHOT));
+ doReturn(true).when(mockSupport).handleSnapshotMessage(same(RaftActorSnapshotMessageSupport.COMMIT_SNAPSHOT),
+ any(ActorRef.class));
mockRaftActor.handleCommand(RaftActorSnapshotMessageSupport.COMMIT_SNAPSHOT);
- verify(mockSupport).handleSnapshotMessage(same(applySnapshot));
- verify(mockSupport).handleSnapshotMessage(same(captureSnapshot));
- verify(mockSupport).handleSnapshotMessage(same(captureSnapshotReply));
- verify(mockSupport).handleSnapshotMessage(same(saveSnapshotSuccess));
- verify(mockSupport).handleSnapshotMessage(same(saveSnapshotFailure));
- verify(mockSupport).handleSnapshotMessage(same(RaftActorSnapshotMessageSupport.COMMIT_SNAPSHOT));
+ doReturn(true).when(mockSupport).handleSnapshotMessage(same(GetSnapshot.INSTANCE), any(ActorRef.class));
+ mockRaftActor.handleCommand(GetSnapshot.INSTANCE);
+
+ verify(mockSupport).handleSnapshotMessage(same(applySnapshot), any(ActorRef.class));
+ verify(mockSupport).handleSnapshotMessage(same(captureSnapshot), any(ActorRef.class));
+ verify(mockSupport).handleSnapshotMessage(same(captureSnapshotReply), any(ActorRef.class));
+ verify(mockSupport).handleSnapshotMessage(same(saveSnapshotSuccess), any(ActorRef.class));
+ verify(mockSupport).handleSnapshotMessage(same(saveSnapshotFailure), any(ActorRef.class));
+ verify(mockSupport).handleSnapshotMessage(same(RaftActorSnapshotMessageSupport.COMMIT_SNAPSHOT),
+ any(ActorRef.class));
+ verify(mockSupport).handleSnapshotMessage(same(GetSnapshot.INSTANCE), any(ActorRef.class));
}
@Test
assertEquals("Behavior State", RaftState.Follower,
mockRaftActor.getCurrentBehavior().state());
}
+
+ @Test
+ public void testGetSnapshot() throws Exception {
+ TEST_LOG.info("testGetSnapshot starting");
+
+ JavaTestKit kit = new JavaTestKit(getSystem());
+
+ String persistenceId = factory.generateActorId("test-actor-");
+ DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
+ config.setCustomRaftPolicyImplementationClass(DisableElectionsRaftPolicy.class.getName());
+
+ long term = 3;
+ long seqN = 1;
+ InMemoryJournal.addEntry(persistenceId, seqN++, new UpdateElectionTerm(term, "member-1"));
+ InMemoryJournal.addEntry(persistenceId, seqN++, new MockRaftActorContext.MockReplicatedLogEntry(term, 0,
+ new MockRaftActorContext.MockPayload("A")));
+ InMemoryJournal.addEntry(persistenceId, seqN++, new MockRaftActorContext.MockReplicatedLogEntry(term, 1,
+ new MockRaftActorContext.MockPayload("B")));
+ InMemoryJournal.addEntry(persistenceId, seqN++, new ApplyJournalEntries(1));
+ InMemoryJournal.addEntry(persistenceId, seqN++, new MockRaftActorContext.MockReplicatedLogEntry(term, 2,
+ new MockRaftActorContext.MockPayload("C")));
+
+ TestActorRef<MockRaftActor> raftActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
+ ImmutableMap.<String, String>builder().put("member1", "address").build(), Optional.<ConfigParams>of(config)).
+ withDispatcher(Dispatchers.DefaultDispatcherId()), persistenceId);
+ MockRaftActor mockRaftActor = raftActorRef.underlyingActor();
+
+ mockRaftActor.waitForRecoveryComplete();
+
+ // Wait for snapshot after recovery
+ verify(mockRaftActor.snapshotCohortDelegate, timeout(5000)).createSnapshot(any(ActorRef.class));
+
+ mockRaftActor.snapshotCohortDelegate = mock(RaftActorSnapshotCohort.class);
+
+ raftActorRef.tell(GetSnapshot.INSTANCE, kit.getRef());
+
+ ArgumentCaptor<ActorRef> replyActor = ArgumentCaptor.forClass(ActorRef.class);
+ verify(mockRaftActor.snapshotCohortDelegate, timeout(5000)).createSnapshot(replyActor.capture());
+
+ byte[] stateSnapshot = new byte[]{1,2,3};
+ replyActor.getValue().tell(new CaptureSnapshotReply(stateSnapshot), ActorRef.noSender());
+
+ GetSnapshotReply reply = kit.expectMsgClass(GetSnapshotReply.class);
+
+ assertEquals("getId", persistenceId, reply.getId());
+ Snapshot replySnapshot = SerializationUtils.deserialize(reply.getSnapshot());
+ assertEquals("getElectionTerm", term, replySnapshot.getElectionTerm());
+ assertEquals("getElectionVotedFor", "member-1", replySnapshot.getElectionVotedFor());
+ assertEquals("getLastAppliedIndex", 1L, replySnapshot.getLastAppliedIndex());
+ assertEquals("getLastAppliedTerm", term, replySnapshot.getLastAppliedTerm());
+ assertEquals("getLastIndex", 2L, replySnapshot.getLastIndex());
+ assertEquals("getLastTerm", term, replySnapshot.getLastTerm());
+ assertArrayEquals("getState", stateSnapshot, replySnapshot.getState());
+ assertEquals("getUnAppliedEntries size", 1, replySnapshot.getUnAppliedEntries().size());
+ assertEquals("UnApplied entry index ", 2L, replySnapshot.getUnAppliedEntries().get(0).getIndex());
+
+ // Test with timeout
+
+ mockRaftActor.getSnapshotMessageSupport().setSnapshotReplyActorTimeout(Duration.create(200, TimeUnit.MILLISECONDS));
+ reset(mockRaftActor.snapshotCohortDelegate);
+
+ raftActorRef.tell(GetSnapshot.INSTANCE, kit.getRef());
+ Failure failure = kit.expectMsgClass(akka.actor.Status.Failure.class);
+ assertEquals("Failure cause type", TimeoutException.class, failure.cause().getClass());
+
+ mockRaftActor.getSnapshotMessageSupport().setSnapshotReplyActorTimeout(Duration.create(30, TimeUnit.SECONDS));
+
+ // Test with persistence disabled.
+
+ mockRaftActor.setPersistence(false);
+ reset(mockRaftActor.snapshotCohortDelegate);
+
+ raftActorRef.tell(GetSnapshot.INSTANCE, kit.getRef());
+ reply = kit.expectMsgClass(GetSnapshotReply.class);
+ verify(mockRaftActor.snapshotCohortDelegate, never()).createSnapshot(any(ActorRef.class));
+
+ assertEquals("getId", persistenceId, reply.getId());
+ replySnapshot = SerializationUtils.deserialize(reply.getSnapshot());
+ assertEquals("getElectionTerm", term, replySnapshot.getElectionTerm());
+ assertEquals("getElectionVotedFor", "member-1", replySnapshot.getElectionVotedFor());
+ assertEquals("getLastAppliedIndex", -1L, replySnapshot.getLastAppliedIndex());
+ assertEquals("getLastAppliedTerm", -1L, replySnapshot.getLastAppliedTerm());
+ assertEquals("getLastIndex", -1L, replySnapshot.getLastIndex());
+ assertEquals("getLastTerm", -1L, replySnapshot.getLastTerm());
+ assertEquals("getState length", 0, replySnapshot.getState().length);
+ assertEquals("getUnAppliedEntries size", 0, replySnapshot.getUnAppliedEntries().size());
+
+ TEST_LOG.info("testGetSnapshot ending");
+ }
}