import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
+import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.assertNoneMatching;
import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.clearMessages;
-import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.expectMatching;
import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.expectFirstMatching;
+import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.expectMatching;
import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.getAllMatching;
-import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.assertNoneMatching;
import akka.actor.Actor;
import akka.actor.ActorRef;
// Send an initial payloads and verify replication.
- MockPayload payload0 = sendPayloadData(leaderActor, "zero");
- MockPayload payload1 = sendPayloadData(leaderActor, "one");
+ final MockPayload payload0 = sendPayloadData(leaderActor, "zero");
+ final MockPayload payload1 = sendPayloadData(leaderActor, "one");
verifyApplyJournalEntries(leaderCollectorActor, 1);
verifyApplyJournalEntries(follower1CollectorActor, 1);
verifyApplyJournalEntries(follower2CollectorActor, 1);
testLog.info("Sending payload to isolated leader");
- MockPayload isolatedLeaderPayload2 = sendPayloadData(leaderActor, "two");
+ final MockPayload isolatedLeaderPayload2 = sendPayloadData(leaderActor, "two");
// Wait for the isolated leader to send AppendEntries to follower1 with the entry at index 2. Note the message
// is collected but not forwarded to the follower RaftActor.
// The leader should transition to IsolatedLeader.
expectFirstMatching(leaderNotifierActor, RoleChanged.class,
- rc -> rc.getNewRole().equals(RaftState.IsolatedLeader.name()));
+ rc -> rc.getNewRole().equals(RaftState.IsolatedLeader.name()));
forceElectionOnFollower1();
- // Send a payload to the new leader follower1 with index 2 and verify it's replicated to follower2 and committed.
+ // Send a payload to the new leader follower1 with index 2 and verify it's replicated to follower2
+ // and committed.
testLog.info("Sending payload to new leader");
- MockPayload newLeaderPayload2 = sendPayloadData(follower1Actor, "two-new");
+ final MockPayload newLeaderPayload2 = sendPayloadData(follower1Actor, "two-new");
verifyApplyJournalEntries(follower1CollectorActor, 2);
verifyApplyJournalEntries(follower2CollectorActor, 2);
// Previous leader should switch to follower b/c it will receive either an AppendEntries or AppendEntriesReply
// with a higher term.
- expectFirstMatching(leaderNotifierActor, RoleChanged.class, rc -> rc.getNewRole().equals(RaftState.Follower.name()));
+ expectFirstMatching(leaderNotifierActor, RoleChanged.class,
+ rc -> rc.getNewRole().equals(RaftState.Follower.name()));
// The previous leader has a conflicting log entry at index 2 with a different term which should get
// replaced by the new leader's index 1 entry.
// Submit an initial payload that is committed/applied on all nodes.
- MockPayload payload0 = sendPayloadData(leaderActor, "zero");
+ final MockPayload payload0 = sendPayloadData(leaderActor, "zero");
verifyApplyJournalEntries(leaderCollectorActor, 0);
verifyApplyJournalEntries(follower1CollectorActor, 0);
verifyApplyJournalEntries(follower2CollectorActor, 0);
// message is forwarded to the followers.
expectFirstMatching(follower1CollectorActor, AppendEntries.class, ae -> {
- return ae.getEntries().size() == 1 && ae.getEntries().get(0).getIndex() == 1 &&
- ae.getEntries().get(0).getData().equals(payload1);
+ return ae.getEntries().size() == 1 && ae.getEntries().get(0).getIndex() == 1
+ && ae.getEntries().get(0).getData().equals(payload1);
});
expectFirstMatching(follower2CollectorActor, AppendEntries.class, ae -> {
- return ae.getEntries().size() == 1 && ae.getEntries().get(0).getIndex() == 1 &&
- ae.getEntries().get(0).getData().equals(payload1);
+ return ae.getEntries().size() == 1 && ae.getEntries().get(0).getIndex() == 1
+ && ae.getEntries().get(0).getData().equals(payload1);
});
verifyApplyJournalEntries(leaderCollectorActor, 1);
testLog.info("Sending payload to isolated leader");
- MockPayload isolatedLeaderPayload2 = sendPayloadData(leaderActor, "two");
+ final MockPayload isolatedLeaderPayload2 = sendPayloadData(leaderActor, "two");
// Wait for the isolated leader to send AppendEntries to follower1 with the entry at index 2. Note the message
// is collected but not forwarded to the follower RaftActor.
// The leader should transition to IsolatedLeader.
expectFirstMatching(leaderNotifierActor, RoleChanged.class,
- rc -> rc.getNewRole().equals(RaftState.IsolatedLeader.name()));
+ rc -> rc.getNewRole().equals(RaftState.IsolatedLeader.name()));
forceElectionOnFollower1();
testLog.info("Sending payload to new leader");
- MockPayload newLeaderPayload2 = sendPayloadData(follower1Actor, "two-new");
+ final MockPayload newLeaderPayload2 = sendPayloadData(follower1Actor, "two-new");
verifyApplyJournalEntries(follower1CollectorActor, 3);
verifyApplyJournalEntries(follower2CollectorActor, 3);
// Previous leader should switch to follower b/c it will receive either an AppendEntries or AppendEntriesReply
// with a higher term.
- expectFirstMatching(leaderNotifierActor, RoleChanged.class, rc -> rc.getNewRole().equals(RaftState.Follower.name()));
+ expectFirstMatching(leaderNotifierActor, RoleChanged.class,
+ rc -> rc.getNewRole().equals(RaftState.Follower.name()));
// The previous leader has a conflicting log entry at index 2 with a different term which should get
// replaced by the new leader's entry.
// Ensure the prior leader didn't apply its conflicting entry with index 2, term 1.
List<ApplyState> applyState = getAllMatching(leaderCollectorActor, ApplyState.class);
- for(ApplyState as: applyState) {
- if(as.getReplicatedLogEntry().getIndex() == 2 && as.getReplicatedLogEntry().getTerm() == 1) {
+ for (ApplyState as: applyState) {
+ if (as.getReplicatedLogEntry().getIndex() == 2 && as.getReplicatedLogEntry().getTerm() == 1) {
fail("Got unexpected ApplyState: " + as);
}
}
// Submit an initial payload that is committed/applied on all nodes.
- MockPayload payload0 = sendPayloadData(leaderActor, "zero");
+ final MockPayload payload0 = sendPayloadData(leaderActor, "zero");
verifyApplyJournalEntries(leaderCollectorActor, 0);
verifyApplyJournalEntries(follower1CollectorActor, 0);
verifyApplyJournalEntries(follower2CollectorActor, 0);
// message is forwarded to the followers.
expectFirstMatching(follower1CollectorActor, AppendEntries.class, ae -> {
- return ae.getEntries().size() == 1 && ae.getEntries().get(0).getIndex() == 1 &&
- ae.getEntries().get(0).getData().equals(payload1);
+ return ae.getEntries().size() == 1 && ae.getEntries().get(0).getIndex() == 1
+ && ae.getEntries().get(0).getData().equals(payload1);
});
expectFirstMatching(follower2CollectorActor, AppendEntries.class, ae -> {
- return ae.getEntries().size() == 1 && ae.getEntries().get(0).getIndex() == 1 &&
- ae.getEntries().get(0).getData().equals(payload1);
+ return ae.getEntries().size() == 1 && ae.getEntries().get(0).getIndex() == 1
+ && ae.getEntries().get(0).getData().equals(payload1);
});
verifyApplyJournalEntries(leaderCollectorActor, 1);
// are collected but not forwarded to the follower RaftActor.
expectFirstMatching(follower1CollectorActor, AppendEntries.class, ae -> {
- for(ReplicatedLogEntry e: ae.getEntries()) {
- if(e.getIndex() == 4) {
+ for (ReplicatedLogEntry e: ae.getEntries()) {
+ if (e.getIndex() == 4) {
return true;
}
}
// The leader should transition to IsolatedLeader.
expectFirstMatching(leaderNotifierActor, RoleChanged.class,
- rc -> rc.getNewRole().equals(RaftState.IsolatedLeader.name()));
+ rc -> rc.getNewRole().equals(RaftState.IsolatedLeader.name()));
forceElectionOnFollower1();
testLog.info("Sending 3 payloads to new leader");
- MockPayload newLeaderPayload2 = sendPayloadData(follower1Actor, "two-new");
- MockPayload newLeaderPayload3 = sendPayloadData(follower1Actor, "three-new");
- MockPayload newLeaderPayload4 = sendPayloadData(follower1Actor, "four-new");
+ final MockPayload newLeaderPayload2 = sendPayloadData(follower1Actor, "two-new");
+ final MockPayload newLeaderPayload3 = sendPayloadData(follower1Actor, "three-new");
+ final MockPayload newLeaderPayload4 = sendPayloadData(follower1Actor, "four-new");
verifyApplyJournalEntries(follower1CollectorActor, 5);
verifyApplyJournalEntries(follower2CollectorActor, 5);
// Previous leader should switch to follower b/c it will receive either an AppendEntries or AppendEntriesReply
// with a higher term.
- expectFirstMatching(leaderNotifierActor, RoleChanged.class, rc -> rc.getNewRole().equals(RaftState.Follower.name()));
+ expectFirstMatching(leaderNotifierActor, RoleChanged.class,
+ rc -> rc.getNewRole().equals(RaftState.Follower.name()));
// The previous leader has conflicting log entries starting at index 2 with different terms which should get
// replaced by the new leader's entries.
// Ensure the prior leader didn't apply any of its conflicting entries with term 1.
List<ApplyState> applyState = getAllMatching(leaderCollectorActor, ApplyState.class);
- for(ApplyState as: applyState) {
- if(as.getReplicatedLogEntry().getTerm() == 1) {
+ for (ApplyState as: applyState) {
+ if (as.getReplicatedLogEntry().getTerm() == 1) {
fail("Got unexpected ApplyState: " + as);
}
}
follower1Actor.tell(TimeoutNow.INSTANCE, ActorRef.noSender());
expectFirstMatching(follower1NotifierActor, RoleChanged.class,
- rc -> rc.getNewRole().equals(RaftState.Leader.name()));
+ rc -> rc.getNewRole().equals(RaftState.Leader.name()));
currentTerm = follower1Context.getTermInformation().getCurrentTerm();
}
leaderActor.underlyingActor().startDropMessages(AppendEntries.class);
leaderActor.underlyingActor().startDropMessages(RequestVote.class);
- follower1Actor.underlyingActor().startDropMessages(AppendEntries.class, ae -> ae.getLeaderId().equals(leaderId));
- follower2Actor.underlyingActor().startDropMessages(AppendEntries.class, ae -> ae.getLeaderId().equals(leaderId));
+ follower1Actor.underlyingActor().startDropMessages(AppendEntries.class,
+ ae -> ae.getLeaderId().equals(leaderId));
+ follower2Actor.underlyingActor().startDropMessages(AppendEntries.class,
+ ae -> ae.getLeaderId().equals(leaderId));
clearMessages(follower1CollectorActor);
clearMessages(follower1NotifierActor);
followerConfigParams.setHeartBeatInterval(new FiniteDuration(100, TimeUnit.MILLISECONDS));
followerConfigParams.setElectionTimeoutFactor(1000);
follower1Actor = newTestRaftActor(follower1Id, TestRaftActor.newBuilder().peerAddresses(
- ImmutableMap.of(leaderId, testActorPath(leaderId), follower2Id, testActorPath(follower2Id))).
- config(followerConfigParams).roleChangeNotifier(follower1NotifierActor));
+ ImmutableMap.of(leaderId, testActorPath(leaderId), follower2Id, testActorPath(follower2Id)))
+ .config(followerConfigParams).roleChangeNotifier(follower1NotifierActor));
follower2Actor = newTestRaftActor(follower2Id, ImmutableMap.of(leaderId, testActorPath(leaderId),
follower1Id, testActorPath(follower1Id)), followerConfigParams);
- peerAddresses = ImmutableMap.<String, String>builder().
- put(follower1Id, follower1Actor.path().toString()).
- put(follower2Id, follower2Actor.path().toString()).build();
+ peerAddresses = ImmutableMap.<String, String>builder()
+ .put(follower1Id, follower1Actor.path().toString())
+ .put(follower2Id, follower2Actor.path().toString()).build();
leaderConfigParams = newLeaderConfigParams();
leaderConfigParams.setIsolatedLeaderCheckInterval(new FiniteDuration(500, TimeUnit.MILLISECONDS));
leaderNotifierActor = factory.createTestActor(Props.create(MessageCollectorActor.class),
factory.generateActorId(leaderId + "-notifier"));
- leaderActor = newTestRaftActor(leaderId, TestRaftActor.newBuilder().peerAddresses(peerAddresses).
- config(leaderConfigParams).roleChangeNotifier(leaderNotifierActor));
+ leaderActor = newTestRaftActor(leaderId, TestRaftActor.newBuilder().peerAddresses(peerAddresses)
+ .config(leaderConfigParams).roleChangeNotifier(leaderNotifierActor));
follower1CollectorActor = follower1Actor.underlyingActor().collectorActor();
follower2CollectorActor = follower2Actor.underlyingActor().collectorActor();