Reset snapshot progress after timeout has been hit
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / main / java / org / opendaylight / controller / cluster / raft / behaviors / AbstractLeader.java
index fef6cc8b7c57a142d683d1e0f27214ad751fd9d3..ec465935039eb99a0c34341e05aa93bd2340e34d 100644 (file)
@@ -533,6 +533,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
             return;
         }
 
+        installSnapshotState.resetChunkTimer();
         followerLogInformation.markFollowerActive();
 
         if (installSnapshotState.getChunkIndex() == reply.getChunkIndex()) {
@@ -664,9 +665,18 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
 
             LeaderInstallSnapshotState installSnapshotState = followerLogInformation.getInstallSnapshotState();
             if (installSnapshotState != null) {
+
                 // if install snapshot is in process , then sent next chunk if possible
-                if (isFollowerActive && installSnapshotState.canSendNextChunk()) {
-                    sendSnapshotChunk(followerActor, followerLogInformation);
+                if (isFollowerActive) {
+                    // 30 seconds with default settings, can be modified via heartbeat or election timeout factor
+                    FiniteDuration snapshotReplyTimeout = context.getConfigParams().getHeartBeatInterval()
+                            .$times(context.getConfigParams().getElectionTimeoutFactor() * 3);
+
+                    if (installSnapshotState.isChunkTimedOut(snapshotReplyTimeout)) {
+                        sendAppendEntries = !resendSnapshotChunk(followerActor, followerLogInformation);
+                    } else if (installSnapshotState.canSendNextChunk()) {
+                        sendSnapshotChunk(followerActor, followerLogInformation);
+                    }
                 } else if (sendHeartbeat) {
                     // we send a heartbeat even if we have not received a reply for the last chunk
                     sendAppendEntries = true;
@@ -931,18 +941,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
                     serverConfig = Optional.fromNullable(context.getPeerServerInfo(true));
                 }
 
-                followerActor.tell(
-                    new InstallSnapshot(currentTerm(), context.getId(),
-                        snapshotHolder.get().getLastIncludedIndex(),
-                        snapshotHolder.get().getLastIncludedTerm(),
-                        nextSnapshotChunk,
-                        nextChunkIndex,
-                        installSnapshotState.getTotalChunks(),
-                        Optional.of(installSnapshotState.getLastChunkHashCode()),
-                        serverConfig
-                    ).toSerializable(followerLogInfo.getRaftVersion()),
-                    actor()
-                );
+                sendSnapshotChunk(followerActor, followerLogInfo, nextSnapshotChunk, nextChunkIndex, serverConfig);
 
             } catch (IOException e) {
                 throw new RuntimeException(e);
@@ -953,6 +952,46 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
         }
     }
 
+    private void sendSnapshotChunk(final ActorSelection followerActor, final FollowerLogInformation followerLogInfo,
+                                   final byte[] snapshotChunk, final int chunkIndex,
+                                   final Optional<ServerConfigurationPayload> serverConfig) {
+        LeaderInstallSnapshotState installSnapshotState = followerLogInfo.getInstallSnapshotState();
+
+        installSnapshotState.startChunkTimer();
+        followerActor.tell(
+                new InstallSnapshot(currentTerm(), context.getId(),
+                        snapshotHolder.get().getLastIncludedIndex(),
+                        snapshotHolder.get().getLastIncludedTerm(),
+                        snapshotChunk,
+                        chunkIndex,
+                        installSnapshotState.getTotalChunks(),
+                        Optional.of(installSnapshotState.getLastChunkHashCode()),
+                        serverConfig
+                ).toSerializable(followerLogInfo.getRaftVersion()),
+                actor()
+        );
+    }
+
+    private boolean resendSnapshotChunk(final ActorSelection followerActor,
+                                        final FollowerLogInformation followerLogInfo) {
+        if (!snapshotHolder.isPresent()) {
+            // Seems like we should never hit this case, but just in case we do, reset the snapshot progress so that it
+            // can restart from the next AppendEntries.
+            log.warn("{}: Attempting to resend snapshot with no snapshot holder present.", logName());
+            followerLogInfo.clearLeaderInstallSnapshotState();
+            return false;
+        }
+
+        LeaderInstallSnapshotState installSnapshotState = followerLogInfo.getInstallSnapshotState();
+        // we are resending, timer needs to be reset
+        installSnapshotState.resetChunkTimer();
+        installSnapshotState.markSendStatus(false);
+
+        sendSnapshotChunk(followerActor, followerLogInfo);
+
+        return true;
+    }
+
     private void sendHeartBeat() {
         if (!followerToLog.isEmpty()) {
             log.trace("{}: Sending heartbeat", logName());