initializeBehavior();
raftRecovery = null;
+
+ if (context.getReplicatedLog().size() > 0) {
+ self().tell(new InitiateCaptureSnapshot(), self());
+ LOG.info("Snapshot capture initiated after recovery");
+ } else {
+ LOG.info("Snapshot capture NOT initiated after recovery, journal empty");
+ }
}
}
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.timeout;
import akka.actor.ActorRef;
import akka.actor.PoisonPill;
import akka.actor.Props;
kit.waitUntilLeader();
}
+
@Test
public void testRaftActorRecoveryWithPersistenceEnabled() throws Exception {
TEST_LOG.info("testRaftActorRecoveryWithPersistenceEnabled starting");
}};
}
+ @Test
+ public void testRaftActorOnRecoverySnapshot() throws Exception {
+ TEST_LOG.info("testRaftActorOnRecoverySnapshot");
+
+ new JavaTestKit(getSystem()) {{
+ String persistenceId = factory.generateActorId("follower-");
+
+ DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
+
+ // Set the heartbeat interval high to essentially disable election otherwise the test
+ // may fail if the actor is switched to Leader
+ config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
+
+ ImmutableMap<String, String> peerAddresses = ImmutableMap.<String, String>builder().put("member1", "address").build();
+
+ // Create mock ReplicatedLogEntry
+ ReplicatedLogEntry replLogEntry = new MockRaftActorContext.MockReplicatedLogEntry(1,1,
+ new MockRaftActorContext.MockPayload("F", 1));
+
+ InMemoryJournal.addEntry(persistenceId, 1, replLogEntry);
+
+ TestActorRef<MockRaftActor> ref = factory.createTestActor(
+ MockRaftActor.props(persistenceId, peerAddresses, Optional.<ConfigParams>of(config)));
+
+ MockRaftActor mockRaftActor = ref.underlyingActor();
+
+ mockRaftActor.waitForRecoveryComplete();
+
+ mockRaftActor.waitForInitializeBehaviorComplete();
+
+ verify(mockRaftActor.snapshotCohortDelegate, timeout(5000)).createSnapshot(any(ActorRef.class));
+ }};
+ }
+
@Test
public void testSwitchBehavior(){
String persistenceId = factory.generateActorId("leader-");
// Verify the persisted snapshot in the leader. This should reflect the advanced snapshot index as
// the last applied log entry (2) even though the leader hasn't yet advanced its cached snapshot index.
List<Snapshot> persistedSnapshots = InMemorySnapshotStore.getSnapshots(leaderId, Snapshot.class);
- assertEquals("Persisted snapshots size", 1, persistedSnapshots.size());
- verifySnapshot("Persisted", persistedSnapshots.get(0), initialTerm, 2, currentTerm, 3);
- List<ReplicatedLogEntry> unAppliedEntry = persistedSnapshots.get(0).getUnAppliedEntries();
+ assertEquals("Persisted snapshots size", 2, persistedSnapshots.size());
+ verifySnapshot("Persisted", persistedSnapshots.get(1), initialTerm, 2, currentTerm, 3);
+ List<ReplicatedLogEntry> unAppliedEntry = persistedSnapshots.get(1).getUnAppliedEntries();
assertEquals("Persisted Snapshot getUnAppliedEntries size", 1, unAppliedEntry.size());
verifyReplicatedLogEntry(unAppliedEntry.get(0), currentTerm, 3, payload3);
Props.create(new DelegatingShardCreator(creator)), shardActorName);
waitUntilLeader(shard);
-
writeToStore(shard, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
final NormalizedNode<?,?> expectedRoot = readStore(shard, YangInstanceIdentifier.builder().build());
// Trigger creation of a snapshot by ensuring
final RaftActorContext raftActorContext = ((TestShard) shard.underlyingActor()).getRaftActorContext();
raftActorContext.getSnapshotManager().capture(mock(ReplicatedLogEntry.class), -1);
-
- assertEquals("Snapshot saved", true, latch.get().await(5, TimeUnit.SECONDS));
-
- assertTrue("Invalid saved snapshot " + savedSnapshot.get(),
- savedSnapshot.get() instanceof Snapshot);
-
- verifySnapshot((Snapshot)savedSnapshot.get(), expectedRoot);
-
- latch.set(new CountDownLatch(1));
- savedSnapshot.set(null);
+ awaitAndValidateSnapshot(expectedRoot);
raftActorContext.getSnapshotManager().capture(mock(ReplicatedLogEntry.class), -1);
+ awaitAndValidateSnapshot(expectedRoot);
- assertEquals("Snapshot saved", true, latch.get().await(5, TimeUnit.SECONDS));
+ shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
+ }
- assertTrue("Invalid saved snapshot " + savedSnapshot.get(),
- savedSnapshot.get() instanceof Snapshot);
+ private void awaitAndValidateSnapshot(NormalizedNode<?,?> expectedRoot
+ ) throws InterruptedException {
+ System.out.println("Inside awaitAndValidateSnapshot {}" + savedSnapshot.get());
+ assertEquals("Snapshot saved", true, latch.get().await(5, TimeUnit.SECONDS));
- verifySnapshot((Snapshot)savedSnapshot.get(), expectedRoot);
+ assertTrue("Invalid saved snapshot " + savedSnapshot.get(),
+ savedSnapshot.get() instanceof Snapshot);
- shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
- }
+ verifySnapshot((Snapshot)savedSnapshot.get(), expectedRoot);
+
+ latch.set(new CountDownLatch(1));
+ savedSnapshot.set(null);
+ }
- private void verifySnapshot(final Snapshot snapshot, final NormalizedNode<?,?> expectedRoot) {
+ private void verifySnapshot(final Snapshot snapshot, final NormalizedNode<?,?> expectedRoot) {
- final NormalizedNode<?, ?> actual = SerializationUtils.deserializeNormalizedNode(snapshot.getState());
- assertEquals("Root node", expectedRoot, actual);
+ final NormalizedNode<?, ?> actual = SerializationUtils.deserializeNormalizedNode(snapshot.getState());
+ assertEquals("Root node", expectedRoot, actual);
- }};
+ }
+ };
}
/**