Bug 2890: Chunk AppendEntries when single payload size exceeds threshold
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / main / java / org / opendaylight / controller / cluster / raft / behaviors / Follower.java
index 15877194173cf330d85f221688ab890366033be4..9f0e0327ce16e109c33c28e0b7c0147aabe69194 100644 (file)
@@ -26,6 +26,7 @@ import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import javax.annotation.Nullable;
+import org.opendaylight.controller.cluster.messaging.MessageAssembler;
 import org.opendaylight.controller.cluster.raft.RaftActorContext;
 import org.opendaylight.controller.cluster.raft.RaftState;
 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
@@ -56,6 +57,8 @@ public class Follower extends AbstractRaftActorBehavior {
 
     private final SyncStatusTracker initialSyncStatusTracker;
 
+    private final MessageAssembler appendEntriesMessageAssembler;
+
     private final Stopwatch lastLeaderMessageTimer = Stopwatch.createStarted();
     private SnapshotTracker snapshotTracker = null;
     private String leaderId;
@@ -74,6 +77,10 @@ public class Follower extends AbstractRaftActorBehavior {
         initialSyncStatusTracker = new SyncStatusTracker(context.getActor(), getId(), context.getConfigParams()
             .getSyncIndexThreshold());
 
+        appendEntriesMessageAssembler = MessageAssembler.builder().logContext(logName())
+                .filedBackedStreamFactory(context.getFileBackedOutputStreamFactory())
+                .assembledMessageCallback((message, sender) -> handleMessage(sender, message)).build();
+
         if (context.getPeerIds().isEmpty() && getLeaderId() == null) {
             actor().tell(TimeoutNow.INSTANCE, actor());
         } else {
@@ -315,6 +322,8 @@ public class Follower extends AbstractRaftActorBehavior {
             super.performSnapshotWithoutCapture(appendEntries.getReplicatedToAllIndex());
         }
 
+        appendEntriesMessageAssembler.checkExpiredAssembledMessageState();
+
         return this;
     }
 
@@ -396,6 +405,10 @@ public class Follower extends AbstractRaftActorBehavior {
             return handleElectionTimeout(message);
         }
 
+        if (appendEntriesMessageAssembler.handleMessage(message, actor())) {
+            return this;
+        }
+
         if (!(message instanceof RaftRPC)) {
             // The rest of the processing requires the message to be a RaftRPC
             return null;
@@ -596,6 +609,7 @@ public class Follower extends AbstractRaftActorBehavior {
     public void close() {
         closeSnapshotTracker();
         stopElection();
+        appendEntriesMessageAssembler.close();
     }
 
     @VisibleForTesting