assertNotEquals("voted for", "foobar", mockRaftActor.getRaftActorContext().getTermInformation().getVotedFor());
mockRaftActor.onReceiveRecover(mock(RecoveryCompleted.class));
-
}};
}
MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
+ mockRaftActor.waitForInitializeBehaviorComplete();
+
mockRaftActor.getRaftActorContext().getTermInformation().updateAndPersist(10, "foobar");
assertEquals("Persist called", true, persistLatch.await(5, TimeUnit.SECONDS));
-
}
-
};
}
MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
+ mockRaftActor.waitForInitializeBehaviorComplete();
+
MockRaftActorContext.MockReplicatedLogEntry logEntry = new MockRaftActorContext.MockReplicatedLogEntry(10, 10, mock(Payload.class));
mockRaftActor.getRaftActorContext().getReplicatedLog().appendAndPersist(logEntry);
verify(dataPersistenceProvider).persist(eq(logEntry), any(Procedure.class));
-
}
-
};
}
MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
+ mockRaftActor.waitForInitializeBehaviorComplete();
+
mockRaftActor.getReplicatedLog().appendAndPersist(new MockRaftActorContext.MockReplicatedLogEntry(1, 0, mock(Payload.class)));
mockRaftActor.getRaftActorContext().getReplicatedLog().removeFromAndPersist(0);
verify(dataPersistenceProvider, times(2)).persist(anyObject(), any(Procedure.class));
-
}
-
};
}
MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
+ mockRaftActor.waitForInitializeBehaviorComplete();
+
mockRaftActor.onReceiveCommand(new ApplyLogEntries(10));
verify(dataPersistenceProvider, times(1)).persist(anyObject(), any(Procedure.class));
MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
+ mockRaftActor.waitForInitializeBehaviorComplete();
+
ByteString snapshotBytes = fromObject(Arrays.asList(
new MockRaftActorContext.MockPayload("A"),
new MockRaftActorContext.MockPayload("B"),
new MockRaftActorContext.MockPayload("C"),
new MockRaftActorContext.MockPayload("D")));
- mockRaftActor.onReceiveCommand(new CaptureSnapshot(-1, 1, -1, 1));
+ mockRaftActor.onReceiveCommand(new CaptureSnapshot(-1, 1,-1, 1, -1, 1));
RaftActorContext raftActorContext = mockRaftActor.getRaftActorContext();
MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
+ mockRaftActor.waitForInitializeBehaviorComplete();
+
mockRaftActor.getReplicatedLog().append(new MockRaftActorContext.MockReplicatedLogEntry(1, 0, mock(Payload.class)));
mockRaftActor.getReplicatedLog().append(new MockRaftActorContext.MockReplicatedLogEntry(1, 1, mock(Payload.class)));
mockRaftActor.getReplicatedLog().append(new MockRaftActorContext.MockReplicatedLogEntry(1, 2, mock(Payload.class)));
RaftActorContext raftActorContext = mockRaftActor.getRaftActorContext();
mockRaftActor.setCurrentBehavior(new Follower(raftActorContext));
- mockRaftActor.onReceiveCommand(new CaptureSnapshot(-1, 1, 2, 1));
+ long replicatedToAllIndex = 1;
+ mockRaftActor.onReceiveCommand(new CaptureSnapshot(-1, 1, 2, 1, replicatedToAllIndex, 1));
verify(mockRaftActor.delegate).createSnapshot();
verify(dataPersistenceProvider).deleteMessages(100);
- assertEquals(2, mockRaftActor.getReplicatedLog().size());
+ assertEquals(3, mockRaftActor.getReplicatedLog().size());
+ assertEquals(1, mockRaftActor.getCurrentBehavior().getReplicatedToAllIndex());
+ assertNotNull(mockRaftActor.getReplicatedLog().get(2));
assertNotNull(mockRaftActor.getReplicatedLog().get(3));
assertNotNull(mockRaftActor.getReplicatedLog().get(4));
// Index 2 will not be in the log because it was removed due to snapshotting
- assertNull(mockRaftActor.getReplicatedLog().get(2));
+ assertNull(mockRaftActor.getReplicatedLog().get(1));
+ assertNull(mockRaftActor.getReplicatedLog().get(0));
}
};
MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
+ mockRaftActor.waitForInitializeBehaviorComplete();
+
ReplicatedLogEntry entry = new MockRaftActorContext.MockReplicatedLogEntry(1, 5,
new MockRaftActorContext.MockPayload("F"));
MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
+ mockRaftActor.waitForInitializeBehaviorComplete();
+
ReplicatedLog oldReplicatedLog = mockRaftActor.getReplicatedLog();
oldReplicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1, 0, mock(Payload.class)));
MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
+ mockRaftActor.waitForInitializeBehaviorComplete();
+
ByteString snapshotBytes = fromObject(Arrays.asList(
new MockRaftActorContext.MockPayload("A"),
new MockRaftActorContext.MockPayload("B"),
mockRaftActor.setCurrentBehavior(new Leader(raftActorContext));
- mockRaftActor.onReceiveCommand(new CaptureSnapshot(-1, 1, -1, 1));
+ mockRaftActor.onReceiveCommand(new CaptureSnapshot(-1, 1, -1, 1, -1, 1));
mockRaftActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes.toByteArray()));
public void testRaftRoleChangeNotifier() throws Exception {
new JavaTestKit(getSystem()) {{
ActorRef notifierActor = factory.createActor(Props.create(MessageCollectorActor.class));
+ MessageCollectorActor.waitUntilReady(notifierActor);
+
DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
+ long heartBeatInterval = 100;
+ config.setHeartBeatInterval(FiniteDuration.create(heartBeatInterval, TimeUnit.MILLISECONDS));
+ config.setElectionTimeoutFactor(1);
+
String persistenceId = factory.generateActorId("notifier-");
factory.createTestActor(MockRaftActor.props(persistenceId,
Collections.<String, String>emptyMap(), Optional.<ConfigParams>of(config), notifierActor), persistenceId);
- // sleeping for a minimum of 2 seconds, if it spans more its fine.
- Uninterruptibles.sleepUninterruptibly(2, TimeUnit.SECONDS);
+ List<RoleChanged> matches = null;
+ for(int i = 0; i < 5000 / heartBeatInterval; i++) {
+ matches = MessageCollectorActor.getAllMatching(notifierActor, RoleChanged.class);
+ assertNotNull(matches);
+ if(matches.size() == 3) {
+ break;
+ }
+ Uninterruptibles.sleepUninterruptibly(heartBeatInterval, TimeUnit.MILLISECONDS);
+ }
- List<Object> matches = MessageCollectorActor.getAllMatching(notifierActor, RoleChanged.class);
- assertNotNull(matches);
assertEquals(3, matches.size());
// check if the notifier got a role change from null to Follower
- RoleChanged raftRoleChanged = (RoleChanged) matches.get(0);
+ RoleChanged raftRoleChanged = matches.get(0);
assertEquals(persistenceId, raftRoleChanged.getMemberId());
assertNull(raftRoleChanged.getOldRole());
assertEquals(RaftState.Follower.name(), raftRoleChanged.getNewRole());
// check if the notifier got a role change from Follower to Candidate
- raftRoleChanged = (RoleChanged) matches.get(1);
+ raftRoleChanged = matches.get(1);
assertEquals(persistenceId, raftRoleChanged.getMemberId());
assertEquals(RaftState.Follower.name(), raftRoleChanged.getOldRole());
assertEquals(RaftState.Candidate.name(), raftRoleChanged.getNewRole());
// check if the notifier got a role change from Candidate to Leader
- raftRoleChanged = (RoleChanged) matches.get(2);
+ raftRoleChanged = matches.get(2);
assertEquals(persistenceId, raftRoleChanged.getMemberId());
assertEquals(RaftState.Candidate.name(), raftRoleChanged.getOldRole());
assertEquals(RaftState.Leader.name(), raftRoleChanged.getNewRole());
Map<String, String> peerAddresses = new HashMap<>();
peerAddresses.put(follower1Id, followerActor1.path().toString());
- TestActorRef<MockRaftActor> mockActorRef = TestActorRef.create(getSystem(),
+ TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(
MockRaftActor.props(persistenceId, peerAddresses,
Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
MockRaftActor leaderActor = mockActorRef.underlyingActor();
+
leaderActor.getRaftActorContext().setCommitIndex(4);
leaderActor.getRaftActorContext().setLastApplied(4);
leaderActor.getRaftActorContext().getTermInformation().update(1, persistenceId);
assertEquals(8, leaderActor.getReplicatedLog().size());
- leaderActor.onReceiveCommand(new CaptureSnapshot(6, 1, 4, 1));
+ leaderActor.onReceiveCommand(new CaptureSnapshot(6, 1, 4, 1, 4, 1));
+
leaderActor.getRaftActorContext().setSnapshotCaptureInitiated(true);
verify(leaderActor.delegate).createSnapshot();
Map<String, String> peerAddresses = new HashMap<>();
peerAddresses.put(leaderId, leaderActor1.path().toString());
- TestActorRef<MockRaftActor> mockActorRef = TestActorRef.create(getSystem(),
+ TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(
MockRaftActor.props(persistenceId, peerAddresses,
Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
followerActor.waitForInitializeBehaviorComplete();
- // create 8 entries in the log - 0 to 4 are applied and will get picked up as part of the capture snapshot
+
Follower follower = new Follower(followerActor.getRaftActorContext());
followerActor.setCurrentBehavior(follower);
assertEquals(RaftState.Follower, followerActor.getCurrentBehavior().state());
+ // create 6 entries in the log - 0 to 4 are applied and will get picked up as part of the capture snapshot
MockRaftActorContext.MockReplicatedLogBuilder logBuilder = new MockRaftActorContext.MockReplicatedLogBuilder();
followerActor.getRaftActorContext().setReplicatedLog(logBuilder.createEntries(0, 6, 1).build());
- // log as indices 0-5
+ // log has indices 0-5
assertEquals(6, followerActor.getReplicatedLog().size());
//snapshot on 4
- followerActor.onReceiveCommand(new CaptureSnapshot(5, 1, 4, 1));
+ followerActor.onReceiveCommand(new CaptureSnapshot(5, 1, 4, 1, 4, 1));
+
followerActor.getRaftActorContext().setSnapshotCaptureInitiated(true);
verify(followerActor.delegate).createSnapshot();
followerActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes.toByteArray()));
assertFalse(followerActor.getRaftActorContext().isSnapshotCaptureInitiated());
- // capture snapshot reply should remove the snapshotted entries only
+ // capture snapshot reply should remove the snapshotted entries only till replicatedToAllIndex
assertEquals(3, followerActor.getReplicatedLog().size()); //indexes 5,6,7 left in the log
assertEquals(7, followerActor.getReplicatedLog().lastIndex());
// create 5 entries in the log
MockRaftActorContext.MockReplicatedLogBuilder logBuilder = new MockRaftActorContext.MockReplicatedLogBuilder();
leaderActor.getRaftActorContext().setReplicatedLog(logBuilder.createEntries(5, 10, 1).build());
+
//set the snapshot index to 4 , 0 to 4 are snapshotted
leaderActor.getRaftActorContext().getReplicatedLog().setSnapshotIndex(4);
+ //setting replicatedToAllIndex = 9, for the log to clear
+ leader.setReplicatedToAllIndex(9);
assertEquals(5, leaderActor.getReplicatedLog().size());
+ assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
leaderActor.onReceiveCommand(new AppendEntriesReply(follower1Id, 1, true, 9, 1));
assertEquals(5, leaderActor.getReplicatedLog().size());
+ assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
// set the 2nd follower nextIndex to 1 which has been snapshotted
leaderActor.onReceiveCommand(new AppendEntriesReply(follower2Id, 1, true, 0, 1));
assertEquals(5, leaderActor.getReplicatedLog().size());
+ assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
// simulate a real snapshot
leaderActor.onReceiveCommand(new SendHeartBeat());
leaderActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes.toByteArray()));
assertFalse(leaderActor.getRaftActorContext().isSnapshotCaptureInitiated());
- assertEquals("Real snapshot didn't clear the log till lastApplied", 0, leaderActor.getReplicatedLog().size());
+ assertEquals("Real snapshot didn't clear the log till replicatedToAllIndex", 0, leaderActor.getReplicatedLog().size());
//reply from a slow follower after should not raise errors
leaderActor.onReceiveCommand(new AppendEntriesReply(follower2Id, 1, true, 5, 1));