Bug 7391: Fix out-of-order LeaderStateChange events
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / main / java / org / opendaylight / controller / cluster / raft / RaftActor.java
index 46551506e337182df1c0f7a05e2a6fe1e11043ab..fff6ce9ed17db138af3ec73be83ddcd4d324f102 100644 (file)
@@ -51,6 +51,7 @@ import org.opendaylight.controller.cluster.raft.client.messages.OnDemandRaftStat
 import org.opendaylight.controller.cluster.raft.client.messages.Shutdown;
 import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries;
 import org.opendaylight.controller.cluster.raft.persisted.NoopPayload;
+import org.opendaylight.controller.cluster.raft.persisted.SimpleReplicatedLogEntry;
 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
 import org.opendaylight.yangtools.concepts.Identifier;
 import org.opendaylight.yangtools.concepts.Immutable;
@@ -277,13 +278,13 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
         } else if (message instanceof SwitchBehavior) {
             switchBehavior((SwitchBehavior) message);
         } else if (message instanceof LeaderTransitioning) {
-            onLeaderTransitioning();
+            onLeaderTransitioning((LeaderTransitioning)message);
         } else if (message instanceof Shutdown) {
             onShutDown();
         } else if (message instanceof Runnable) {
             ((Runnable)message).run();
         } else if (message instanceof NoopPayload) {
-            persistData(null, null, (NoopPayload)message);
+            persistData(null, null, (NoopPayload)message, false);
         } else if (!possiblyHandleBehaviorMessage(message)) {
             handleNonRaftCommand(message);
         }
@@ -375,10 +376,11 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
         }
     }
 
-    private void onLeaderTransitioning() {
-        LOG.debug("{}: onLeaderTransitioning", persistenceId());
+    private void onLeaderTransitioning(final LeaderTransitioning leaderTransitioning) {
+        LOG.debug("{}: onLeaderTransitioning: {}", persistenceId(), leaderTransitioning);
         Optional<ActorRef> roleChangeNotifier = getRoleChangeNotifier();
-        if (getRaftState() == RaftState.Follower && roleChangeNotifier.isPresent()) {
+        if (getRaftState() == RaftState.Follower && roleChangeNotifier.isPresent()
+                && leaderTransitioning.getLeaderId().equals(getCurrentBehavior().getLeaderId())) {
             roleChangeNotifier.get().tell(newLeaderStateChanged(getId(), null,
                 getCurrentBehavior().getLeaderPayloadVersion()), getSelf());
         }
@@ -513,12 +515,18 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
     }
 
     /**
-     * When a derived RaftActor needs to persist something it must call
-     * persistData.
+     * Persists the given Payload in the journal and replicates to any followers. After successful completion,
+     * {@link #applyState(ActorRef, Identifier, Object)} is notified.
+     *
+     * @param clientActor optional ActorRef that is provided via the applyState callback
+     * @param identifier the payload identifier
+     * @param data the payload data to persist
+     * @param batchHint if true, an attempt is made to delay immediate replication and batch the payload with
+     *        subsequent payloads for efficiency. Otherwise the payload is immediately replicated.
      */
-    protected final void persistData(final ActorRef clientActor, final Identifier identifier, final Payload data) {
-
-        ReplicatedLogEntry replicatedLogEntry = new ReplicatedLogImplEntry(
+    protected final void persistData(final ActorRef clientActor, final Identifier identifier, final Payload data,
+            final boolean batchHint) {
+        ReplicatedLogEntry replicatedLogEntry = new SimpleReplicatedLogEntry(
             context.getReplicatedLog().lastIndex() + 1,
             context.getTermInformation().getCurrentTerm(), data);
         replicatedLogEntry.setPersistencePending(true);
@@ -555,7 +563,8 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
 
         if (wasAppended && hasFollowers()) {
             // Send log entry for replication.
-            getCurrentBehavior().handleMessage(getSelf(), new Replicate(clientActor, identifier, replicatedLogEntry));
+            getCurrentBehavior().handleMessage(getSelf(), new Replicate(clientActor, identifier, replicatedLogEntry,
+                    !batchHint));
         }
     }