import org.opendaylight.controller.cluster.notifications.LeaderStateChanged;
import org.opendaylight.controller.cluster.notifications.RoleChanged;
import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
+import org.opendaylight.controller.cluster.raft.base.messages.CheckConsensusReached;
import org.opendaylight.controller.cluster.raft.base.messages.InitiateCaptureSnapshot;
import org.opendaylight.controller.cluster.raft.base.messages.LeaderTransitioning;
import org.opendaylight.controller.cluster.raft.base.messages.Replicate;
import org.opendaylight.controller.cluster.raft.client.messages.Shutdown;
import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries;
import org.opendaylight.controller.cluster.raft.persisted.NoopPayload;
+import org.opendaylight.controller.cluster.raft.persisted.SimpleReplicatedLogEntry;
import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
import org.opendaylight.yangtools.concepts.Identifier;
import org.opendaylight.yangtools.concepts.Immutable;
* in a cluster. It implements the RAFT algorithm as described in the paper
* <a href='https://ramcloud.stanford.edu/wiki/download/attachments/11370504/raft.pdf'>
* In Search of an Understandable Consensus Algorithm</a>
- * <p/>
+ *
+ * <p>
* RaftActor has 3 states and each state has a certain behavior associated
* with it. A Raft actor can behave as,
* <ul>
 * <li> A Leader </li>
 * <li> A Follower (or) </li>
 * <li> A Candidate </li>
* </ul>
- * <p/>
- * <p/>
+ *
+ * <p>
* A RaftActor MUST be a Leader in order to accept requests from clients to
 * change the state of its encapsulated state machine. Once a RaftActor becomes
* a Leader it is also responsible for ensuring that all followers ultimately
* have the same log and therefore the same state machine as itself.
- * <p/>
- * <p/>
+ *
+ * <p>
* The current behavior of a RaftActor determines how election for leadership
 * is initiated and how peer RaftActors react to requests for votes.
- * <p/>
- * <p/>
+ *
+ * <p>
* Each RaftActor also needs to know the current election term. It uses this
* information for a couple of things. One is to simply figure out who it
* voted for in the last election. Another is to figure out if the message
 * it received to update its state is stale.
- * <p/>
- * <p/>
+ *
+ * <p>
 * The RaftActor uses akka-persistence to store its replicated log.
 * Furthermore through its behaviors a Raft Actor determines
- * <p/>
* <ul>
* <li> when a log entry should be persisted </li>
* <li> when a log entry should be applied to the state machine (and) </li>
ApplyJournalEntries applyEntries = (ApplyJournalEntries) message;
LOG.debug("{}: Persisting ApplyJournalEntries with index={}", persistenceId(), applyEntries.getToIndex());
- persistence().persist(applyEntries, NoopProcedure.instance());
+ persistence().persistAsync(applyEntries, NoopProcedure.instance());
} else if (message instanceof FindLeader) {
getSender().tell(
} else if (message instanceof SwitchBehavior) {
switchBehavior((SwitchBehavior) message);
} else if (message instanceof LeaderTransitioning) {
- onLeaderTransitioning();
+ onLeaderTransitioning((LeaderTransitioning)message);
} else if (message instanceof Shutdown) {
onShutDown();
} else if (message instanceof Runnable) {
((Runnable)message).run();
} else if (message instanceof NoopPayload) {
- persistData(null, null, (NoopPayload)message);
+ persistData(null, null, (NoopPayload)message, false);
} else if (!possiblyHandleBehaviorMessage(message)) {
handleNonRaftCommand(message);
}
}
}
- private void onLeaderTransitioning() {
- LOG.debug("{}: onLeaderTransitioning", persistenceId());
+ private void onLeaderTransitioning(final LeaderTransitioning leaderTransitioning) {
+ LOG.debug("{}: onLeaderTransitioning: {}", persistenceId(), leaderTransitioning);
Optional<ActorRef> roleChangeNotifier = getRoleChangeNotifier();
- if (getRaftState() == RaftState.Follower && roleChangeNotifier.isPresent()) {
+ if (getRaftState() == RaftState.Follower && roleChangeNotifier.isPresent()
+ && leaderTransitioning.getLeaderId().equals(getCurrentBehavior().getLeaderId())) {
roleChangeNotifier.get().tell(newLeaderStateChanged(getId(), null,
getCurrentBehavior().getLeaderPayloadVersion()), getSelf());
}
}
/**
- * When a derived RaftActor needs to persist something it must call
- * persistData.
+ * Persists the given Payload in the journal and replicates to any followers. After successful completion,
+ * {@link #applyState(ActorRef, Identifier, Object)} is notified.
+ *
+ * @param clientActor optional ActorRef that is provided via the applyState callback
+ * @param identifier the payload identifier
+ * @param data the payload data to persist
+ * @param batchHint if true, an attempt is made to delay immediate replication and batch the payload with
+ * subsequent payloads for efficiency. Otherwise the payload is immediately replicated.
*/
- protected final void persistData(final ActorRef clientActor, final Identifier identifier, final Payload data) {
-
- ReplicatedLogEntry replicatedLogEntry = new ReplicatedLogImplEntry(
+ protected final void persistData(final ActorRef clientActor, final Identifier identifier, final Payload data,
+ final boolean batchHint) {
+ ReplicatedLogEntry replicatedLogEntry = new SimpleReplicatedLogEntry(
context.getReplicatedLog().lastIndex() + 1,
context.getTermInformation().getCurrentTerm(), data);
+ replicatedLogEntry.setPersistencePending(true);
LOG.debug("{}: Persist data {}", persistenceId(), replicatedLogEntry);
final RaftActorContext raftContext = getRaftActorContext();
- replicatedLog().appendAndPersist(replicatedLogEntry, replicatedLogEntry1 -> {
+ boolean wasAppended = replicatedLog().appendAndPersist(replicatedLogEntry, persistedLogEntry -> {
+ // Clear the persistence pending flag in the log entry.
+ persistedLogEntry.setPersistencePending(false);
+
if (!hasFollowers()) {
// Increment the Commit Index and the Last Applied values
- raftContext.setCommitIndex(replicatedLogEntry1.getIndex());
- raftContext.setLastApplied(replicatedLogEntry1.getIndex());
+ raftContext.setCommitIndex(persistedLogEntry.getIndex());
+ raftContext.setLastApplied(persistedLogEntry.getIndex());
// Apply the state immediately.
- self().tell(new ApplyState(clientActor, identifier, replicatedLogEntry1), self());
+ self().tell(new ApplyState(clientActor, identifier, persistedLogEntry), self());
// Send a ApplyJournalEntries message so that we write the fact that we applied
// the state to durable storage
- self().tell(new ApplyJournalEntries(replicatedLogEntry1.getIndex()), self());
+ self().tell(new ApplyJournalEntries(persistedLogEntry.getIndex()), self());
} else {
- context.getReplicatedLog().captureSnapshotIfReady(replicatedLogEntry1);
+ context.getReplicatedLog().captureSnapshotIfReady(replicatedLogEntry);
- // Send message for replication
- getCurrentBehavior().handleMessage(getSelf(),
- new Replicate(clientActor, identifier, replicatedLogEntry1));
+ // Local persistence is complete so send the CheckConsensusReached message to the behavior (which
+ // normally should still be the leader) to check if consensus has now been reached in conjunction with
+ // follower replication.
+ getCurrentBehavior().handleMessage(getSelf(), CheckConsensusReached.INSTANCE);
}
- });
+ }, true);
+
+ if (wasAppended && hasFollowers()) {
+ // Send log entry for replication.
+ getCurrentBehavior().handleMessage(getSelf(), new Replicate(clientActor, identifier, replicatedLogEntry,
+ !batchHint));
+ }
}
private ReplicatedLog replicatedLog() {