context.getConfigParams().getJournalRecoveryLogBatchSize());
super.preStart();
+
+ snapshotSupport = newRaftActorSnapshotMessageSupport();
}
@Override
@Override
public void handleCommand(Object message) {
- if(snapshotSupport == null) {
- snapshotSupport = newRaftActorSnapshotMessageSupport();
- }
-
boolean handled = snapshotSupport.handleSnapshotMessage(message);
if(handled) {
return;
import akka.persistence.SaveSnapshotSuccess;
import org.opendaylight.controller.cluster.DataPersistenceProvider;
import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
-import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
import org.slf4j.Logger;
this.currentBehavior = currentBehavior;
this.cohort = cohort;
this.log = context.getLogger();
+
+ context.getSnapshotManager().setCreateSnapshotCallable(createSnapshotProcedure);
}
boolean handleSnapshotMessage(Object message) {
} else if (message instanceof SaveSnapshotFailure) {
onSaveSnapshotFailure((SaveSnapshotFailure) message);
return true;
- } else if (message instanceof CaptureSnapshot) {
- onCaptureSnapshot(message);
- return true;
} else if (message instanceof CaptureSnapshotReply) {
onCaptureSnapshotReply(((CaptureSnapshotReply) message).getSnapshot());
return true;
context.getSnapshotManager().persist(persistence, snapshotBytes, currentBehavior, context.getTotalMemory());
}
- private void onCaptureSnapshot(Object message) {
- log.debug("{}: CaptureSnapshot received by actor: {}", context.getId(), message);
-
- context.getSnapshotManager().create(createSnapshotProcedure);
- }
-
private void onSaveSnapshotFailure(SaveSnapshotFailure saveSnapshotFailure) {
log.error("{}: SaveSnapshotFailure received for snapshot Cause:",
context.getId(), saveSnapshotFailure.cause());
import akka.japi.Procedure;
import akka.persistence.SnapshotSelectionCriteria;
+import com.google.common.annotations.VisibleForTesting;
import com.google.protobuf.ByteString;
import java.util.List;
import org.opendaylight.controller.cluster.DataPersistenceProvider;
public class SnapshotManager implements SnapshotState {
private final SnapshotState IDLE = new Idle();
- private final SnapshotState CAPTURING = new Capturing();
private final SnapshotState PERSISTING = new Persisting();
private final SnapshotState CREATING = new Creating();
private CaptureSnapshot captureSnapshot;
private long lastSequenceNumber = -1;
+ private Procedure<Void> createSnapshotProcedure;
+
public SnapshotManager(RaftActorContext context, Logger logger) {
this.context = context;
this.LOG = logger;
return currentState.capture(lastLogEntry, replicatedToAllIndex);
}
- @Override
- public void create(Procedure<Void> callback) {
- currentState.create(callback);
- }
-
@Override
public void persist(DataPersistenceProvider persistenceProvider, byte[] snapshotBytes,
RaftActorBehavior currentBehavior, long totalMemory) {
return currentState.trimLog(desiredTrimIndex, currentBehavior);
}
+ public void setCreateSnapshotCallable(Procedure<Void> createSnapshotProcedure) {
+ this.createSnapshotProcedure = createSnapshotProcedure;
+ }
+
+ @VisibleForTesting
+ public CaptureSnapshot getCaptureSnapshot() {
+ return captureSnapshot;
+ }
+
private boolean hasFollowers(){
return context.getPeerAddresses().keySet().size() > 0;
}
return false;
}
- @Override
- public void create(Procedure<Void> callback) {
- LOG.debug("create should not be called in state {}", this);
- }
-
@Override
public void persist(DataPersistenceProvider persistenceProvider, byte[] snapshotBytes,
RaftActorBehavior currentBehavior, long totalMemory) {
lastLogEntry.getTerm(), lastAppliedIndex, lastAppliedTerm,
newReplicatedToAllIndex, newReplicatedToAllTerm, unAppliedEntries, targetFollower != null);
- SnapshotManager.this.currentState = CAPTURING;
-
if(captureSnapshot.isInstallSnapshotInitiated()) {
LOG.info("{}: Initiating snapshot capture {} to install on {}",
persistenceId(), captureSnapshot, targetFollower);
LOG.debug("lastSequenceNumber prior to capture: {}", lastSequenceNumber);
- context.getActor().tell(captureSnapshot, context.getActor());
+ try {
+ createSnapshotProcedure.apply(null);
+ } catch (Exception e) {
+ LOG.error("Error creating snapshot", e);
+ return false;
+ }
+ SnapshotManager.this.currentState = CREATING;
return true;
}
}
}
- private class Capturing extends AbstractSnapshotState {
-
- @Override
- public boolean isCapturing() {
- return true;
- }
-
- @Override
- public void create(Procedure<Void> callback) {
- try {
- callback.apply(null);
- SnapshotManager.this.currentState = CREATING;
- } catch (Exception e) {
- LOG.error("Unexpected error occurred", e);
- }
- }
-
- @Override
- public String toString() {
- return "Capturing";
- }
-
- }
-
private class Creating extends AbstractSnapshotState {
@Override
package org.opendaylight.controller.cluster.raft;
-import akka.japi.Procedure;
import org.opendaylight.controller.cluster.DataPersistenceProvider;
import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
*/
boolean captureToInstall(ReplicatedLogEntry lastLogEntry, long replicatedToAllIndex, String targetFollower);
- /**
- * Create the snapshot
- *
- * @param callback a procedure to be called which should create the snapshot
- */
- void create(Procedure<Void> callback);
-
/**
* Persist the snapshot
*
public SnapshotManager getSnapshotManager() {
if(this.snapshotManager == null){
this.snapshotManager = new SnapshotManager(this, getLogger());
+ this.snapshotManager.setCreateSnapshotCallable(NoopProcedure.<Void>instance());
}
return this.snapshotManager;
}
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.verify;
import akka.actor.ActorRef;
-import akka.japi.Procedure;
import akka.persistence.SaveSnapshotFailure;
import akka.persistence.SaveSnapshotSuccess;
import akka.persistence.SnapshotMetadata;
import java.util.Collections;
import org.junit.Before;
import org.junit.Test;
-import org.mockito.ArgumentCaptor;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
import org.opendaylight.controller.cluster.DataPersistenceProvider;
import org.opendaylight.controller.cluster.raft.MockRaftActorContext.MockPayload;
import org.opendaylight.controller.cluster.raft.MockRaftActorContext.MockReplicatedLogEntry;
import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
-import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
import org.slf4j.Logger;
verify(mockCohort).applySnapshot(snapshotBytes);
}
- @SuppressWarnings({ "rawtypes", "unchecked" })
- @Test
- public void testOnCaptureSnapshot() throws Exception {
-
- sendMessageToSupport(new CaptureSnapshot(3, 1, 2, 1, 2, 1, null));
-
- ArgumentCaptor<Procedure> procedure = ArgumentCaptor.forClass(Procedure.class);
- verify(mockSnapshotManager).create(procedure.capture());
-
- procedure.getValue().apply(null);
-
- verify(mockCohort).createSnapshot(same(mockRaftActorRef));
- }
-
@Test
public void testOnCaptureSnapshotReply() {
import org.junit.Test;
import org.opendaylight.controller.cluster.raft.MockRaftActorContext.MockPayload;
import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
-import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
send2InitialPayloads();
// Block these messages initially so we can control the sequence.
- leaderActor.underlyingActor().startDropMessages(CaptureSnapshot.class);
leaderActor.underlyingActor().startDropMessages(CaptureSnapshotReply.class);
follower1Actor.underlyingActor().startDropMessages(AppendEntries.class);
MessageCollectorActor.expectMatching(follower1CollectorActor, AppendEntries.class, 3);
- CaptureSnapshot captureSnapshot = MessageCollectorActor.expectFirstMatching(
- leaderCollectorActor, CaptureSnapshot.class);
-
- // First, deliver the CaptureSnapshot to the leader.
- leaderActor.underlyingActor().stopDropMessages(CaptureSnapshot.class);
- leaderActor.tell(captureSnapshot, leaderActor);
-
// Send another payload.
MockPayload payload4 = sendPayloadData(leaderActor, "four");
}
@Test
- public void testStatePersistedBetweenInitiateSnapshotAndCapture() {
-
- send2InitialPayloads();
-
- // Block these messages initially so we can control the sequence.
- leaderActor.underlyingActor().startDropMessages(CaptureSnapshot.class);
- follower1Actor.underlyingActor().startDropMessages(AppendEntries.class);
-
- MockPayload payload2 = sendPayloadData(leaderActor, "two");
-
- // This should trigger a snapshot.
- MockPayload payload3 = sendPayloadData(leaderActor, "three");
-
- // Send another payload.
- MockPayload payload4 = sendPayloadData(leaderActor, "four");
-
- MessageCollectorActor.expectMatching(follower1CollectorActor, AppendEntries.class, 3);
-
- CaptureSnapshot captureSnapshot = MessageCollectorActor.expectFirstMatching(
- leaderCollectorActor, CaptureSnapshot.class);
-
- // First, deliver the AppendEntries to the follower
- follower1Actor.underlyingActor().stopDropMessages(AppendEntries.class);
-
- MessageCollectorActor.expectMatching(leaderCollectorActor, ApplyJournalEntries.class, 3);
-
- // Now deliver the CaptureSnapshot to the leader.
- leaderActor.underlyingActor().stopDropMessages(CaptureSnapshot.class);
- leaderActor.tell(captureSnapshot, leaderActor);
-
- // Wait for snapshot complete.
- MessageCollectorActor.expectFirstMatching(leaderCollectorActor, SaveSnapshotSuccess.class);
-
- reinstateLeaderActor();
-
- assertEquals("Leader snapshot term", currentTerm, leaderContext.getReplicatedLog().getSnapshotTerm());
- assertEquals("Leader snapshot index", 1, leaderContext.getReplicatedLog().getSnapshotIndex());
- assertEquals("Leader journal log size", 3, leaderContext.getReplicatedLog().size());
- assertEquals("Leader journal last index", 4, leaderContext.getReplicatedLog().lastIndex());
- assertEquals("Leader commit index", 4, leaderContext.getCommitIndex());
- assertEquals("Leader last applied", 4, leaderContext.getLastApplied());
-
- // payloads 2, 3, and 4 were applied after the snapshot was initiated and before it was captured so
- // were included in the snapshot. They were also included as unapplied entries in the snapshot as
- // they weren't yet applied to the state at the time the snapshot was initiated. They were applied to the
- // state on recovery by the ApplyJournalEntries messages which remained in the persisted log.
- // This is a side effect of trimming the persisted log to the sequence number captured at the time
- // the snapshot was initiated.
- assertEquals("Leader state", Arrays.asList(payload0, payload1, payload2, payload3, payload4, payload2,
- payload3, payload4), leaderActor.underlyingActor().getState());
- }
-
- @Test
- public void testApplyJournalEntriesPersistedAfterSnapshotPersisted() {
+ public void testStatePersistedAfterSnapshotPersisted() {
send2InitialPayloads();
import org.opendaylight.controller.cluster.raft.RaftActor.UpdateElectionTerm;
import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
-import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
expSnapshotState.add(payload6);
// Delay the CaptureSnapshot message to the leader actor.
- leaderActor.underlyingActor().startDropMessages(CaptureSnapshot.class);
+ leaderActor.underlyingActor().startDropMessages(CaptureSnapshotReply.class);
// Send the payload.
payload7 = sendPayloadData(leaderActor, "seven");
- // Capture the CaptureSnapshot message so we can send it later.
- CaptureSnapshot captureSnapshot = MessageCollectorActor.expectFirstMatching(
- leaderCollectorActor, CaptureSnapshot.class);
+ // Capture the CaptureSnapshotReply message so we can send it later.
+ CaptureSnapshotReply captureSnapshotReply = MessageCollectorActor.expectFirstMatching(leaderCollectorActor,
+ CaptureSnapshotReply.class);
// Wait for the state to be applied in the leader.
ApplyState applyState = MessageCollectorActor.expectFirstMatching(leaderCollectorActor, ApplyState.class);
assertEquals("Leader last applied", 7, leaderContext.getLastApplied());
assertEquals("Leader replicatedToAllIndex", 5, leader.getReplicatedToAllIndex());
- // Now deliver the CaptureSnapshot.
- leaderActor.underlyingActor().stopDropMessages(CaptureSnapshot.class);
- leaderActor.tell(captureSnapshot, leaderActor);
-
- // Wait for CaptureSnapshotReply to complete.
- MessageCollectorActor.expectFirstMatching(leaderCollectorActor, CaptureSnapshotReply.class);
+ // Now deliver the CaptureSnapshotReply.
+ leaderActor.underlyingActor().stopDropMessages(CaptureSnapshotReply.class);
+ leaderActor.tell(captureSnapshotReply, leaderActor);
// Wait for snapshot complete.
MessageCollectorActor.expectFirstMatching(leaderCollectorActor, SaveSnapshotSuccess.class);
assertEquals("Leader journal last index", 7, leaderContext.getReplicatedLog().lastIndex());
assertEquals("Leader commit index", 7, leaderContext.getCommitIndex());
- expSnapshotState.add(payload7);
-
// Verify the persisted snapshot. This should reflect the snapshot index as the last applied
// log entry (7) and shouldn't contain any unapplied entries as we capture persisted the snapshot data
// when the snapshot is created (ie when the CaptureSnapshot is processed).
assertEquals("Follower 2 journal last index", 7, follower2Context.getReplicatedLog().lastIndex());
assertEquals("Follower 2 commit index", 7, follower2Context.getCommitIndex());
+ expSnapshotState.add(payload7);
+
testLog.info("testSecondSnapshot ending");
}
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyLong;
import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.reset;
import com.google.common.collect.ImmutableMap;
import java.util.Arrays;
import java.util.HashMap;
-import java.util.List;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
actorRef = factory.createTestActor(MessageCollectorActor.props(), factory.generateActorId("test-"));
doReturn(actorRef).when(mockRaftActorContext).getActor();
+ snapshotManager.setCreateSnapshotCallable(mockProcedure);
}
@After
}
@Test
- public void testCaptureToInstall(){
+ public void testCaptureToInstall() throws Exception {
// Force capturing toInstall = true
snapshotManager.captureToInstall(new MockRaftActorContext.MockReplicatedLogEntry(1, 0,
assertEquals(true, snapshotManager.isCapturing());
- CaptureSnapshot captureSnapshot = MessageCollectorActor.expectFirstMatching(actorRef, CaptureSnapshot.class);
+ verify(mockProcedure).apply(null);
+
+ CaptureSnapshot captureSnapshot = snapshotManager.getCaptureSnapshot();
// LastIndex and LastTerm are picked up from the lastLogEntry
assertEquals(0L, captureSnapshot.getLastIndex());
}
@Test
- public void testCapture(){
+ public void testCapture() throws Exception {
boolean capture = snapshotManager.capture(new MockRaftActorContext.MockReplicatedLogEntry(1,9,
new MockRaftActorContext.MockPayload()), 9);
assertEquals(true, snapshotManager.isCapturing());
- CaptureSnapshot captureSnapshot = MessageCollectorActor.expectFirstMatching(actorRef, CaptureSnapshot.class);
+ verify(mockProcedure).apply(null);
+
+ CaptureSnapshot captureSnapshot = snapshotManager.getCaptureSnapshot();
+
// LastIndex and LastTerm are picked up from the lastLogEntry
assertEquals(9L, captureSnapshot.getLastIndex());
assertEquals(1L, captureSnapshot.getLastTerm());
}
+ @Test
+ public void testCaptureWithCreateProcedureError () throws Exception {
+ doThrow(new Exception("mock")).when(mockProcedure).apply(null);
+
+ boolean capture = snapshotManager.capture(new MockRaftActorContext.MockReplicatedLogEntry(1,9,
+ new MockRaftActorContext.MockPayload()), 9);
+
+ assertFalse(capture);
+
+ assertEquals(false, snapshotManager.isCapturing());
+
+ verify(mockProcedure).apply(null);
+ }
+
@Test
public void testIllegalCapture() throws Exception {
boolean capture = snapshotManager.capture(new MockRaftActorContext.MockReplicatedLogEntry(1,9,
assertTrue(capture);
- List<CaptureSnapshot> allMatching = MessageCollectorActor.getAllMatching(actorRef, CaptureSnapshot.class);
+ verify(mockProcedure).apply(null);
- assertEquals(1, allMatching.size());
+ reset(mockProcedure);
// This will not cause snapshot capture to start again
capture = snapshotManager.capture(new MockRaftActorContext.MockReplicatedLogEntry(1,9,
assertFalse(capture);
- allMatching = MessageCollectorActor.getAllMatching(actorRef, CaptureSnapshot.class);
-
- assertEquals(1, allMatching.size());
+ verify(mockProcedure, never()).apply(null);
}
@Test
// when replicatedToAllIndex = -1
snapshotManager.capture(lastLogEntry, -1);
- snapshotManager.create(mockProcedure);
-
byte[] bytes = new byte[] {1,2,3,4,5,6,7,8,9,10};
snapshotManager.persist(mockDataPersistenceProvider, bytes, mockRaftActorBehavior
, Runtime.getRuntime().totalMemory());
verify(mockReplicatedLog).snapshotPreCommit(7L, 1L);
}
-
- @Test
- public void testCreate() throws Exception {
- // when replicatedToAllIndex = -1
- snapshotManager.capture(new MockRaftActorContext.MockReplicatedLogEntry(6,9,
- new MockRaftActorContext.MockPayload()), -1);
-
- snapshotManager.create(mockProcedure);
-
- verify(mockProcedure).apply(null);
-
- assertEquals("isCapturing", true, snapshotManager.isCapturing());
- }
-
- @Test
- public void testCallingCreateMultipleTimesCausesNoHarm() throws Exception {
- // when replicatedToAllIndex = -1
- snapshotManager.capture(new MockRaftActorContext.MockReplicatedLogEntry(6,9,
- new MockRaftActorContext.MockPayload()), -1);
-
- snapshotManager.create(mockProcedure);
-
- snapshotManager.create(mockProcedure);
-
- verify(mockProcedure, times(1)).apply(null);
- }
-
- @Test
- public void testCallingCreateBeforeCapture() throws Exception {
- snapshotManager.create(mockProcedure);
-
- verify(mockProcedure, times(0)).apply(null);
- }
-
- @Test
- public void testCallingCreateAfterPersist() throws Exception {
- // when replicatedToAllIndex = -1
- snapshotManager.capture(new MockRaftActorContext.MockReplicatedLogEntry(6,9,
- new MockRaftActorContext.MockPayload()), -1);
-
- snapshotManager.create(mockProcedure);
-
- verify(mockProcedure, times(1)).apply(null);
-
- snapshotManager.persist(mockDataPersistenceProvider, new byte[]{}, mockRaftActorBehavior
- , Runtime.getRuntime().totalMemory());
-
- reset(mockProcedure);
-
- snapshotManager.create(mockProcedure);
-
- verify(mockProcedure, never()).apply(null);
- }
-
@Test
public void testPersistWhenReplicatedToAllIndexNotMinus(){
doReturn(45L).when(mockReplicatedLog).getSnapshotIndex();
snapshotManager.capture(new MockRaftActorContext.MockReplicatedLogEntry(6,9,
new MockRaftActorContext.MockPayload()), 9);
- snapshotManager.create(mockProcedure);
-
byte[] bytes = new byte[] {1,2,3,4,5,6,7,8,9,10};
snapshotManager.persist(mockDataPersistenceProvider, bytes, mockRaftActorBehavior
, Runtime.getRuntime().totalMemory());
snapshotManager.capture(new MockRaftActorContext.MockReplicatedLogEntry(6,9,
new MockRaftActorContext.MockPayload()), -1);
- snapshotManager.create(mockProcedure);
-
snapshotManager.persist(mockDataPersistenceProvider, new byte[]{}, mockRaftActorBehavior
, Runtime.getRuntime().totalMemory());
assertTrue(capture);
- snapshotManager.create(mockProcedure);
-
byte[] bytes = new byte[] {1,2,3,4,5,6,7,8,9,10};
snapshotManager.persist(mockDataPersistenceProvider, bytes, mockRaftActorBehavior
snapshotManager.captureToInstall(new MockRaftActorContext.MockReplicatedLogEntry(6, 9,
new MockRaftActorContext.MockPayload()), -1, "follower-1");
- snapshotManager.create(mockProcedure);
-
snapshotManager.persist(mockDataPersistenceProvider, new byte[]{}, mockRaftActorBehavior
, Runtime.getRuntime().totalMemory());
snapshotManager.captureToInstall(new MockRaftActorContext.MockReplicatedLogEntry(6, 9,
new MockRaftActorContext.MockPayload()), -1, "follower-1");
- snapshotManager.create(mockProcedure);
-
snapshotManager.persist(mockDataPersistenceProvider, new byte[]{}, mockRaftActorBehavior
, Runtime.getRuntime().totalMemory());
snapshotManager.captureToInstall(new MockRaftActorContext.MockReplicatedLogEntry(6, 9,
new MockRaftActorContext.MockPayload()), -1, "follower-1");
- snapshotManager.create(mockProcedure);
-
snapshotManager.persist(mockDataPersistenceProvider, new byte[]{}, mockRaftActorBehavior
, Runtime.getRuntime().totalMemory());
snapshotManager.captureToInstall(new MockRaftActorContext.MockReplicatedLogEntry(6, 9,
new MockRaftActorContext.MockPayload()), -1, "follower-1");
- snapshotManager.create(mockProcedure);
-
snapshotManager.persist(mockDataPersistenceProvider, new byte[]{}, mockRaftActorBehavior
, Runtime.getRuntime().totalMemory());
snapshotManager.captureToInstall(new MockRaftActorContext.MockReplicatedLogEntry(6, 9,
new MockRaftActorContext.MockPayload()), -1, "follower-1");
- snapshotManager.create(mockProcedure);
-
snapshotManager.persist(mockDataPersistenceProvider, new byte[]{}, mockRaftActorBehavior
, Runtime.getRuntime().totalMemory());
assertTrue(raftBehavior instanceof Leader);
- MessageCollectorActor.expectFirstMatching(leaderActor, CaptureSnapshot.class);
+ assertEquals("isCapturing", true, actorContext.getSnapshotManager().isCapturing());
}
@Test
leader.handleMessage(leaderActor, new Replicate(null, "state-id", entry));
- CaptureSnapshot cs = MessageCollectorActor.expectFirstMatching(leaderActor, CaptureSnapshot.class);
+ assertEquals("isCapturing", true, actorContext.getSnapshotManager().isCapturing());
+
+ CaptureSnapshot cs = actorContext.getSnapshotManager().getCaptureSnapshot();
assertTrue(cs.isInstallSnapshotInitiated());
assertEquals(3, cs.getLastAppliedIndex());
// if an initiate is started again when first is in progress, it shouldnt initiate Capture
leader.handleMessage(leaderActor, new Replicate(null, "state-id", entry));
- List<CaptureSnapshot> captureSnapshots = MessageCollectorActor.getAllMatching(leaderActor, CaptureSnapshot.class);
- assertEquals("CaptureSnapshot should not get invoked when initiate is in progress", 1, captureSnapshots.size());
+ Assert.assertSame("CaptureSnapshot instance", cs, actorContext.getSnapshotManager().getCaptureSnapshot());
}
@Test