* terms of the Eclipse Public License v1.0 which accompanies this distribution,
* and is available at http://www.eclipse.org/legal/epl-v10.html
*/
-
package org.opendaylight.controller.cluster.raft.behaviors;
+import static java.util.Objects.requireNonNull;
+
import akka.actor.ActorRef;
import akka.actor.ActorSelection;
import akka.actor.Cancellable;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Optional;
-import com.google.common.base.Preconditions;
import com.google.common.io.ByteSource;
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.util.Collection;
import java.util.Map.Entry;
import java.util.Queue;
import java.util.concurrent.TimeUnit;
-import javax.annotation.Nullable;
+import org.eclipse.jdt.annotation.Nullable;
import org.opendaylight.controller.cluster.io.SharedFileBackedOutputStream;
import org.opendaylight.controller.cluster.messaging.MessageSlicer;
import org.opendaylight.controller.cluster.messaging.SliceOptions;
private int minReplicationCount;
protected AbstractLeader(final RaftActorContext context, final RaftState state,
- @Nullable final AbstractLeader initializeFromLeader) {
+ final @Nullable AbstractLeader initializeFromLeader) {
super(context, state);
appendEntriesMessageSlicer = MessageSlicer.builder().logContext(logName())
}
@VisibleForTesting
- void setSnapshotHolder(@Nullable final SnapshotHolder snapshotHolder) {
+ void setSnapshotHolder(final @Nullable SnapshotHolder snapshotHolder) {
this.snapshotHolder = Optional.fromNullable(snapshotHolder);
}
@Override
public RaftActorBehavior handleMessage(final ActorRef sender, final Object message) {
- Preconditions.checkNotNull(sender, "sender should not be null");
+ requireNonNull(sender, "sender should not be null");
if (appendEntriesMessageSlicer.handleMessage(message)) {
return this;
return this;
}
+ @SuppressFBWarnings(value = "NP_NULL_PARAM_DEREF_ALL_TARGETS_DANGEROUS",
+ justification = "JDT nullness with SpotBugs at setSnapshotHolder(null)")
private void handleInstallSnapshotReply(final InstallSnapshotReply reply) {
log.debug("{}: handleInstallSnapshotReply: {}", logName(), reply);
return;
}
+ installSnapshotState.resetChunkTimer();
followerLogInformation.markFollowerActive();
if (installSnapshotState.getChunkIndex() == reply.getChunkIndex()) {
LeaderInstallSnapshotState installSnapshotState = followerLogInformation.getInstallSnapshotState();
if (installSnapshotState != null) {
+
// if install snapshot is in process , then sent next chunk if possible
- if (isFollowerActive && installSnapshotState.canSendNextChunk()) {
- sendSnapshotChunk(followerActor, followerLogInformation);
- } else if (sendHeartbeat) {
+ if (isFollowerActive) {
+ // 30 seconds with default settings, can be modified via heartbeat or election timeout factor
+ FiniteDuration snapshotReplyTimeout = context.getConfigParams().getHeartBeatInterval()
+ .$times(context.getConfigParams().getElectionTimeoutFactor() * 3);
+
+ if (installSnapshotState.isChunkTimedOut(snapshotReplyTimeout)) {
+ sendAppendEntries = !resendSnapshotChunk(followerActor, followerLogInformation);
+ } else if (installSnapshotState.canSendNextChunk()) {
+ sendSnapshotChunk(followerActor, followerLogInformation);
+ }
+ } else if (sendHeartbeat || followerLogInformation.hasStaleCommitIndex(context.getCommitIndex())) {
// we send a heartbeat even if we have not received a reply for the last chunk
sendAppendEntries = true;
}
log.debug("{}: sendAppendEntries: {} is present for follower {}", logName(),
followerNextIndex, followerId);
- if (followerLogInformation.okToReplicate()) {
+ if (followerLogInformation.okToReplicate(context.getCommitIndex())) {
entries = getEntriesToSend(followerLogInformation, followerActor);
sendAppendEntries = true;
}
context.getReplicatedLog().size());
}
- } else if (sendHeartbeat) {
+ } else if (sendHeartbeat || followerLogInformation.hasStaleCommitIndex(context.getCommitIndex())) {
// we send an AppendEntries, even if the follower is inactive
// in-order to update the followers timestamp, in case it becomes active again
sendAppendEntries = true;
appendEntries);
}
+ followerLogInformation.setSentCommitIndex(leaderCommitIndex);
followerActor.tell(appendEntries, actor());
}
serverConfig = Optional.fromNullable(context.getPeerServerInfo(true));
}
- followerActor.tell(
- new InstallSnapshot(currentTerm(), context.getId(),
- snapshotHolder.get().getLastIncludedIndex(),
- snapshotHolder.get().getLastIncludedTerm(),
- nextSnapshotChunk,
- nextChunkIndex,
- installSnapshotState.getTotalChunks(),
- Optional.of(installSnapshotState.getLastChunkHashCode()),
- serverConfig
- ).toSerializable(followerLogInfo.getRaftVersion()),
- actor()
- );
+ sendSnapshotChunk(followerActor, followerLogInfo, nextSnapshotChunk, nextChunkIndex, serverConfig);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
+ private void sendSnapshotChunk(final ActorSelection followerActor, final FollowerLogInformation followerLogInfo,
+ final byte[] snapshotChunk, final int chunkIndex,
+ final Optional<ServerConfigurationPayload> serverConfig) {
+ LeaderInstallSnapshotState installSnapshotState = followerLogInfo.getInstallSnapshotState();
+
+ installSnapshotState.startChunkTimer();
+ followerActor.tell(
+ new InstallSnapshot(currentTerm(), context.getId(),
+ snapshotHolder.get().getLastIncludedIndex(),
+ snapshotHolder.get().getLastIncludedTerm(),
+ snapshotChunk,
+ chunkIndex,
+ installSnapshotState.getTotalChunks(),
+ Optional.of(installSnapshotState.getLastChunkHashCode()),
+ serverConfig
+ ).toSerializable(followerLogInfo.getRaftVersion()),
+ actor()
+ );
+ }
+
+ private boolean resendSnapshotChunk(final ActorSelection followerActor,
+ final FollowerLogInformation followerLogInfo) {
+ if (!snapshotHolder.isPresent()) {
+ // Seems like we should never hit this case, but just in case we do, reset the snapshot progress so that it
+ // can restart from the next AppendEntries.
+ log.warn("{}: Attempting to resend snapshot with no snapshot holder present.", logName());
+ followerLogInfo.clearLeaderInstallSnapshotState();
+ return false;
+ }
+
+ LeaderInstallSnapshotState installSnapshotState = followerLogInfo.getInstallSnapshotState();
+ // we are resending, timer needs to be reset
+ installSnapshotState.resetChunkTimer();
+ installSnapshotState.markSendStatus(false);
+
+ sendSnapshotChunk(followerActor, followerLogInfo);
+
+ return true;
+ }
+
private void sendHeartBeat() {
if (!followerToLog.isEmpty()) {
log.trace("{}: Sending heartbeat", logName());