// Persist another entry (this will cause a CaptureSnapshot to be triggered
leaderActor.persistData(mockActorRef, new MockIdentifier("x"),
- new MockRaftActorContext.MockPayload("duh"));
+ new MockRaftActorContext.MockPayload("duh"), false);
// Now send a CaptureSnapshotReply
mockActorRef.tell(new CaptureSnapshotReply(fromObject("foo").toByteArray()), mockActorRef);
Leader leader = new Leader(leaderActor.getRaftActorContext());
leaderActor.setCurrentBehavior(leader);
- leaderActor.persistData(leaderActorRef, new MockIdentifier("1"), new MockRaftActorContext.MockPayload("1"));
+ leaderActor.persistData(leaderActorRef, new MockIdentifier("1"), new MockRaftActorContext.MockPayload("1"),
+ false);
ReplicatedLogEntry logEntry = leaderActor.getReplicatedLog().get(0);
assertNotNull("ReplicatedLogEntry not found", logEntry);
assertEquals("getCommitIndex", 0, leaderActor.getRaftActorContext().getCommitIndex());
assertEquals("getLastApplied", 0, leaderActor.getRaftActorContext().getLastApplied());
}
+
+ @Test
+ public void testReplicateWithBatchHint() throws Exception {
+ final String leaderId = factory.generateActorId("leader-");
+ final String followerId = factory.generateActorId("follower-");
+
+ final ActorRef followerActor = factory.createActor(Props.create(MessageCollectorActor.class));
+
+ DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
+ config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
+ config.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS));
+
+ TestActorRef<MockRaftActor> leaderActorRef = factory.createTestActor(
+ MockRaftActor.props(leaderId, ImmutableMap.of(followerId, followerActor.path().toString()), config),
+ leaderId);
+ MockRaftActor leaderActor = leaderActorRef.underlyingActor();
+ leaderActor.waitForInitializeBehaviorComplete();
+
+ leaderActor.getRaftActorContext().getTermInformation().update(1, leaderId);
+
+ Leader leader = new Leader(leaderActor.getRaftActorContext());
+ leaderActor.setCurrentBehavior(leader);
+
+ MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+ MessageCollectorActor.clearMessages(followerActor);
+
+ leaderActor.onReceiveCommand(new AppendEntriesReply(followerId, 1, true, -1, -1, (short)0));
+
+ leaderActor.persistData(leaderActorRef, new MockIdentifier("1"), new MockPayload("1"), true);
+ MessageCollectorActor.assertNoneMatching(followerActor, AppendEntries.class, 500);
+
+ leaderActor.persistData(leaderActorRef, new MockIdentifier("2"), new MockPayload("2"), true);
+ MessageCollectorActor.assertNoneMatching(followerActor, AppendEntries.class, 500);
+
+ leaderActor.persistData(leaderActorRef, new MockIdentifier("3"), new MockPayload("3"), false);
+ AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+ assertEquals("AppendEntries size", 3, appendEntries.getEntries().size());
+ }
}