import com.google.common.base.Throwables;
import com.google.common.io.ByteSource;
import java.io.IOException;
+import java.io.ObjectOutputStream;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Queue;
+import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
+import org.opendaylight.controller.cluster.io.SharedFileBackedOutputStream;
+import org.opendaylight.controller.cluster.messaging.MessageSlicer;
+import org.opendaylight.controller.cluster.messaging.SliceOptions;
import org.opendaylight.controller.cluster.raft.ClientRequestTracker;
import org.opendaylight.controller.cluster.raft.ClientRequestTrackerImpl;
import org.opendaylight.controller.cluster.raft.FollowerLogInformation;
*/
private final Queue<ClientRequestTracker> trackers = new LinkedList<>();
+ /**
+ * Map of serialized AppendEntries output streams keyed by log index. This is used in conjunction with the
+ * appendEntriesMessageSlicer for slicing single ReplicatedLogEntry payloads that exceed the message size threshold.
+ * This Map allows the SharedFileBackedOutputStreams to be reused for multiple followers.
+ */
+ private final Map<Long, SharedFileBackedOutputStream> sharedSerializedAppendEntriesStreams = new HashMap<>();
+
+ /**
+ * Slices serialized AppendEntries messages whose single log entry payload exceeds the message size threshold
+ * into smaller chunks for transmission to followers.
+ */
+ private final MessageSlicer appendEntriesMessageSlicer;
+
private Cancellable heartbeatSchedule = null;
private Optional<SnapshotHolder> snapshotHolder = Optional.absent();
private int minReplicationCount;
@Nullable AbstractLeader initializeFromLeader) {
super(context, state);
+ appendEntriesMessageSlicer = MessageSlicer.builder().logContext(logName())
+ .messageSliceSize(context.getConfigParams().getSnapshotChunkSize())
+ .expireStateAfterInactivity(context.getConfigParams().getElectionTimeOutInterval().toMillis() * 3,
+ TimeUnit.MILLISECONDS).build();
+
if (initializeFromLeader != null) {
followerToLog.putAll(initializeFromLeader.followerToLog);
snapshotHolder = initializeFromLeader.snapshotHolder;
public RaftActorBehavior handleMessage(ActorRef sender, Object message) {
Preconditions.checkNotNull(sender, "sender should not be null");
+ if (appendEntriesMessageSlicer.handleMessage(message)) {
+ return this;
+ }
+
if (message instanceof RaftRPC) {
RaftRPC rpc = (RaftRPC) message;
// If RPC request or response contains term T > currentTerm:
// we send a heartbeat even if we have not received a reply for the last chunk
sendAppendEntries = true;
}
+ } else if (followerLogInformation.isLogEntrySlicingInProgress()) {
+ sendAppendEntries = sendHeartbeat;
} else {
long leaderLastIndex = context.getReplicatedLog().lastIndex();
long leaderSnapShotIndex = context.getReplicatedLog().getSnapshotIndex();
followerNextIndex, followerId);
if (followerLogInformation.okToReplicate()) {
- // Try to send all the entries in the journal but not exceeding the max data size
- // for a single AppendEntries message.
- int maxEntries = (int) context.getReplicatedLog().size();
- entries = context.getReplicatedLog().getFrom(followerNextIndex, maxEntries,
- context.getConfigParams().getSnapshotChunkSize());
+ entries = getEntriesToSend(followerLogInformation, followerActor);
sendAppendEntries = true;
}
} else if (isFollowerActive && followerNextIndex >= 0
}
}
+ /**
+ * Returns the log entries to include in the next AppendEntries message for the given follower, bounded by
+ * the max data size (snapshot chunk size) for a single message. If the single entry at the follower's next
+ * index is itself larger than the threshold, the AppendEntries containing it is serialized to a
+ * SharedFileBackedOutputStream (reused across followers replicating the same log index) and transmitted in
+ * chunks via the appendEntriesMessageSlicer; in that case, and on serialization failure, an empty list is
+ * returned so the caller sends no inline entries.
+ *
+ * @param followerLogInfo the follower's log tracking info (supplies the next index and follower id)
+ * @param followerActor the follower actor to which sliced message chunks are sent
+ * @return the entries to send directly, or an empty list when slicing was initiated or serialization failed
+ */
+ private List<ReplicatedLogEntry> getEntriesToSend(FollowerLogInformation followerLogInfo,
+ ActorSelection followerActor) {
+ // Try to get all the entries in the journal but not exceeding the max data size for a single AppendEntries
+ // message.
+ int maxEntries = (int) context.getReplicatedLog().size();
+ final int maxDataSize = context.getConfigParams().getSnapshotChunkSize();
+ final long followerNextIndex = followerLogInfo.getNextIndex();
+ List<ReplicatedLogEntry> entries = context.getReplicatedLog().getFrom(followerNextIndex,
+ maxEntries, maxDataSize);
+
+ // If the first entry's size exceeds the max data size threshold, it will be returned from the call above. If
+ // that is the case, then we need to slice it into smaller chunks.
+ if (!(entries.size() == 1 && entries.get(0).getData().size() > maxDataSize)) {
+ // Don't need to slice.
+ return entries;
+ }
+
+ log.debug("{}: Log entry size {} exceeds max payload size {}", logName(), entries.get(0).getData().size(),
+ maxDataSize);
+
+ // If an AppendEntries has already been serialized for the log index then reuse the
+ // SharedFileBackedOutputStream.
+ final Long logIndex = entries.get(0).getIndex();
+ SharedFileBackedOutputStream fileBackedStream = sharedSerializedAppendEntriesStreams.get(logIndex);
+ if (fileBackedStream == null) {
+ fileBackedStream = context.getFileBackedOutputStreamFactory().newSharedInstance();
+
+ final AppendEntries appendEntries = new AppendEntries(currentTerm(), context.getId(),
+ getLogEntryIndex(followerNextIndex - 1), getLogEntryTerm(followerNextIndex - 1), entries,
+ context.getCommitIndex(), getReplicatedToAllIndex(), context.getPayloadVersion());
+
+ log.debug("{}: Serializing {} for slicing for follower {}", logName(), appendEntries,
+ followerLogInfo.getId());
+
+ try (ObjectOutputStream out = new ObjectOutputStream(fileBackedStream)) {
+ out.writeObject(appendEntries);
+ } catch (IOException e) {
+ log.error("{}: Error serializing {}", logName(), appendEntries, e);
+ // Nothing was put in the map yet, so releasing the stream here fully discards it.
+ fileBackedStream.cleanup();
+ return Collections.emptyList();
+ }
+
+ // Make the serialized stream available for reuse by other followers replicating the same log index.
+ sharedSerializedAppendEntriesStreams.put(logIndex, fileBackedStream);
+
+ // Remove the map entry when the shared stream is cleaned up - presumably when its usage count drops to
+ // zero after every sharing slicer releases it (per SharedFileBackedOutputStream semantics - confirm).
+ fileBackedStream.setOnCleanupCallback(index -> {
+ log.debug("{}: On SharedFileBackedOutputStream cleanup for index {}", logName(), index);
+ sharedSerializedAppendEntriesStreams.remove(index);
+ }, logIndex);
+ } else {
+ log.debug("{}: Reusing SharedFileBackedOutputStream for follower {}", logName(), followerLogInfo.getId());
+ // Bump the usage count so the shared stream isn't cleaned up until this slicing also completes.
+ fileBackedStream.incrementUsageCount();
+ }
+
+ log.debug("{}: Slicing stream for index {}, follower {}", logName(), logIndex, followerLogInfo.getId());
+
+ // Record that slicing is in progress for the follower.
+ followerLogInfo.setSlicedLogEntryIndex(logIndex);
+
+ final FollowerIdentifier identifier = new FollowerIdentifier(followerLogInfo.getId());
+ appendEntriesMessageSlicer.slice(SliceOptions.builder().identifier(identifier)
+ .fileBackedOutputStream(fileBackedStream).sendTo(followerActor).replyTo(actor())
+ .onFailureCallback(failure -> {
+ log.error("{}: Error slicing AppendEntries for follower {}", logName(),
+ followerLogInfo.getId(), failure);
+ // Clear the slicing marker so replication to this follower can resume normally.
+ followerLogInfo.setSlicedLogEntryIndex(FollowerLogInformation.NO_INDEX);
+ }).build());
+
+ return Collections.emptyList();
+ }
+
private void sendAppendEntriesToFollower(ActorSelection followerActor, List<ReplicatedLogEntry> entries,
FollowerLogInformation followerLogInformation) {
// In certain cases outlined below we don't want to send the actual commit index to prevent the follower from
// empty AppendEntries as a heart beat to prevent election.
// - if we're in the process of installing a snapshot. In this case we don't send any new entries but still
// need to send AppendEntries to prevent election.
+ // - if we're in the process of slicing an AppendEntries with a large log entry payload. In this case we
+ // need to send an empty AppendEntries to prevent election.
boolean isInstallingSnaphot = followerLogInformation.getInstallSnapshotState() != null;
- long leaderCommitIndex = isInstallingSnaphot || !followerLogInformation.isFollowerActive() ? -1 :
- context.getCommitIndex();
+ long leaderCommitIndex = isInstallingSnaphot || followerLogInformation.isLogEntrySlicingInProgress()
+ || !followerLogInformation.isFollowerActive() ? -1 : context.getCommitIndex();
long followerNextIndex = followerLogInformation.getNextIndex();
AppendEntries appendEntries = new AppendEntries(currentTerm(), context.getId(),
if (!followerToLog.isEmpty()) {
log.trace("{}: Sending heartbeat", logName());
sendAppendEntries(context.getConfigParams().getHeartBeatInterval().toMillis(), true);
+
+ appendEntriesMessageSlicer.checkExpiredSlicedMessageState();
}
}
@Override
public void close() {
stopHeartBeat();
+ // Release any in-progress slicing state (and the file-backed streams it holds) when leadership ends.
+ appendEntriesMessageSlicer.close();
}
@Override