Remove support for pre-Fluorine versions
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / main / java / org / opendaylight / controller / cluster / raft / behaviors / AbstractLeader.java
index 0188a6df1ac387e603962d838def314a7dab1c8d..c11dbaac9285de7ae1d75ff057ad41782360cfdd 100644 (file)
@@ -39,6 +39,7 @@ import org.opendaylight.controller.cluster.raft.FollowerLogInformation;
 import org.opendaylight.controller.cluster.raft.PeerInfo;
 import org.opendaylight.controller.cluster.raft.RaftActorContext;
 import org.opendaylight.controller.cluster.raft.RaftState;
+import org.opendaylight.controller.cluster.raft.RaftVersions;
 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
 import org.opendaylight.controller.cluster.raft.VotingState;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
@@ -221,6 +222,13 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
             return this;
         }
 
+        final var followerRaftVersion = appendEntriesReply.getRaftVersion();
+        if (followerRaftVersion < RaftVersions.FLUORINE_VERSION) {
+            log.warn("{}: handleAppendEntriesReply - ignoring reply from follower {} raft version {}", logName(),
+                followerId, followerRaftVersion);
+            return this;
+        }
+
         final long lastActivityNanos = followerLogInformation.nanosSinceLastActivity();
         if (lastActivityNanos > context.getConfigParams().getElectionTimeOutInterval().toNanos()) {
             log.warn("{} : handleAppendEntriesReply delayed beyond election timeout, "
@@ -231,7 +239,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
 
         followerLogInformation.markFollowerActive();
         followerLogInformation.setPayloadVersion(appendEntriesReply.getPayloadVersion());
-        followerLogInformation.setRaftVersion(appendEntriesReply.getRaftVersion());
+        followerLogInformation.setRaftVersion(followerRaftVersion);
         followerLogInformation.setNeedsLeaderAddress(appendEntriesReply.isNeedsLeaderAddress());
 
         long followerLastLogIndex = appendEntriesReply.getLogLastIndex();
@@ -469,8 +477,8 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
         // and became the leader again,. We still want to apply this as a local modification because
         // we have resumed leadership with that log entry having been committed.
         final Payload payload = entry.getData();
-        if (payload instanceof IdentifiablePayload) {
-            return new ApplyState(null, ((IdentifiablePayload<?>) payload).getIdentifier(), entry);
+        if (payload instanceof IdentifiablePayload<?> identifiable) {
+            return new ApplyState(null, identifiable.getIdentifier(), entry);
         }
 
         return new ApplyState(null, null, entry);
@@ -493,31 +501,31 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
             return this;
         }
 
-        if (message instanceof RaftRPC rpc) {
-            // If RPC request or response contains term T > currentTerm:
-            // set currentTerm = T, convert to follower (§5.1)
-            // This applies to all RPC messages and responses
-            if (rpc.getTerm() > context.getTermInformation().getCurrentTerm() && shouldUpdateTerm(rpc)) {
-                log.info("{}: Term {} in \"{}\" message is greater than leader's term {} - switching to Follower",
-                        logName(), rpc.getTerm(), rpc, context.getTermInformation().getCurrentTerm());
-
-                context.getTermInformation().updateAndPersist(rpc.getTerm(), null);
-
-                // This is a special case. Normally when stepping down as leader we don't process and reply to the
-                // RaftRPC as per raft. But if we're in the process of transferring leadership and we get a
-                // RequestVote, process the RequestVote before switching to Follower. This enables the requesting
-                // candidate node to be elected the leader faster and avoids us possibly timing out in the Follower
-                // state and starting a new election and grabbing leadership back before the other candidate node can
-                // start a new election due to lack of responses. This case would only occur if there isn't a majority
-                // of other nodes available that can elect the requesting candidate. Since we're transferring
-                // leadership, we should make every effort to get the requesting node elected.
-                if (rpc instanceof RequestVote && context.getRaftActorLeadershipTransferCohort() != null) {
-                    log.debug("{}: Leadership transfer in progress - processing RequestVote", logName());
-                    super.handleMessage(sender, rpc);
-                }
-
-                return internalSwitchBehavior(RaftState.Follower);
+        // If RPC request or response contains term T > currentTerm:
+        // set currentTerm = T, convert to follower (§5.1)
+        // This applies to all RPC messages and responses
+        if (message instanceof RaftRPC rpc && rpc.getTerm() > context.getTermInformation().getCurrentTerm()
+                && shouldUpdateTerm(rpc)) {
+
+            log.info("{}: Term {} in \"{}\" message is greater than leader's term {} - switching to Follower",
+                logName(), rpc.getTerm(), rpc, context.getTermInformation().getCurrentTerm());
+
+            context.getTermInformation().updateAndPersist(rpc.getTerm(), null);
+
+            // This is a special case. Normally when stepping down as leader we don't process and reply to the
+            // RaftRPC as per raft. But if we're in the process of transferring leadership and we get a
+            // RequestVote, process the RequestVote before switching to Follower. This enables the requesting
+            // candidate node to be elected the leader faster and avoids us possibly timing out in the Follower
+            // state and starting a new election and grabbing leadership back before the other candidate node can
+            // start a new election due to lack of responses. This case would only occur if there isn't a majority
+            // of other nodes available that can elect the requesting candidate. Since we're transferring
+            // leadership, we should make every effort to get the requesting node elected.
+            if (rpc instanceof RequestVote requestVote && context.getRaftActorLeadershipTransferCohort() != null) {
+                log.debug("{}: Leadership transfer in progress - processing RequestVote", logName());
+                requestVote(sender, requestVote);
             }
+
+            return internalSwitchBehavior(RaftState.Follower);
         }
 
         if (message instanceof SendHeartBeat) {
@@ -528,10 +536,10 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
             setSnapshotHolder(new SnapshotHolder(sendInstallSnapshot.getSnapshot(),
                 sendInstallSnapshot.getSnapshotBytes()));
             sendInstallSnapshot();
-        } else if (message instanceof Replicate) {
-            replicate((Replicate) message);
-        } else if (message instanceof InstallSnapshotReply) {
-            handleInstallSnapshotReply((InstallSnapshotReply) message);
+        } else if (message instanceof Replicate replicate) {
+            replicate(replicate);
+        } else if (message instanceof InstallSnapshotReply installSnapshotReply) {
+            handleInstallSnapshotReply(installSnapshotReply);
         } else if (message instanceof CheckConsensusReached) {
             possiblyUpdateCommitIndex();
         } else {
@@ -571,7 +579,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
                 if (installSnapshotState.isLastChunk(reply.getChunkIndex())) {
                     //this was the last chunk reply
 
-                    long followerMatchIndex = snapshotHolder.get().getLastIncludedIndex();
+                    long followerMatchIndex = snapshotHolder.orElseThrow().getLastIncludedIndex();
                     followerLogInformation.setMatchIndex(followerMatchIndex);
                     followerLogInformation.setNextIndex(followerMatchIndex + 1);
                     followerLogInformation.clearLeaderInstallSnapshotState();
@@ -954,7 +962,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
 
             try {
                 // Ensure the snapshot bytes are set - this is a no-op.
-                installSnapshotState.setSnapshotBytes(snapshotHolder.get().getSnapshotBytes());
+                installSnapshotState.setSnapshotBytes(snapshotHolder.orElseThrow().getSnapshotBytes());
 
                 if (!installSnapshotState.canSendNextChunk()) {
                     return;
@@ -979,7 +987,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
             } catch (IOException e) {
                 log.warn("{}: Unable to send chunk: {}/{}. Reseting snapshot progress. Snapshot state: {}", logName(),
                         installSnapshotState.getChunkIndex(), installSnapshotState.getTotalChunks(),
-                        installSnapshotState);
+                        installSnapshotState, e);
                 installSnapshotState.reset();
             }
         }
@@ -993,14 +1001,14 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
         installSnapshotState.startChunkTimer();
         followerActor.tell(
                 new InstallSnapshot(currentTerm(), context.getId(),
-                        snapshotHolder.get().getLastIncludedIndex(),
-                        snapshotHolder.get().getLastIncludedTerm(),
+                        snapshotHolder.orElseThrow().getLastIncludedIndex(),
+                        snapshotHolder.orElseThrow().getLastIncludedTerm(),
                         snapshotChunk,
                         chunkIndex,
                         installSnapshotState.getTotalChunks(),
                         OptionalInt.of(installSnapshotState.getLastChunkHashCode()),
-                        serverConfig
-                ).toSerializable(followerLogInfo.getRaftVersion()),
+                        serverConfig,
+                        followerLogInfo.getRaftVersion()),
                 actor()
         );
     }