Fixup "Leader should always apply modifications as local" regression
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / main / java / org / opendaylight / controller / cluster / raft / behaviors / Follower.java
index 2377fbf442d45099856d9ebd61f402b01588e4b7..b642ee43a563588f7a95c08e4b97495b94526186 100644 (file)
@@ -5,7 +5,6 @@
  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
  * and is available at http://www.eclipse.org/legal/epl-v10.html
  */
-
 package org.opendaylight.controller.cluster.raft.behaviors;
 
 import akka.actor.ActorRef;
@@ -15,7 +14,6 @@ import akka.cluster.Cluster;
 import akka.cluster.ClusterEvent.CurrentClusterState;
 import akka.cluster.Member;
 import akka.cluster.MemberStatus;
-import akka.japi.Procedure;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Stopwatch;
 import java.io.IOException;
@@ -25,12 +23,14 @@ import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
-import javax.annotation.Nullable;
+import java.util.function.Consumer;
+import org.eclipse.jdt.annotation.Nullable;
 import org.opendaylight.controller.cluster.messaging.MessageAssembler;
 import org.opendaylight.controller.cluster.raft.RaftActorContext;
 import org.opendaylight.controller.cluster.raft.RaftState;
 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
+import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
 import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
 import org.opendaylight.controller.cluster.raft.base.messages.TimeoutNow;
 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
@@ -94,7 +94,7 @@ public class Follower extends AbstractRaftActorBehavior {
     }
 
     @VisibleForTesting
-    protected final void setLeaderId(@Nullable final String leaderId) {
+    protected final void setLeaderId(final @Nullable String leaderId) {
         this.leaderId = leaderId;
     }
 
@@ -148,7 +148,8 @@ public class Follower extends AbstractRaftActorBehavior {
         if (snapshotTracker != null || context.getSnapshotManager().isApplying()) {
             // if snapshot install is in progress, follower should just acknowledge append entries with a reply.
             AppendEntriesReply reply = new AppendEntriesReply(context.getId(), currentTerm(), true,
-                    lastIndex(), lastTerm(), context.getPayloadVersion());
+                    lastIndex(), lastTerm(), context.getPayloadVersion(), false, needsLeaderAddress(),
+                    appendEntries.getLeaderRaftVersion());
 
             log.debug("{}: snapshot install is in progress, replying immediately with {}", logName(), reply);
             sender.tell(reply, actor());
@@ -160,6 +161,14 @@ public class Follower extends AbstractRaftActorBehavior {
         leaderId = appendEntries.getLeaderId();
         leaderPayloadVersion = appendEntries.getPayloadVersion();
 
+        if (appendEntries.getLeaderAddress().isPresent()) {
+            final String address = appendEntries.getLeaderAddress().get();
+            log.debug("New leader address: {}", address);
+
+            context.setPeerAddress(leaderId, address);
+            context.getConfigParams().getPeerAddressResolver().setResolved(leaderId, address);
+        }
+
         // First check if the logs are in sync or not
         if (isOutOfSync(appendEntries, sender)) {
             updateInitialSyncStatus(appendEntries.getLeaderCommit(), appendEntries.getLeaderId());
@@ -184,7 +193,8 @@ public class Follower extends AbstractRaftActorBehavior {
         }
 
         AppendEntriesReply reply = new AppendEntriesReply(context.getId(), currentTerm(), true,
-                lastIndex, lastTerm(), context.getPayloadVersion());
+                lastIndex, lastTerm(), context.getPayloadVersion(), false, needsLeaderAddress(),
+                appendEntries.getLeaderRaftVersion());
 
         if (log.isTraceEnabled()) {
             log.trace("{}: handleAppendEntries returning : {}", logName(), reply);
@@ -267,14 +277,16 @@ public class Follower extends AbstractRaftActorBehavior {
 
                         log.info("{}: Could not remove entries - sending reply to force snapshot", logName());
                         sender.tell(new AppendEntriesReply(context.getId(), currentTerm(), false, lastIndex,
-                                lastTerm(), context.getPayloadVersion(), true), actor());
+                                lastTerm(), context.getPayloadVersion(), true, needsLeaderAddress(),
+                                appendEntries.getLeaderRaftVersion()), actor());
                         return false;
                     }
 
                     break;
                 } else {
                     sender.tell(new AppendEntriesReply(context.getId(), currentTerm(), false, lastIndex,
-                            lastTerm(), context.getPayloadVersion(), true), actor());
+                            lastTerm(), context.getPayloadVersion(), true, needsLeaderAddress(),
+                            appendEntries.getLeaderRaftVersion()), actor());
                     return false;
                 }
             }
@@ -292,7 +304,7 @@ public class Follower extends AbstractRaftActorBehavior {
         // applied to the state already, as the persistence callback occurs async, and we want those entries
         // purged from the persisted log as well.
         final AtomicBoolean shouldCaptureSnapshot = new AtomicBoolean(false);
-        final Procedure<ReplicatedLogEntry> appendAndPersistCallback = logEntry -> {
+        final Consumer<ReplicatedLogEntry> appendAndPersistCallback = logEntry -> {
             final List<ReplicatedLogEntry> entries = appendEntries.getEntries();
             final ReplicatedLogEntry lastEntryToAppend = entries.get(entries.size() - 1);
             if (shouldCaptureSnapshot.get() && logEntry == lastEntryToAppend) {
@@ -332,7 +344,7 @@ public class Follower extends AbstractRaftActorBehavior {
             log.info("{}: The followers log is empty and the senders prevLogIndex is {}", logName(),
                 appendEntries.getPrevLogIndex());
 
-            sendOutOfSyncAppendEntriesReply(sender, false);
+            sendOutOfSyncAppendEntriesReply(sender, false, appendEntries.getLeaderRaftVersion());
             return true;
         }
 
@@ -351,7 +363,7 @@ public class Follower extends AbstractRaftActorBehavior {
                         appendEntries.getPrevLogTerm(), lastIndex, context.getReplicatedLog().getSnapshotIndex(),
                         context.getReplicatedLog().getSnapshotTerm());
 
-                    sendOutOfSyncAppendEntriesReply(sender, false);
+                    sendOutOfSyncAppendEntriesReply(sender, false, appendEntries.getLeaderRaftVersion());
                     return true;
                 }
             } else if (appendEntries.getPrevLogIndex() != -1) {
@@ -362,7 +374,7 @@ public class Follower extends AbstractRaftActorBehavior {
                         + "snapshotIndex: {}, snapshotTerm: {}", logName(), appendEntries.getPrevLogIndex(), lastIndex,
                         context.getReplicatedLog().getSnapshotIndex(), context.getReplicatedLog().getSnapshotTerm());
 
-                sendOutOfSyncAppendEntriesReply(sender, false);
+                sendOutOfSyncAppendEntriesReply(sender, false, appendEntries.getLeaderRaftVersion());
                 return true;
             }
         }
@@ -378,7 +390,7 @@ public class Follower extends AbstractRaftActorBehavior {
                         appendEntries.getReplicatedToAllIndex(), lastIndex,
                         context.getReplicatedLog().getSnapshotIndex(), context.getReplicatedLog().getSnapshotTerm());
 
-                sendOutOfSyncAppendEntriesReply(sender, false);
+                sendOutOfSyncAppendEntriesReply(sender, false, appendEntries.getLeaderRaftVersion());
                 return true;
             }
 
@@ -389,7 +401,7 @@ public class Follower extends AbstractRaftActorBehavior {
                         entries.get(0).getIndex() - 1, lastIndex, context.getReplicatedLog().getSnapshotIndex(),
                         context.getReplicatedLog().getSnapshotTerm());
 
-                sendOutOfSyncAppendEntriesReply(sender, false);
+                sendOutOfSyncAppendEntriesReply(sender, false, appendEntries.getLeaderRaftVersion());
                 return true;
             }
         }
@@ -397,15 +409,21 @@ public class Follower extends AbstractRaftActorBehavior {
         return false;
     }
 
-    private void sendOutOfSyncAppendEntriesReply(final ActorRef sender, boolean forceInstallSnapshot) {
+    private void sendOutOfSyncAppendEntriesReply(final ActorRef sender, final boolean forceInstallSnapshot,
+            final short leaderRaftVersion) {
         // We found that the log was out of sync so just send a negative reply.
         final AppendEntriesReply reply = new AppendEntriesReply(context.getId(), currentTerm(), false, lastIndex(),
-                lastTerm(), context.getPayloadVersion(), forceInstallSnapshot);
+                lastTerm(), context.getPayloadVersion(), forceInstallSnapshot, needsLeaderAddress(),
+                leaderRaftVersion);
 
         log.info("{}: Follower is out-of-sync so sending negative reply: {}", logName(), reply);
         sender.tell(reply, actor());
     }
 
+    private boolean needsLeaderAddress() {
+        return context.getPeerAddress(leaderId) == null;
+    }
+
     @Override
     protected RaftActorBehavior handleAppendEntriesReply(final ActorRef sender,
         final AppendEntriesReply appendEntriesReply) {
@@ -418,6 +436,11 @@ public class Follower extends AbstractRaftActorBehavior {
         return this;
     }
 
+    @Override
+    final ApplyState getApplyStateFor(final ReplicatedLogEntry entry) {
+        return new ApplyState(null, null, entry);
+    }
+
     @Override
     public RaftActorBehavior handleMessage(final ActorRef sender, final Object message) {
         if (message instanceof ElectionTimeout || message instanceof TimeoutNow) {
@@ -437,7 +460,7 @@ public class Follower extends AbstractRaftActorBehavior {
         // If RPC request or response contains term T > currentTerm:
         // set currentTerm = T, convert to follower (ยง5.1)
         // This applies to all RPC messages and responses
-        if (rpc.getTerm() > context.getTermInformation().getCurrentTerm()) {
+        if (rpc.getTerm() > context.getTermInformation().getCurrentTerm() && shouldUpdateTerm(rpc)) {
             log.info("{}: Term {} in \"{}\" message is greater than follower's term {} - updating term",
                 logName(), rpc.getTerm(), rpc, context.getTermInformation().getCurrentTerm());
 
@@ -583,7 +606,7 @@ public class Follower extends AbstractRaftActorBehavior {
                         installSnapshot.getLastIncludedTerm(),
                         context.getTermInformation().getCurrentTerm(),
                         context.getTermInformation().getVotedFor(),
-                        installSnapshot.getServerConfig().orNull());
+                        installSnapshot.getServerConfig().orElse(null));
 
                 ApplySnapshot.Callback applySnapshotCallback = new ApplySnapshot.Callback() {
                     @Override