assertNotEquals("voted for", "foobar", mockRaftActor.getRaftActorContext().getTermInformation().getVotedFor());
mockRaftActor.onReceiveRecover(mock(RecoveryCompleted.class));
-
}};
}
MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
+ mockRaftActor.waitForInitializeBehaviorComplete();
+
mockRaftActor.getRaftActorContext().getTermInformation().updateAndPersist(10, "foobar");
assertEquals("Persist called", true, persistLatch.await(5, TimeUnit.SECONDS));
-
}
-
};
}
MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
+ mockRaftActor.waitForInitializeBehaviorComplete();
+
MockRaftActorContext.MockReplicatedLogEntry logEntry = new MockRaftActorContext.MockReplicatedLogEntry(10, 10, mock(Payload.class));
mockRaftActor.getRaftActorContext().getReplicatedLog().appendAndPersist(logEntry);
verify(dataPersistenceProvider).persist(eq(logEntry), any(Procedure.class));
-
}
-
};
}
MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
+ mockRaftActor.waitForInitializeBehaviorComplete();
+
mockRaftActor.getReplicatedLog().appendAndPersist(new MockRaftActorContext.MockReplicatedLogEntry(1, 0, mock(Payload.class)));
mockRaftActor.getRaftActorContext().getReplicatedLog().removeFromAndPersist(0);
verify(dataPersistenceProvider, times(2)).persist(anyObject(), any(Procedure.class));
-
}
-
};
}
MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
+ mockRaftActor.waitForInitializeBehaviorComplete();
+
mockRaftActor.onReceiveCommand(new ApplyLogEntries(10));
verify(dataPersistenceProvider, times(1)).persist(anyObject(), any(Procedure.class));
MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
+ mockRaftActor.waitForInitializeBehaviorComplete();
+
ByteString snapshotBytes = fromObject(Arrays.asList(
new MockRaftActorContext.MockPayload("A"),
new MockRaftActorContext.MockPayload("B"),
MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
+ mockRaftActor.waitForInitializeBehaviorComplete();
+
mockRaftActor.getReplicatedLog().append(new MockRaftActorContext.MockReplicatedLogEntry(1, 0, mock(Payload.class)));
mockRaftActor.getReplicatedLog().append(new MockRaftActorContext.MockReplicatedLogEntry(1, 1, mock(Payload.class)));
mockRaftActor.getReplicatedLog().append(new MockRaftActorContext.MockReplicatedLogEntry(1, 2, mock(Payload.class)));
MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
+ mockRaftActor.waitForInitializeBehaviorComplete();
+
ReplicatedLogEntry entry = new MockRaftActorContext.MockReplicatedLogEntry(1, 5,
new MockRaftActorContext.MockPayload("F"));
MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
+ mockRaftActor.waitForInitializeBehaviorComplete();
+
ReplicatedLog oldReplicatedLog = mockRaftActor.getReplicatedLog();
oldReplicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1, 0, mock(Payload.class)));
MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
+ mockRaftActor.waitForInitializeBehaviorComplete();
+
ByteString snapshotBytes = fromObject(Arrays.asList(
new MockRaftActorContext.MockPayload("A"),
new MockRaftActorContext.MockPayload("B"),
public void testRaftRoleChangeNotifier() throws Exception {
new JavaTestKit(getSystem()) {{
ActorRef notifierActor = factory.createActor(Props.create(MessageCollectorActor.class));
+ MessageCollectorActor.waitUntilReady(notifierActor);
+
DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
+ long heartBeatInterval = 100;
+ config.setHeartBeatInterval(FiniteDuration.create(heartBeatInterval, TimeUnit.MILLISECONDS));
+ config.setElectionTimeoutFactor(1);
+
String persistenceId = factory.generateActorId("notifier-");
factory.createTestActor(MockRaftActor.props(persistenceId,
Collections.<String, String>emptyMap(), Optional.<ConfigParams>of(config), notifierActor), persistenceId);
- // sleeping for a minimum of 2 seconds, if it spans more its fine.
- Uninterruptibles.sleepUninterruptibly(2, TimeUnit.SECONDS);
+ List<RoleChanged> matches = null;
+ for(int i = 0; i < 5000 / heartBeatInterval; i++) {
+ matches = MessageCollectorActor.getAllMatching(notifierActor, RoleChanged.class);
+ assertNotNull(matches);
+ if(matches.size() == 3) {
+ break;
+ }
+ Uninterruptibles.sleepUninterruptibly(heartBeatInterval, TimeUnit.MILLISECONDS);
+ }
- List<Object> matches = MessageCollectorActor.getAllMatching(notifierActor, RoleChanged.class);
- assertNotNull(matches);
assertEquals(3, matches.size());
// check if the notifier got a role change from null to Follower
- RoleChanged raftRoleChanged = (RoleChanged) matches.get(0);
+ RoleChanged raftRoleChanged = matches.get(0);
assertEquals(persistenceId, raftRoleChanged.getMemberId());
assertNull(raftRoleChanged.getOldRole());
assertEquals(RaftState.Follower.name(), raftRoleChanged.getNewRole());
// check if the notifier got a role change from Follower to Candidate
- raftRoleChanged = (RoleChanged) matches.get(1);
+ raftRoleChanged = matches.get(1);
assertEquals(persistenceId, raftRoleChanged.getMemberId());
assertEquals(RaftState.Follower.name(), raftRoleChanged.getOldRole());
assertEquals(RaftState.Candidate.name(), raftRoleChanged.getNewRole());
// check if the notifier got a role change from Candidate to Leader
- raftRoleChanged = (RoleChanged) matches.get(2);
+ raftRoleChanged = matches.get(2);
assertEquals(persistenceId, raftRoleChanged.getMemberId());
assertEquals(RaftState.Candidate.name(), raftRoleChanged.getOldRole());
assertEquals(RaftState.Leader.name(), raftRoleChanged.getNewRole());
Map<String, String> peerAddresses = new HashMap<>();
peerAddresses.put(follower1Id, followerActor1.path().toString());
- TestActorRef<MockRaftActor> mockActorRef = TestActorRef.create(getSystem(),
+ TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(
MockRaftActor.props(persistenceId, peerAddresses,
Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
MockRaftActor leaderActor = mockActorRef.underlyingActor();
+
leaderActor.getRaftActorContext().setCommitIndex(4);
leaderActor.getRaftActorContext().setLastApplied(4);
leaderActor.getRaftActorContext().getTermInformation().update(1, persistenceId);
Map<String, String> peerAddresses = new HashMap<>();
peerAddresses.put(leaderId, leaderActor1.path().toString());
- TestActorRef<MockRaftActor> mockActorRef = TestActorRef.create(getSystem(),
+ TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(
MockRaftActor.props(persistenceId, peerAddresses,
Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
};
}
+
+ private static class NonPersistentProvider implements DataPersistenceProvider {
+ @Override
+ public boolean isRecoveryApplicable() {
+ return false;
+ }
+
+ @Override
+ public <T> void persist(T o, Procedure<T> procedure) {
+ try {
+ procedure.apply(o);
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+
+ @Override
+ public void saveSnapshot(Object o) {
+
+ }
+
+ @Override
+ public void deleteSnapshots(SnapshotSelectionCriteria criteria) {
+
+ }
+
+ @Override
+ public void deleteMessages(long sequenceNumber) {
+
+ }
+ }
+
+ @Test
+ public void testRealSnapshotWhenReplicatedToAllIndexMinusOne() throws Exception {
+ new JavaTestKit(getSystem()) {{
+ String persistenceId = factory.generateActorId("leader-");
+ DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
+ config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
+ config.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS));
+ config.setSnapshotBatchCount(5);
+
+ DataPersistenceProvider dataPersistenceProvider = new NonPersistentProvider();
+
+ Map<String, String> peerAddresses = new HashMap<>();
+
+ TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(
+ MockRaftActor.props(persistenceId, peerAddresses,
+ Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
+
+ MockRaftActor leaderActor = mockActorRef.underlyingActor();
+ leaderActor.getRaftActorContext().setCommitIndex(3);
+ leaderActor.getRaftActorContext().setLastApplied(3);
+ leaderActor.getRaftActorContext().getTermInformation().update(1, persistenceId);
+
+ leaderActor.waitForInitializeBehaviorComplete();
+ for(int i=0;i< 4;i++) {
+ leaderActor.getReplicatedLog()
+ .append(new MockRaftActorContext.MockReplicatedLogEntry(1, i,
+ new MockRaftActorContext.MockPayload("A")));
+ }
+
+ Leader leader = new Leader(leaderActor.getRaftActorContext());
+ leaderActor.setCurrentBehavior(leader);
+ assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
+
+ // Persist another entry (this will cause a CaptureSnapshot to be triggered
+ leaderActor.persistData(mockActorRef, "x", new MockRaftActorContext.MockPayload("duh"));
+
+ // Now send a CaptureSnapshotReply
+ mockActorRef.tell(new CaptureSnapshotReply(fromObject("foo").toByteArray()), mockActorRef);
+
+ // Trimming log in this scenario is a no-op
+ assertEquals(-1, leaderActor.getReplicatedLog().getSnapshotIndex());
+ assertFalse(leaderActor.getRaftActorContext().isSnapshotCaptureInitiated());
+ assertEquals(-1, leader.getReplicatedToAllIndex());
+
+ }};
+ }
+
+ @Test
+ public void testRealSnapshotWhenReplicatedToAllIndexNotInReplicatedLog() throws Exception {
+ new JavaTestKit(getSystem()) {{
+ String persistenceId = factory.generateActorId("leader-");
+ DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
+ config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
+ config.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS));
+ config.setSnapshotBatchCount(5);
+
+ DataPersistenceProvider dataPersistenceProvider = new NonPersistentProvider();
+
+ Map<String, String> peerAddresses = new HashMap<>();
+
+ TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(
+ MockRaftActor.props(persistenceId, peerAddresses,
+ Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
+
+ MockRaftActor leaderActor = mockActorRef.underlyingActor();
+ leaderActor.getRaftActorContext().setCommitIndex(3);
+ leaderActor.getRaftActorContext().setLastApplied(3);
+ leaderActor.getRaftActorContext().getTermInformation().update(1, persistenceId);
+ leaderActor.getReplicatedLog().setSnapshotIndex(3);
+
+ leaderActor.waitForInitializeBehaviorComplete();
+ Leader leader = new Leader(leaderActor.getRaftActorContext());
+ leaderActor.setCurrentBehavior(leader);
+ leader.setReplicatedToAllIndex(3);
+ assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
+
+ // Persist another entry (this will cause a CaptureSnapshot to be triggered
+ leaderActor.persistData(mockActorRef, "x", new MockRaftActorContext.MockPayload("duh"));
+
+ // Now send a CaptureSnapshotReply
+ mockActorRef.tell(new CaptureSnapshotReply(fromObject("foo").toByteArray()), mockActorRef);
+
+ // Trimming log in this scenario is a no-op
+ assertEquals(3, leaderActor.getReplicatedLog().getSnapshotIndex());
+ assertFalse(leaderActor.getRaftActorContext().isSnapshotCaptureInitiated());
+ assertEquals(3, leader.getReplicatedToAllIndex());
+
+ }};
+ }
+
private ByteString fromObject(Object snapshot) throws Exception {
ByteArrayOutputStream b = null;
ObjectOutputStream o = null;