Bug 7391: Fix out-of-order LeaderStateChange events
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java
index 8d29614105c5d86e4ab683f48a8b80b326cf14bd..fff6ce9ed17db138af3ec73be83ddcd4d324f102 100644
@@ -34,6 +34,7 @@ import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersisten
 import org.opendaylight.controller.cluster.notifications.LeaderStateChanged;
 import org.opendaylight.controller.cluster.notifications.RoleChanged;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
+import org.opendaylight.controller.cluster.raft.base.messages.CheckConsensusReached;
 import org.opendaylight.controller.cluster.raft.base.messages.InitiateCaptureSnapshot;
 import org.opendaylight.controller.cluster.raft.base.messages.LeaderTransitioning;
 import org.opendaylight.controller.cluster.raft.base.messages.Replicate;
@@ -50,6 +51,7 @@ import org.opendaylight.controller.cluster.raft.client.messages.OnDemandRaftStat
 import org.opendaylight.controller.cluster.raft.client.messages.Shutdown;
 import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries;
 import org.opendaylight.controller.cluster.raft.persisted.NoopPayload;
+import org.opendaylight.controller.cluster.raft.persisted.SimpleReplicatedLogEntry;
 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
 import org.opendaylight.yangtools.concepts.Identifier;
 import org.opendaylight.yangtools.concepts.Immutable;
@@ -59,7 +61,8 @@ import org.opendaylight.yangtools.concepts.Immutable;
  * in a cluster. It implements the RAFT algorithm as described in the paper
  * <a href='https://ramcloud.stanford.edu/wiki/download/attachments/11370504/raft.pdf'>
  * In Search of an Understandable Consensus Algorithm</a>
- * <p/>
+ *
+ * <p>
  * RaftActor has 3 states and each state has a certain behavior associated
  * with it. A Raft actor can behave as,
  * <ul>
@@ -67,27 +70,26 @@ import org.opendaylight.yangtools.concepts.Immutable;
  * <li> A Follower (or) </li>
  * <li> A Candidate </li>
  * </ul>
- * <p/>
- * <p/>
+ *
+ * <p>
  * A RaftActor MUST be a Leader in order to accept requests from clients to
  * change the state of its encapsulated state machine. Once a RaftActor becomes
  * a Leader it is also responsible for ensuring that all followers ultimately
  * have the same log and therefore the same state machine as itself.
- * <p/>
- * <p/>
+ *
+ * <p>
  * The current behavior of a RaftActor determines how an election for leadership
  * is initiated and how peer RaftActors react to requests for votes.
- * <p/>
- * <p/>
+ *
+ * <p>
  * Each RaftActor also needs to know the current election term. It uses this
  * information for a couple of things. One is to simply figure out who it
  * voted for in the last election. Another is to figure out if the message
  * it received to update its state is stale.
- * <p/>
- * <p/>
+ *
+ * <p>
  * The RaftActor uses akka-persistence to store its replicated log.
  * Furthermore, through its behaviors a Raft Actor determines
- * <p/>
  * <ul>
  * <li> when a log entry should be persisted </li>
  * <li> when a log entry should be applied to the state machine (and) </li>
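
As an aside on the election-term bookkeeping described in the Javadoc above, the following is a minimal, self-contained sketch (hypothetical names only; it is not the TermInformation API used in this code base) of tracking the current term, the vote cast in that term, and staleness of incoming messages:

// Hypothetical, simplified model of per-actor election-term state.
final class ElectionTermSketch {
    private long currentTerm;
    private String votedFor;    // candidate this actor voted for in currentTerm, if any

    // A message carrying an older term than ours is stale and can be ignored
    // (or answered with the newer term), per the RAFT paper.
    boolean isStale(final long messageTerm) {
        return messageTerm < currentTerm;
    }

    // Seeing a newer term moves this actor to that term and clears its vote.
    void updateTerm(final long newTerm) {
        if (newTerm > currentTerm) {
            currentTerm = newTerm;
            votedFor = null;
        }
    }

    // Record a vote for the given candidate in the current term.
    void voteFor(final String candidateId) {
        votedFor = candidateId;
    }

    String votedFor() {
        return votedFor;
    }
}
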
@@ -262,7 +264,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
             ApplyJournalEntries applyEntries = (ApplyJournalEntries) message;
             LOG.debug("{}: Persisting ApplyJournalEntries with index={}", persistenceId(), applyEntries.getToIndex());
 
-            persistence().persist(applyEntries, NoopProcedure.instance());
+            persistence().persistAsync(applyEntries, NoopProcedure.instance());
 
         } else if (message instanceof FindLeader) {
             getSender().tell(
@@ -276,13 +278,13 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
         } else if (message instanceof SwitchBehavior) {
             switchBehavior((SwitchBehavior) message);
         } else if (message instanceof LeaderTransitioning) {
-            onLeaderTransitioning();
+            onLeaderTransitioning((LeaderTransitioning)message);
         } else if (message instanceof Shutdown) {
             onShutDown();
         } else if (message instanceof Runnable) {
             ((Runnable)message).run();
         } else if (message instanceof NoopPayload) {
-            persistData(null, null, (NoopPayload)message);
+            persistData(null, null, (NoopPayload)message, false);
         } else if (!possiblyHandleBehaviorMessage(message)) {
             handleNonRaftCommand(message);
         }
@@ -374,10 +376,11 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
         }
     }
 
-    private void onLeaderTransitioning() {
-        LOG.debug("{}: onLeaderTransitioning", persistenceId());
+    private void onLeaderTransitioning(final LeaderTransitioning leaderTransitioning) {
+        LOG.debug("{}: onLeaderTransitioning: {}", persistenceId(), leaderTransitioning);
         Optional<ActorRef> roleChangeNotifier = getRoleChangeNotifier();
-        if (getRaftState() == RaftState.Follower && roleChangeNotifier.isPresent()) {
+        if (getRaftState() == RaftState.Follower && roleChangeNotifier.isPresent()
+                && leaderTransitioning.getLeaderId().equals(getCurrentBehavior().getLeaderId())) {
             roleChangeNotifier.get().tell(newLeaderStateChanged(getId(), null,
                 getCurrentBehavior().getLeaderPayloadVersion()), getSelf());
         }
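
To see what the added leaderId comparison buys, here is a small, self-contained model of the race this change guards against (illustrative only; it uses plain classes rather than the real actor and notification types): a stale LeaderTransitioning from the previous leader must not clear a LeaderStateChanged that has already announced the new leader.

import java.util.ArrayList;
import java.util.List;

// Simplified stand-in for a follower that publishes leader-state notifications.
final class FollowerLeaderStateModel {
    private String currentLeaderId;
    private final List<String> notifications = new ArrayList<>();

    void onLeaderStateChanged(final String newLeaderId) {
        currentLeaderId = newLeaderId;
        notifications.add("leader=" + newLeaderId);
    }

    // Mirrors the fixed guard: only publish a null leader if the transitioning
    // leader is still the leader this follower currently knows about.
    void onLeaderTransitioning(final String transitioningLeaderId) {
        if (transitioningLeaderId.equals(currentLeaderId)) {
            notifications.add("leader=null");
        }
    }

    public static void main(final String[] args) {
        FollowerLeaderStateModel follower = new FollowerLeaderStateModel();
        follower.onLeaderStateChanged("member-2");   // new leader already announced
        follower.onLeaderTransitioning("member-1");  // stale message from the old leader
        System.out.println(follower.notifications);  // prints [leader=member-2], no spurious null
    }
}
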
@@ -512,40 +515,57 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
     }
 
     /**
-     * When a derived RaftActor needs to persist something it must call
-     * persistData.
+     * Persists the given Payload in the journal and replicates to any followers. After successful completion,
+     * {@link #applyState(ActorRef, Identifier, Object)} is notified.
+     *
+     * @param clientActor optional ActorRef that is provided via the applyState callback
+     * @param identifier the payload identifier
+     * @param data the payload data to persist
+     * @param batchHint if true, an attempt is made to delay immediate replication and batch the payload with
+     *        subsequent payloads for efficiency. Otherwise the payload is immediately replicated.
      */
-    protected final void persistData(final ActorRef clientActor, final Identifier identifier, final Payload data) {
-
-        ReplicatedLogEntry replicatedLogEntry = new ReplicatedLogImplEntry(
+    protected final void persistData(final ActorRef clientActor, final Identifier identifier, final Payload data,
+            final boolean batchHint) {
+        ReplicatedLogEntry replicatedLogEntry = new SimpleReplicatedLogEntry(
             context.getReplicatedLog().lastIndex() + 1,
             context.getTermInformation().getCurrentTerm(), data);
+        replicatedLogEntry.setPersistencePending(true);
 
         LOG.debug("{}: Persist data {}", persistenceId(), replicatedLogEntry);
 
         final RaftActorContext raftContext = getRaftActorContext();
 
-        replicatedLog().appendAndPersist(replicatedLogEntry, replicatedLogEntry1 -> {
+        boolean wasAppended = replicatedLog().appendAndPersist(replicatedLogEntry, persistedLogEntry -> {
+            // Clear the persistence pending flag in the log entry.
+            persistedLogEntry.setPersistencePending(false);
+
             if (!hasFollowers()) {
                 // Increment the Commit Index and the Last Applied values
-                raftContext.setCommitIndex(replicatedLogEntry1.getIndex());
-                raftContext.setLastApplied(replicatedLogEntry1.getIndex());
+                raftContext.setCommitIndex(persistedLogEntry.getIndex());
+                raftContext.setLastApplied(persistedLogEntry.getIndex());
 
                 // Apply the state immediately.
-                self().tell(new ApplyState(clientActor, identifier, replicatedLogEntry1), self());
+                self().tell(new ApplyState(clientActor, identifier, persistedLogEntry), self());
 
                 // Send a ApplyJournalEntries message so that we write the fact that we applied
                 // the state to durable storage
-                self().tell(new ApplyJournalEntries(replicatedLogEntry1.getIndex()), self());
+                self().tell(new ApplyJournalEntries(persistedLogEntry.getIndex()), self());
 
             } else {
-                context.getReplicatedLog().captureSnapshotIfReady(replicatedLogEntry1);
+                context.getReplicatedLog().captureSnapshotIfReady(replicatedLogEntry);
 
-                // Send message for replication
-                getCurrentBehavior().handleMessage(getSelf(),
-                        new Replicate(clientActor, identifier, replicatedLogEntry1));
+                // Local persistence is complete so send the CheckConsensusReached message to the behavior (which
+                // normally should still be the leader) to check if consensus has now been reached in conjunction with
+                // follower replication.
+                getCurrentBehavior().handleMessage(getSelf(), CheckConsensusReached.INSTANCE);
             }
-        });
+        }, true);
+
+        if (wasAppended && hasFollowers()) {
+            // Send log entry for replication.
+            getCurrentBehavior().handleMessage(getSelf(), new Replicate(clientActor, identifier, replicatedLogEntry,
+                    !batchHint));
+        }
     }
 
     private ReplicatedLog replicatedLog() {
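
As a rough sketch of the reworked commit rule (hypothetical names, not the leader behavior's actual bookkeeping): the entry is appended with a persistence-pending flag and a Replicate message goes out right away unless batching is requested; the entry can only be committed once local persistence has completed and a majority of the cluster holds it, which is what the CheckConsensusReached message re-evaluates when the local persist callback fires.

// Hypothetical, simplified model of the commit decision this change enforces:
// an entry commits only when the leader's own persistence has completed AND a
// majority of the cluster (leader included) holds the entry.
final class ConsensusSketch {
    private final int clusterSize;
    private boolean localPersistDone;      // analogous to isPersistencePending() being cleared
    private int followersWithEntry;

    ConsensusSketch(final int clusterSize) {
        this.clusterSize = clusterSize;
    }

    void onLocalPersistComplete() {        // analogous to the appendAndPersist callback
        localPersistDone = true;
        checkConsensusReached();
    }

    void onFollowerReplicated() {          // analogous to a successful follower append
        followersWithEntry++;
        checkConsensusReached();
    }

    // Analogous to the leader handling CheckConsensusReached.
    private void checkConsensusReached() {
        int copies = followersWithEntry + (localPersistDone ? 1 : 0);
        if (localPersistDone && copies > clusterSize / 2) {
            System.out.println("consensus reached - commit and apply the entry");
        }
    }
}

Under this scheme a derived actor that expects a burst of payloads can pass batchHint=true to persistData, accepting deferred replication of individual entries in exchange for batching, while a lone payload such as the NoopPayload case above passes false and is replicated immediately.
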