protected void handleNonRaftCommand(Object message) {
if(message instanceof KeyValue){
if(isLeader()) {
- persistData(getSender(), new PayloadIdentifier(persistIdentifier++), (Payload) message);
+ persistData(getSender(), new PayloadIdentifier(persistIdentifier++), (Payload) message, false);
} else {
if(getLeader() != null) {
getLeader().forward(message, getContext());
} else if (message instanceof Runnable) {
((Runnable)message).run();
} else if (message instanceof NoopPayload) {
- persistData(null, null, (NoopPayload)message);
+ persistData(null, null, (NoopPayload)message, false);
} else if (!possiblyHandleBehaviorMessage(message)) {
handleNonRaftCommand(message);
}
}
/**
- * When a derived RaftActor needs to persist something it must call
- * persistData.
+ * Persists the given Payload in the journal and replicates to any followers. After successful completion,
+ * {@link #applyState(ActorRef, Identifier, Object)} is notified.
+ *
+ * @param clientActor optional ActorRef that is provided via the applyState callback
+ * @param identifier the payload identifier
+ * @param data the payload data to persist
+ * @param batchHint if true, an attempt is made to delay immediate replication and batch the payload with
+ * subsequent payloads for efficiency. Otherwise the payload is immediately replicated.
*/
- protected final void persistData(final ActorRef clientActor, final Identifier identifier, final Payload data) {
-
+ protected final void persistData(final ActorRef clientActor, final Identifier identifier, final Payload data,
+ final boolean batchHint) {
ReplicatedLogEntry replicatedLogEntry = new SimpleReplicatedLogEntry(
context.getReplicatedLog().lastIndex() + 1,
context.getTermInformation().getCurrentTerm(), data);
if (wasAppended && hasFollowers()) {
// Send log entry for replication.
- getCurrentBehavior().handleMessage(getSelf(), new Replicate(clientActor, identifier, replicatedLogEntry));
+ getCurrentBehavior().handleMessage(getSelf(), new Replicate(clientActor, identifier, replicatedLogEntry,
+ !batchHint));
}
}
operationContext.includeSelfInNewConfiguration(raftActor));
LOG.debug("{}: New server configuration : {}", raftContext.getId(), payload.getServerConfig());
- raftActor.persistData(operationContext.getClientRequestor(), operationContext.getContextId(), payload);
+ raftActor.persistData(operationContext.getClientRequestor(), operationContext.getContextId(),
+ payload, false);
currentOperationState = new Persisting(operationContext, newTimer(new ServerOperationTimeout(
operationContext.getLoggingContext())));
private final ActorRef clientActor;
private final Identifier identifier;
private final ReplicatedLogEntry replicatedLogEntry;
+ private final boolean sendImmediate;
- public Replicate(ActorRef clientActor, Identifier identifier, ReplicatedLogEntry replicatedLogEntry) {
-
+ public Replicate(ActorRef clientActor, Identifier identifier, ReplicatedLogEntry replicatedLogEntry,
+ boolean sendImmediate) {
this.clientActor = clientActor;
this.identifier = identifier;
this.replicatedLogEntry = replicatedLogEntry;
+ this.sendImmediate = sendImmediate;
}
public ActorRef getClientActor() {
public ReplicatedLogEntry getReplicatedLogEntry() {
return replicatedLogEntry;
}
+
+ public boolean isSendImmediate() {
+ return sendImmediate;
+ }
}
private void replicate(Replicate replicate) {
long logIndex = replicate.getReplicatedLogEntry().getIndex();
- log.debug("{}: Replicate message: identifier: {}, logIndex: {}, payload: {}", logName(),
- replicate.getIdentifier(), logIndex, replicate.getReplicatedLogEntry().getData().getClass());
+ log.debug("{}: Replicate message: identifier: {}, logIndex: {}, payload: {}, isSendImmediate: {}", logName(),
+ replicate.getIdentifier(), logIndex, replicate.getReplicatedLogEntry().getData().getClass(),
+ replicate.isSendImmediate());
// Create a tracker entry we will use this later to notify the
// client actor
applyLogToStateMachine(logIndex);
}
- if (!followerToLog.isEmpty()) {
+ if (replicate.isSendImmediate() && !followerToLog.isEmpty()) {
sendAppendEntries(0, false);
}
}
public void handleCommand(Object message) {
if (message instanceof MockPayload) {
MockPayload payload = (MockPayload) message;
- super.persistData(collectorActor, new MockIdentifier(payload.toString()), payload);
+ super.persistData(collectorActor, new MockIdentifier(payload.toString()), payload, false);
return;
}
if (message instanceof ServerConfigurationPayload) {
- super.persistData(collectorActor, new MockIdentifier("serverConfig"), (Payload) message);
+ super.persistData(collectorActor, new MockIdentifier("serverConfig"), (Payload) message, false);
return;
}
// Persist another entry (this will cause a CaptureSnapshot to be triggered
leaderActor.persistData(mockActorRef, new MockIdentifier("x"),
- new MockRaftActorContext.MockPayload("duh"));
+ new MockRaftActorContext.MockPayload("duh"), false);
// Now send a CaptureSnapshotReply
mockActorRef.tell(new CaptureSnapshotReply(fromObject("foo").toByteArray()), mockActorRef);
Leader leader = new Leader(leaderActor.getRaftActorContext());
leaderActor.setCurrentBehavior(leader);
- leaderActor.persistData(leaderActorRef, new MockIdentifier("1"), new MockRaftActorContext.MockPayload("1"));
+ leaderActor.persistData(leaderActorRef, new MockIdentifier("1"), new MockRaftActorContext.MockPayload("1"),
+ false);
ReplicatedLogEntry logEntry = leaderActor.getReplicatedLog().get(0);
assertNotNull("ReplicatedLogEntry not found", logEntry);
assertEquals("getCommitIndex", 0, leaderActor.getRaftActorContext().getCommitIndex());
assertEquals("getLastApplied", 0, leaderActor.getRaftActorContext().getLastApplied());
}
+
+ @Test
+ public void testReplicateWithBatchHint() throws Exception {
+ final String leaderId = factory.generateActorId("leader-");
+ final String followerId = factory.generateActorId("follower-");
+
+ final ActorRef followerActor = factory.createActor(Props.create(MessageCollectorActor.class));
+
+ DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
+ config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
+ config.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS));
+
+ TestActorRef<MockRaftActor> leaderActorRef = factory.createTestActor(
+ MockRaftActor.props(leaderId, ImmutableMap.of(followerId, followerActor.path().toString()), config),
+ leaderId);
+ MockRaftActor leaderActor = leaderActorRef.underlyingActor();
+ leaderActor.waitForInitializeBehaviorComplete();
+
+ leaderActor.getRaftActorContext().getTermInformation().update(1, leaderId);
+
+ Leader leader = new Leader(leaderActor.getRaftActorContext());
+ leaderActor.setCurrentBehavior(leader);
+
+ MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+ MessageCollectorActor.clearMessages(followerActor);
+
+ leaderActor.onReceiveCommand(new AppendEntriesReply(followerId, 1, true, -1, -1, (short)0));
+
+ leaderActor.persistData(leaderActorRef, new MockIdentifier("1"), new MockPayload("1"), true);
+ MessageCollectorActor.assertNoneMatching(followerActor, AppendEntries.class, 500);
+
+ leaderActor.persistData(leaderActorRef, new MockIdentifier("2"), new MockPayload("2"), true);
+ MessageCollectorActor.assertNoneMatching(followerActor, AppendEntries.class, 500);
+
+ leaderActor.persistData(leaderActorRef, new MockIdentifier("3"), new MockPayload("3"), false);
+ AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+ assertEquals("AppendEntries size", 3, appendEntries.getEntries().size());
+ }
}
MockRaftActorContext.MockPayload payload = new MockRaftActorContext.MockPayload("foo");
SimpleReplicatedLogEntry newEntry = new SimpleReplicatedLogEntry(index, term, payload);
actorContext.getReplicatedLog().append(newEntry);
- return leader.handleMessage(leaderActor, new Replicate(null, null, newEntry));
+ return leader.handleMessage(leaderActor, new Replicate(null, null, newEntry, true));
}
@Test
actorContext.getReplicatedLog().append(newEntry);
final Identifier id = new MockIdentifier("state-id");
- RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor, new Replicate(leaderActor, id, newEntry));
+ RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor,
+ new Replicate(leaderActor, id, newEntry, true));
// State should not change
assertTrue(raftBehavior instanceof Leader);
// this should invoke a sendinstallsnapshot as followersLastIndex < snapshotIndex
RaftActorBehavior raftBehavior = leader.handleMessage(
- leaderActor, new Replicate(null, new MockIdentifier("state-id"), entry));
+ leaderActor, new Replicate(null, new MockIdentifier("state-id"), entry, true));
assertTrue(raftBehavior instanceof Leader);
//update follower timestamp
leader.markFollowerActive(FOLLOWER_ID);
- leader.handleMessage(leaderActor, new Replicate(null, new MockIdentifier("state-id"), entry));
+ leader.handleMessage(leaderActor, new Replicate(null, new MockIdentifier("state-id"), entry, true));
assertEquals("isCapturing", true, actorContext.getSnapshotManager().isCapturing());
assertEquals(2, cs.getLastTerm());
// if an initiate is started again when first is in progress, it shouldnt initiate Capture
- leader.handleMessage(leaderActor, new Replicate(null, new MockIdentifier("state-id"), entry));
+ leader.handleMessage(leaderActor, new Replicate(null, new MockIdentifier("state-id"), entry, true));
assertSame("CaptureSnapshot instance", cs, actorContext.getSnapshotManager().getCaptureSnapshot());
}
assertEquals(2, cs.getLastTerm());
// if an initiate is started again when first is in progress, it should not initiate Capture
- leader.handleMessage(leaderActor, new Replicate(null, new MockIdentifier("state-id"), entry));
+ leader.handleMessage(leaderActor, new Replicate(null, new MockIdentifier("state-id"), entry, true));
assertSame("CaptureSnapshot instance", cs, actorContext.getSnapshotManager().getCaptureSnapshot());
}
// applyState() will be invoked once consensus is reached on the payload
void persistPayload(final TransactionIdentifier transactionId, final Payload payload) {
// We are faking the sender
- persistData(self(), transactionId, payload);
+ persistData(self(), transactionId, payload, false);
}
private void handleCommitTransaction(final CommitTransaction commit) {