import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.annotation.Nullable;
+import org.opendaylight.controller.cluster.messaging.MessageAssembler;
import org.opendaylight.controller.cluster.raft.RaftActorContext;
import org.opendaylight.controller.cluster.raft.RaftState;
import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
private final SyncStatusTracker initialSyncStatusTracker;
+ private final MessageAssembler appendEntriesMessageAssembler;
+
private final Stopwatch lastLeaderMessageTimer = Stopwatch.createStarted();
private SnapshotTracker snapshotTracker = null;
private String leaderId;
initialSyncStatusTracker = new SyncStatusTracker(context.getActor(), getId(), context.getConfigParams()
.getSyncIndexThreshold());
+ appendEntriesMessageAssembler = MessageAssembler.builder().logContext(logName())
+ .filedBackedStreamFactory(context.getFileBackedOutputStreamFactory())
+ .assembledMessageCallback((message, sender) -> handleMessage(sender, message)).build();
+
if (context.getPeerIds().isEmpty() && getLeaderId() == null) {
actor().tell(TimeoutNow.INSTANCE, actor());
} else {
super.performSnapshotWithoutCapture(appendEntries.getReplicatedToAllIndex());
}
+ appendEntriesMessageAssembler.checkExpiredAssembledMessageState();
+
return this;
}
return handleElectionTimeout(message);
}
+ if (appendEntriesMessageAssembler.handleMessage(message, actor())) {
+ return this;
+ }
+
if (!(message instanceof RaftRPC)) {
// The rest of the processing requires the message to be a RaftRPC
return null;
public void close() {
closeSnapshotTracker();
stopElection();
+ appendEntriesMessageAssembler.close();
}
@VisibleForTesting