} else if (message instanceof SwitchBehavior) {
switchBehavior((SwitchBehavior) message);
} else if (message instanceof LeaderTransitioning) {
- onLeaderTransitioning();
+ onLeaderTransitioning((LeaderTransitioning)message);
} else if (message instanceof Shutdown) {
onShutDown();
} else if (message instanceof Runnable) {
((Runnable)message).run();
} else if (message instanceof NoopPayload) {
- persistData(null, null, (NoopPayload)message);
+ persistData(null, null, (NoopPayload)message, false);
} else if (!possiblyHandleBehaviorMessage(message)) {
handleNonRaftCommand(message);
}
}
}
- private void onLeaderTransitioning() {
- LOG.debug("{}: onLeaderTransitioning", persistenceId());
+ private void onLeaderTransitioning(final LeaderTransitioning leaderTransitioning) {
+ LOG.debug("{}: onLeaderTransitioning: {}", persistenceId(), leaderTransitioning);
Optional<ActorRef> roleChangeNotifier = getRoleChangeNotifier();
- if (getRaftState() == RaftState.Follower && roleChangeNotifier.isPresent()) {
+ if (getRaftState() == RaftState.Follower && roleChangeNotifier.isPresent()
+ && leaderTransitioning.getLeaderId().equals(getCurrentBehavior().getLeaderId())) {
roleChangeNotifier.get().tell(newLeaderStateChanged(getId(), null,
getCurrentBehavior().getLeaderPayloadVersion()), getSelf());
}
}
/**
- * When a derived RaftActor needs to persist something it must call
- * persistData.
+ * Persists the given Payload in the journal and replicates to any followers. After successful completion,
+ * {@link #applyState(ActorRef, Identifier, Object)} is notified.
+ *
+ * @param clientActor optional ActorRef that is provided via the applyState callback
+ * @param identifier the payload identifier
+ * @param data the payload data to persist
+ * @param batchHint if true, an attempt is made to delay immediate replication and batch the payload with
+ * subsequent payloads for efficiency. Otherwise the payload is immediately replicated.
*/
- protected final void persistData(final ActorRef clientActor, final Identifier identifier, final Payload data) {
-
+ protected final void persistData(final ActorRef clientActor, final Identifier identifier, final Payload data,
+ final boolean batchHint) {
ReplicatedLogEntry replicatedLogEntry = new SimpleReplicatedLogEntry(
context.getReplicatedLog().lastIndex() + 1,
context.getTermInformation().getCurrentTerm(), data);
if (wasAppended && hasFollowers()) {
// Send log entry for replication.
- getCurrentBehavior().handleMessage(getSelf(), new Replicate(clientActor, identifier, replicatedLogEntry));
+ getCurrentBehavior().handleMessage(getSelf(), new Replicate(clientActor, identifier, replicatedLogEntry,
+ !batchHint));
}
}