import org.opendaylight.controller.cluster.raft.PeerInfo;
import org.opendaylight.controller.cluster.raft.RaftActorContext;
import org.opendaylight.controller.cluster.raft.RaftState;
+import org.opendaylight.controller.cluster.raft.RaftVersions;
import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
import org.opendaylight.controller.cluster.raft.VotingState;
import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot;
import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
+import org.opendaylight.controller.cluster.raft.messages.IdentifiablePayload;
import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply;
+import org.opendaylight.controller.cluster.raft.messages.Payload;
import org.opendaylight.controller.cluster.raft.messages.RaftRPC;
import org.opendaylight.controller.cluster.raft.messages.RequestVote;
import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
import org.opendaylight.controller.cluster.raft.messages.UnInitializedFollowerSnapshotReply;
import org.opendaylight.controller.cluster.raft.persisted.ServerConfigurationPayload;
import org.opendaylight.controller.cluster.raft.persisted.Snapshot;
-import org.opendaylight.controller.cluster.raft.protobuff.client.messages.IdentifiablePayload;
-import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
import scala.concurrent.duration.FiniteDuration;
/**
return this;
}
+ final var followerRaftVersion = appendEntriesReply.getRaftVersion();
+ if (followerRaftVersion < RaftVersions.FLUORINE_VERSION) {
+ log.warn("{}: handleAppendEntriesReply - ignoring reply from follower {} raft version {}", logName(),
+ followerId, followerRaftVersion);
+ return this;
+ }
+
final long lastActivityNanos = followerLogInformation.nanosSinceLastActivity();
if (lastActivityNanos > context.getConfigParams().getElectionTimeOutInterval().toNanos()) {
log.warn("{} : handleAppendEntriesReply delayed beyond election timeout, "
followerLogInformation.markFollowerActive();
followerLogInformation.setPayloadVersion(appendEntriesReply.getPayloadVersion());
- followerLogInformation.setRaftVersion(appendEntriesReply.getRaftVersion());
+ followerLogInformation.setRaftVersion(followerRaftVersion);
followerLogInformation.setNeedsLeaderAddress(appendEntriesReply.isNeedsLeaderAddress());
long followerLastLogIndex = appendEntriesReply.getLogLastIndex();
// and became the leader again,. We still want to apply this as a local modification because
// we have resumed leadership with that log entry having been committed.
final Payload payload = entry.getData();
- if (payload instanceof IdentifiablePayload) {
- return new ApplyState(null, ((IdentifiablePayload<?>) payload).getIdentifier(), entry);
+ if (payload instanceof IdentifiablePayload<?> identifiable) {
+ return new ApplyState(null, identifiable.getIdentifier(), entry);
}
return new ApplyState(null, null, entry);
return this;
}
- if (message instanceof RaftRPC) {
- RaftRPC rpc = (RaftRPC) message;
- // If RPC request or response contains term T > currentTerm:
- // set currentTerm = T, convert to follower (§5.1)
- // This applies to all RPC messages and responses
- if (rpc.getTerm() > context.getTermInformation().getCurrentTerm() && shouldUpdateTerm(rpc)) {
- log.info("{}: Term {} in \"{}\" message is greater than leader's term {} - switching to Follower",
- logName(), rpc.getTerm(), rpc, context.getTermInformation().getCurrentTerm());
-
- context.getTermInformation().updateAndPersist(rpc.getTerm(), null);
-
- // This is a special case. Normally when stepping down as leader we don't process and reply to the
- // RaftRPC as per raft. But if we're in the process of transferring leadership and we get a
- // RequestVote, process the RequestVote before switching to Follower. This enables the requesting
- // candidate node to be elected the leader faster and avoids us possibly timing out in the Follower
- // state and starting a new election and grabbing leadership back before the other candidate node can
- // start a new election due to lack of responses. This case would only occur if there isn't a majority
- // of other nodes available that can elect the requesting candidate. Since we're transferring
- // leadership, we should make every effort to get the requesting node elected.
- if (rpc instanceof RequestVote && context.getRaftActorLeadershipTransferCohort() != null) {
- log.debug("{}: Leadership transfer in progress - processing RequestVote", logName());
- super.handleMessage(sender, rpc);
- }
-
- return internalSwitchBehavior(RaftState.Follower);
+ // If RPC request or response contains term T > currentTerm:
+ // set currentTerm = T, convert to follower (§5.1)
+ // This applies to all RPC messages and responses
+ if (message instanceof RaftRPC rpc && rpc.getTerm() > context.getTermInformation().getCurrentTerm()
+ && shouldUpdateTerm(rpc)) {
+
+ log.info("{}: Term {} in \"{}\" message is greater than leader's term {} - switching to Follower",
+ logName(), rpc.getTerm(), rpc, context.getTermInformation().getCurrentTerm());
+
+ context.getTermInformation().updateAndPersist(rpc.getTerm(), null);
+
+ // This is a special case. Normally when stepping down as leader we don't process and reply to the
+ // RaftRPC as per raft. But if we're in the process of transferring leadership and we get a
+ // RequestVote, process the RequestVote before switching to Follower. This enables the requesting
+ // candidate node to be elected the leader faster and avoids us possibly timing out in the Follower
+ // state and starting a new election and grabbing leadership back before the other candidate node can
+ // start a new election due to lack of responses. This case would only occur if there isn't a majority
+ // of other nodes available that can elect the requesting candidate. Since we're transferring
+ // leadership, we should make every effort to get the requesting node elected.
+ if (rpc instanceof RequestVote requestVote && context.getRaftActorLeadershipTransferCohort() != null) {
+ log.debug("{}: Leadership transfer in progress - processing RequestVote", logName());
+ requestVote(sender, requestVote);
}
+
+ return internalSwitchBehavior(RaftState.Follower);
}
if (message instanceof SendHeartBeat) {
beforeSendHeartbeat();
sendHeartBeat();
scheduleHeartBeat(context.getConfigParams().getHeartBeatInterval());
- } else if (message instanceof SendInstallSnapshot) {
- SendInstallSnapshot sendInstallSnapshot = (SendInstallSnapshot) message;
+ } else if (message instanceof SendInstallSnapshot sendInstallSnapshot) {
setSnapshotHolder(new SnapshotHolder(sendInstallSnapshot.getSnapshot(),
sendInstallSnapshot.getSnapshotBytes()));
sendInstallSnapshot();
- } else if (message instanceof Replicate) {
- replicate((Replicate) message);
- } else if (message instanceof InstallSnapshotReply) {
- handleInstallSnapshotReply((InstallSnapshotReply) message);
+ } else if (message instanceof Replicate replicate) {
+ replicate(replicate);
+ } else if (message instanceof InstallSnapshotReply installSnapshotReply) {
+ handleInstallSnapshotReply(installSnapshotReply);
} else if (message instanceof CheckConsensusReached) {
possiblyUpdateCommitIndex();
} else {
if (installSnapshotState.isLastChunk(reply.getChunkIndex())) {
//this was the last chunk reply
- long followerMatchIndex = snapshotHolder.get().getLastIncludedIndex();
+ long followerMatchIndex = snapshotHolder.orElseThrow().getLastIncludedIndex();
followerLogInformation.setMatchIndex(followerMatchIndex);
followerLogInformation.setNextIndex(followerMatchIndex + 1);
followerLogInformation.clearLeaderInstallSnapshotState();
// If the first entry's size exceeds the max data size threshold, it will be returned from the call above. If
// that is the case, then we need to slice it into smaller chunks.
- if (entries.size() != 1 || entries.get(0).getData().size() <= maxDataSize) {
+ if (entries.size() != 1 || entries.get(0).getData().serializedSize() <= maxDataSize) {
// Don't need to slice.
return entries;
}
try {
// Ensure the snapshot bytes are set - this is a no-op.
- installSnapshotState.setSnapshotBytes(snapshotHolder.get().getSnapshotBytes());
+ installSnapshotState.setSnapshotBytes(snapshotHolder.orElseThrow().getSnapshotBytes());
if (!installSnapshotState.canSendNextChunk()) {
return;
} catch (IOException e) {
log.warn("{}: Unable to send chunk: {}/{}. Reseting snapshot progress. Snapshot state: {}", logName(),
installSnapshotState.getChunkIndex(), installSnapshotState.getTotalChunks(),
- installSnapshotState);
+ installSnapshotState, e);
installSnapshotState.reset();
}
}
installSnapshotState.startChunkTimer();
followerActor.tell(
new InstallSnapshot(currentTerm(), context.getId(),
- snapshotHolder.get().getLastIncludedIndex(),
- snapshotHolder.get().getLastIncludedTerm(),
+ snapshotHolder.orElseThrow().getLastIncludedIndex(),
+ snapshotHolder.orElseThrow().getLastIncludedTerm(),
snapshotChunk,
chunkIndex,
installSnapshotState.getTotalChunks(),
OptionalInt.of(installSnapshotState.getLastChunkHashCode()),
- serverConfig
- ).toSerializable(followerLogInfo.getRaftVersion()),
+ serverConfig,
+ followerLogInfo.getRaftVersion()),
actor()
);
}