import org.opendaylight.controller.cluster.notifications.LeaderStateChanged;
import org.opendaylight.controller.cluster.notifications.RoleChanged;
import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
+import org.opendaylight.controller.cluster.raft.base.messages.CheckConsensusReached;
import org.opendaylight.controller.cluster.raft.base.messages.InitiateCaptureSnapshot;
import org.opendaylight.controller.cluster.raft.base.messages.LeaderTransitioning;
import org.opendaylight.controller.cluster.raft.base.messages.Replicate;
ApplyJournalEntries applyEntries = (ApplyJournalEntries) message;
LOG.debug("{}: Persisting ApplyJournalEntries with index={}", persistenceId(), applyEntries.getToIndex());
- persistence().persist(applyEntries, NoopProcedure.instance());
+ persistence().persistAsync(applyEntries, NoopProcedure.instance());
} else if (message instanceof FindLeader) {
getSender().tell(
ReplicatedLogEntry replicatedLogEntry = new ReplicatedLogImplEntry(
context.getReplicatedLog().lastIndex() + 1,
context.getTermInformation().getCurrentTerm(), data);
+ replicatedLogEntry.setPersistencePending(true);
LOG.debug("{}: Persist data {}", persistenceId(), replicatedLogEntry);
final RaftActorContext raftContext = getRaftActorContext();
- replicatedLog().appendAndPersist(replicatedLogEntry, replicatedLogEntry1 -> {
+ boolean wasAppended = replicatedLog().appendAndPersist(replicatedLogEntry, persistedLogEntry -> {
+ // Clear the persistence pending flag in the log entry.
+ persistedLogEntry.setPersistencePending(false);
+
if (!hasFollowers()) {
// Increment the Commit Index and the Last Applied values
- raftContext.setCommitIndex(replicatedLogEntry1.getIndex());
- raftContext.setLastApplied(replicatedLogEntry1.getIndex());
+ raftContext.setCommitIndex(persistedLogEntry.getIndex());
+ raftContext.setLastApplied(persistedLogEntry.getIndex());
// Apply the state immediately.
- self().tell(new ApplyState(clientActor, identifier, replicatedLogEntry1), self());
+ self().tell(new ApplyState(clientActor, identifier, persistedLogEntry), self());
// Send a ApplyJournalEntries message so that we write the fact that we applied
// the state to durable storage
- self().tell(new ApplyJournalEntries(replicatedLogEntry1.getIndex()), self());
+ self().tell(new ApplyJournalEntries(persistedLogEntry.getIndex()), self());
} else {
- context.getReplicatedLog().captureSnapshotIfReady(replicatedLogEntry1);
+ context.getReplicatedLog().captureSnapshotIfReady(replicatedLogEntry);
- // Send message for replication
- getCurrentBehavior().handleMessage(getSelf(),
- new Replicate(clientActor, identifier, replicatedLogEntry1));
+ // Local persistence is complete so send the CheckConsensusReached message to the behavior (which
+ // normally should still be the leader) to check if consensus has now been reached in conjunction with
+ // follower replication.
+ getCurrentBehavior().handleMessage(getSelf(), CheckConsensusReached.INSTANCE);
}
- });
+ }, true);
+
+ if (wasAppended && hasFollowers()) {
+ // Send log entry for replication.
+ getCurrentBehavior().handleMessage(getSelf(), new Replicate(clientActor, identifier, replicatedLogEntry));
+ }
}
private ReplicatedLog replicatedLog() {