Bug 2890: Chunk AppendEntries when single payload size exceeds threshold
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / main / java / org / opendaylight / controller / cluster / raft / behaviors / AbstractLeader.java
index 097f0ec677ea52355d99a1256c3a219ccf522869..9fc7b3393cbe1f86404e2eb9a08a9ca6f6e21bc1 100644 (file)
@@ -17,6 +17,7 @@ import com.google.common.base.Preconditions;
 import com.google.common.base.Throwables;
 import com.google.common.io.ByteSource;
 import java.io.IOException;
+import java.io.ObjectOutputStream;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
@@ -26,7 +27,11 @@ import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Queue;
+import java.util.concurrent.TimeUnit;
 import javax.annotation.Nullable;
+import org.opendaylight.controller.cluster.io.SharedFileBackedOutputStream;
+import org.opendaylight.controller.cluster.messaging.MessageSlicer;
+import org.opendaylight.controller.cluster.messaging.SliceOptions;
 import org.opendaylight.controller.cluster.raft.ClientRequestTracker;
 import org.opendaylight.controller.cluster.raft.ClientRequestTrackerImpl;
 import org.opendaylight.controller.cluster.raft.FollowerLogInformation;
@@ -85,6 +90,14 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
      */
     private final Queue<ClientRequestTracker> trackers = new LinkedList<>();
 
+    /**
+     * Map of serialized AppendEntries output streams keyed by log index. This is used in conjunction with the
+     * appendEntriesMessageSlicer for slicing single ReplicatedLogEntry payloads that exceed the message size threshold.
+     * This Map allows the SharedFileBackedOutputStreams to be reused for multiple followers.
+     */
+    private final Map<Long, SharedFileBackedOutputStream> sharedSerializedAppendEntriesStreams = new HashMap<>();
+    private final MessageSlicer appendEntriesMessageSlicer;
+
     private Cancellable heartbeatSchedule = null;
     private Optional<SnapshotHolder> snapshotHolder = Optional.absent();
     private int minReplicationCount;
@@ -93,6 +106,11 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
             @Nullable AbstractLeader initializeFromLeader) {
         super(context, state);
 
+        appendEntriesMessageSlicer = MessageSlicer.builder().logContext(logName())
+            .messageSliceSize(context.getConfigParams().getSnapshotChunkSize())
+            .expireStateAfterInactivity(context.getConfigParams().getElectionTimeOutInterval().toMillis() * 3,
+                    TimeUnit.MILLISECONDS).build();
+
         if (initializeFromLeader != null) {
             followerToLog.putAll(initializeFromLeader.followerToLog);
             snapshotHolder = initializeFromLeader.snapshotHolder;
@@ -438,6 +456,10 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
     public RaftActorBehavior handleMessage(ActorRef sender, Object message) {
         Preconditions.checkNotNull(sender, "sender should not be null");
 
+        if (appendEntriesMessageSlicer.handleMessage(message)) {
+            return this;
+        }
+
         if (message instanceof RaftRPC) {
             RaftRPC rpc = (RaftRPC) message;
             // If RPC request or response contains term T > currentTerm:
@@ -644,6 +666,8 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
                     // we send a heartbeat even if we have not received a reply for the last chunk
                     sendAppendEntries = true;
                 }
+            } else if (followerLogInformation.isLogEntrySlicingInProgress()) {
+                sendAppendEntries = sendHeartbeat;
             } else {
                 long leaderLastIndex = context.getReplicatedLog().lastIndex();
                 long leaderSnapShotIndex = context.getReplicatedLog().getSnapshotIndex();
@@ -660,11 +684,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
                             followerNextIndex, followerId);
 
                     if (followerLogInformation.okToReplicate()) {
-                        // Try to send all the entries in the journal but not exceeding the max data size
-                        // for a single AppendEntries message.
-                        int maxEntries = (int) context.getReplicatedLog().size();
-                        entries = context.getReplicatedLog().getFrom(followerNextIndex, maxEntries,
-                                context.getConfigParams().getSnapshotChunkSize());
+                        entries = getEntriesToSend(followerLogInformation, followerActor);
                         sendAppendEntries = true;
                     }
                 } else if (isFollowerActive && followerNextIndex >= 0
@@ -705,6 +725,76 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
         }
     }
 
+    private List<ReplicatedLogEntry> getEntriesToSend(FollowerLogInformation followerLogInfo,
+            ActorSelection followerActor) {
+        // Fetch the entries to replicate starting at the follower's next index, bounded by the journal size and by
+        // the max data size for a single AppendEntries message (the configured snapshot chunk size).
+        int maxEntries = (int) context.getReplicatedLog().size();
+        final int maxDataSize = context.getConfigParams().getSnapshotChunkSize();
+        final long followerNextIndex = followerLogInfo.getNextIndex();
+        List<ReplicatedLogEntry> entries = context.getReplicatedLog().getFrom(followerNextIndex,
+                maxEntries, maxDataSize);
+
+        // If the first entry's size exceeds the max data size threshold, it will be returned from the call above. If
+        // that is the case, then we need to slice it into smaller chunks.
+        if (!(entries.size() == 1 && entries.get(0).getData().size() > maxDataSize)) {
+            // No slicing needed - the entries are returned as-is for a regular AppendEntries.
+            return entries;
+        }
+
+        log.debug("{}: Log entry size {} exceeds max payload size {}", logName(), entries.get(0).getData().size(),
+                maxDataSize);
+
+        // If an AppendEntries has already been serialized for the log index then reuse the
+        // SharedFileBackedOutputStream; otherwise serialize one now and cache its stream under that index.
+        final Long logIndex = entries.get(0).getIndex();
+        SharedFileBackedOutputStream fileBackedStream = sharedSerializedAppendEntriesStreams.get(logIndex);
+        if (fileBackedStream == null) {
+            fileBackedStream = context.getFileBackedOutputStreamFactory().newSharedInstance();
+
+            final AppendEntries appendEntries = new AppendEntries(currentTerm(), context.getId(),
+                    getLogEntryIndex(followerNextIndex - 1), getLogEntryTerm(followerNextIndex - 1), entries,
+                    context.getCommitIndex(), getReplicatedToAllIndex(), context.getPayloadVersion());
+
+            log.debug("{}: Serializing {} for slicing for follower {}", logName(), appendEntries,
+                    followerLogInfo.getId());
+
+            try (ObjectOutputStream out = new ObjectOutputStream(fileBackedStream)) {
+                out.writeObject(appendEntries);
+            } catch (IOException e) {
+                log.error("{}: Error serializing {}", logName(), appendEntries, e);
+                fileBackedStream.cleanup(); // the stream was never cached, so release it here
+                return Collections.emptyList(); // nothing is sliced or returned for this follower on failure
+            }
+
+            sharedSerializedAppendEntriesStreams.put(logIndex, fileBackedStream);
+
+            fileBackedStream.setOnCleanupCallback(index -> {
+                log.debug("{}: On SharedFileBackedOutputStream cleanup for index {}", logName(), index);
+                sharedSerializedAppendEntriesStreams.remove(index);
+            }, logIndex);
+        } else {
+            log.debug("{}: Reusing SharedFileBackedOutputStream for follower {}", logName(), followerLogInfo.getId());
+            fileBackedStream.incrementUsageCount(); // NOTE(review): presumably one usage per sharing follower
+        }
+
+        log.debug("{}: Slicing stream for index {}, follower {}", logName(), logIndex, followerLogInfo.getId());
+
+        // Record that slicing is in progress for the follower; the failure callback below resets it to NO_INDEX.
+        followerLogInfo.setSlicedLogEntryIndex(logIndex);
+
+        final FollowerIdentifier identifier = new FollowerIdentifier(followerLogInfo.getId());
+        appendEntriesMessageSlicer.slice(SliceOptions.builder().identifier(identifier)
+                .fileBackedOutputStream(fileBackedStream).sendTo(followerActor).replyTo(actor())
+                .onFailureCallback(failure -> {
+                    log.error("{}: Error slicing AppendEntries for follower {}", logName(),
+                            followerLogInfo.getId(), failure);
+                    followerLogInfo.setSlicedLogEntryIndex(FollowerLogInformation.NO_INDEX);
+                }).build());
+
+        return Collections.emptyList(); // the entry is handed to the slicer, so no entries go back to the caller
+    }
+
     private void sendAppendEntriesToFollower(ActorSelection followerActor, List<ReplicatedLogEntry> entries,
             FollowerLogInformation followerLogInformation) {
         // In certain cases outlined below we don't want to send the actual commit index to prevent the follower from
@@ -714,9 +804,11 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
         //       empty AppendEntries as a heart beat to prevent election.
         //     - if we're in the process of installing a snapshot. In this case we don't send any new entries but still
         //       need to send AppendEntries to prevent election.
+        //     - if we're in the process of slicing an AppendEntries with a large log entry payload. In this case we
+        //       need to send an empty AppendEntries to prevent election.
         boolean isInstallingSnaphot = followerLogInformation.getInstallSnapshotState() != null;
-        long leaderCommitIndex = isInstallingSnaphot || !followerLogInformation.isFollowerActive() ? -1 :
-            context.getCommitIndex();
+        long leaderCommitIndex = isInstallingSnaphot || followerLogInformation.isLogEntrySlicingInProgress()
+                || !followerLogInformation.isFollowerActive() ? -1 : context.getCommitIndex();
 
         long followerNextIndex = followerLogInformation.getNextIndex();
         AppendEntries appendEntries = new AppendEntries(currentTerm(), context.getId(),
@@ -858,6 +950,8 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
         if (!followerToLog.isEmpty()) {
             log.trace("{}: Sending heartbeat", logName());
             sendAppendEntries(context.getConfigParams().getHeartBeatInterval().toMillis(), true);
+
+            appendEntriesMessageSlicer.checkExpiredSlicedMessageState();
         }
     }
 
@@ -889,6 +983,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
     @Override
     public void close() {
         stopHeartBeat(); // stop the heartbeat first, then close the slicer
+        appendEntriesMessageSlicer.close(); // also close the AppendEntries slicer built in the constructor
     }
 
     @Override