- new Within(duration("1 seconds")) {
- @Override
- protected void run() {
-
- ActorRef followerActor = getTestActor();
-
- MockRaftActorContext actorContext =
- (MockRaftActorContext) createActorContext();
-
- Map<String, String> peerAddresses = new HashMap<>();
-
- peerAddresses.put(followerActor.path().toString(),
- followerActor.path().toString());
-
- actorContext.setPeerAddresses(peerAddresses);
-
- Leader leader = new Leader(actorContext);
- leader.markFollowerActive(followerActor.path().toString());
- Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().getHeartBeatInterval().toMillis(),
- TimeUnit.MILLISECONDS);
- RaftActorBehavior raftBehavior = leader
- .handleMessage(senderActor, new Replicate(null, null,
- new MockRaftActorContext.MockReplicatedLogEntry(1,
- 100,
- new MockRaftActorContext.MockPayload("foo"))
- ));
-
- // State should not change
- assertTrue(raftBehavior instanceof Leader);
-
- final String out =
- new ExpectMsg<String>(duration("1 seconds"), "match hint") {
- // do not put code outside this method, will run afterwards
- @Override
- protected String match(Object in) {
- Object msg = fromSerializableMessage(in);
- if (msg instanceof AppendEntries) {
- if (((AppendEntries)msg).getTerm() == 0) {
- return "match";
- }
- return null;
- } else {
- throw noMatch();
- }
- }
- }.get(); // this extracts the received message
-
- assertEquals("match", out);
- }
- };
- }};
+ MockRaftActorContext actorContext = createActorContextWithFollower();
+
+ long term = 1;
+ actorContext.getTermInformation().update(term, "");
+
+ leader = new Leader(actorContext);
+
+ // Leader will send an immediate heartbeat - ignore it.
+ MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+
+ // The follower would normally reply - simulate that explicitly here.
+ long lastIndex = actorContext.getReplicatedLog().lastIndex();
+ leader.handleMessage(followerActor, new AppendEntriesReply(
+ FOLLOWER_ID, term, true, lastIndex, term));
+ assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
+
+ followerActor.underlyingActor().clear();
+
+ MockRaftActorContext.MockPayload payload = new MockRaftActorContext.MockPayload("foo");
+ MockRaftActorContext.MockReplicatedLogEntry newEntry = new MockRaftActorContext.MockReplicatedLogEntry(
+ 1, lastIndex + 1, payload);
+ actorContext.getReplicatedLog().append(newEntry);
+ RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor,
+ new Replicate(null, null, newEntry));
+
+ // State should not change
+ assertTrue(raftBehavior instanceof Leader);
+
+ AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+ assertEquals("getPrevLogIndex", lastIndex, appendEntries.getPrevLogIndex());
+ assertEquals("getPrevLogTerm", term, appendEntries.getPrevLogTerm());
+ assertEquals("Entries size", 1, appendEntries.getEntries().size());
+ assertEquals("Entry getIndex", lastIndex + 1, appendEntries.getEntries().get(0).getIndex());
+ assertEquals("Entry getTerm", term, appendEntries.getEntries().get(0).getTerm());
+ assertEquals("Entry payload", payload, appendEntries.getEntries().get(0).getData());