if (snapshotSupport.handleSnapshotMessage(message, getSender())) {
return;
}
- if (message instanceof ApplyState) {
- ApplyState applyState = (ApplyState) message;
-
+ if (message instanceof ApplyState applyState) {
if (!hasFollowers()) {
// for single node, the capture should happen after the apply state
// as we delete messages from the persistent journal which have made it to the snapshot
}
possiblyHandleBehaviorMessage(message);
- } else if (message instanceof ApplyJournalEntries) {
- ApplyJournalEntries applyEntries = (ApplyJournalEntries) message;
+ } else if (message instanceof ApplyJournalEntries applyEntries) {
LOG.debug("{}: Persisting ApplyJournalEntries with index={}", persistenceId(), applyEntries.getToIndex());
persistence().persistAsync(applyEntries, NoopProcedure.instance());
onGetOnDemandRaftStats();
} else if (message instanceof InitiateCaptureSnapshot) {
captureSnapshot();
- } else if (message instanceof SwitchBehavior) {
- switchBehavior((SwitchBehavior) message);
- } else if (message instanceof LeaderTransitioning) {
- onLeaderTransitioning((LeaderTransitioning)message);
+ } else if (message instanceof SwitchBehavior switchBehavior) {
+ switchBehavior(switchBehavior);
+ } else if (message instanceof LeaderTransitioning leaderTransitioning) {
+ onLeaderTransitioning(leaderTransitioning);
} else if (message instanceof Shutdown) {
onShutDown();
- } else if (message instanceof Runnable) {
- ((Runnable)message).run();
- } else if (message instanceof NoopPayload) {
- persistData(null, null, (NoopPayload) message, false);
- } else if (message instanceof RequestLeadership) {
- onRequestLeadership((RequestLeadership) message);
+ } else if (message instanceof Runnable runnable) {
+ runnable.run();
+ } else if (message instanceof NoopPayload noopPayload) {
+ persistData(null, null, noopPayload, false);
+ } else if (message instanceof RequestLeadership requestLeadership) {
+ onRequestLeadership(requestLeadership);
} else if (!possiblyHandleBehaviorMessage(message)) {
- if (message instanceof JournalProtocol.Response
- && delegatingPersistenceProvider.handleJournalResponse((JournalProtocol.Response) message)) {
+ if (message instanceof JournalProtocol.Response response
+ && delegatingPersistenceProvider.handleJournalResponse(response)) {
LOG.debug("{}: handled a journal response", persistenceId());
- } else if (message instanceof SnapshotProtocol.Response
- && delegatingPersistenceProvider.handleSnapshotResponse((SnapshotProtocol.Response) message)) {
+ } else if (message instanceof SnapshotProtocol.Response response
+ && delegatingPersistenceProvider.handleSnapshotResponse(response)) {
LOG.debug("{}: handled a snapshot response", persistenceId());
} else {
handleNonRaftCommand(message);
builder.lastLogTerm(lastLogEntry.getTerm());
}
- if (getCurrentBehavior() instanceof AbstractLeader) {
- AbstractLeader leader = (AbstractLeader)getCurrentBehavior();
+ if (getCurrentBehavior() instanceof AbstractLeader leader) {
Collection<String> followerIds = leader.getFollowerIds();
List<FollowerInfo> followerInfoList = new ArrayList<>(followerIds.size());
- for (String id: followerIds) {
+ for (String id : followerIds) {
final FollowerLogInformation info = leader.getFollower(id);
followerInfoList.add(new FollowerInfo(id, info.getNextIndex(), info.getMatchIndex(),
info.isFollowerActive(), DurationFormatUtils.formatDurationHMS(
import org.opendaylight.controller.cluster.DataPersistenceProvider;
import org.opendaylight.controller.cluster.DelegatingPersistentDataProvider;
import org.opendaylight.controller.cluster.PersistentDataProvider;
-import org.opendaylight.controller.cluster.raft.messages.Payload;
import org.opendaylight.controller.cluster.raft.messages.PersistentPayload;
/**
}
private <T> void doPersist(final T entry, final Procedure<T> procedure, final boolean async) {
- if (getDelegate().isRecoveryApplicable()) {
- persistSuper(entry, procedure, async);
- } else {
- if (entry instanceof ReplicatedLogEntry) {
- Payload payload = ((ReplicatedLogEntry)entry).getData();
- if (payload instanceof PersistentPayload) {
- // We persist the Payload but not the ReplicatedLogEntry to avoid gaps in the journal indexes
- // on recovery if data persistence is later enabled.
- if (async) {
- persistentProvider.persistAsync(payload, p -> procedure.apply(entry));
- } else {
- persistentProvider.persist(payload, p -> procedure.apply(entry));
- }
- } else {
- persistSuper(entry, procedure, async);
- }
+ if (!getDelegate().isRecoveryApplicable() && entry instanceof ReplicatedLogEntry replicatedLogEntry
+ && replicatedLogEntry.getData() instanceof PersistentPayload payload) {
+ // We persist the Payload but not the ReplicatedLogEntry to avoid gaps in the journal indexes on recovery
+ // if data persistence is later enabled.
+ if (async) {
+ persistentProvider.persistAsync(payload, p -> procedure.apply(entry));
} else {
- persistSuper(entry, procedure, async);
+ persistentProvider.persist(payload, p -> procedure.apply(entry));
}
- }
- }
-
- private <T> void persistSuper(final T object, final Procedure<T> procedure, final boolean async) {
- if (async) {
- super.persistAsync(object, procedure);
+ } else if (async) {
+ super.persistAsync(entry, procedure);
} else {
- super.persist(object, procedure);
+ super.persist(entry, procedure);
}
}
}
void doTransfer() {
RaftActorBehavior behavior = raftActor.getCurrentBehavior();
// Sanity check...
- if (behavior instanceof Leader) {
+ if (behavior instanceof Leader leader) {
isTransferring = true;
- ((Leader)behavior).transferLeadership(this);
+ leader.transferLeadership(this);
} else {
LOG.debug("{}: No longer the leader - skipping transfer", raftActor.persistenceId());
finish(true);
}
boolean recoveryComplete = false;
- if (message instanceof UpdateElectionTerm) {
- context.getTermInformation().update(((UpdateElectionTerm) message).getCurrentTerm(),
- ((UpdateElectionTerm) message).getVotedFor());
- } else if (message instanceof SnapshotOffer) {
- onRecoveredSnapshot((SnapshotOffer) message);
- } else if (message instanceof ReplicatedLogEntry) {
- onRecoveredJournalLogEntry((ReplicatedLogEntry) message);
- } else if (message instanceof ApplyJournalEntries) {
- onRecoveredApplyLogEntries(((ApplyJournalEntries) message).getToIndex());
- } else if (message instanceof DeleteEntries) {
- onDeleteEntries((DeleteEntries) message);
- } else if (message instanceof ServerConfigurationPayload) {
- context.updatePeerIds((ServerConfigurationPayload)message);
+ if (message instanceof UpdateElectionTerm updateElectionTerm) {
+ context.getTermInformation().update(updateElectionTerm.getCurrentTerm(), updateElectionTerm.getVotedFor());
+ } else if (message instanceof SnapshotOffer snapshotOffer) {
+ onRecoveredSnapshot(snapshotOffer);
+ } else if (message instanceof ReplicatedLogEntry replicatedLogEntry) {
+ onRecoveredJournalLogEntry(replicatedLogEntry);
+ } else if (message instanceof ApplyJournalEntries applyJournalEntries) {
+ onRecoveredApplyLogEntries(applyJournalEntries.getToIndex());
+ } else if (message instanceof DeleteEntries deleteEntries) {
+ onDeleteEntries(deleteEntries);
+ } else if (message instanceof ServerConfigurationPayload serverConfigurationPayload) {
+ context.updatePeerIds(serverConfigurationPayload);
} else if (message instanceof RecoveryCompleted) {
recoveryComplete = true;
onRecoveryCompletedMessage(persistentProvider);
RaftActorServerConfigurationSupport(final RaftActor raftActor) {
this.raftActor = raftActor;
- this.raftContext = raftActor.getRaftActorContext();
+ raftContext = raftActor.getRaftActorContext();
}
boolean handleMessage(final Object message, final ActorRef sender) {
- if (message instanceof AddServer) {
- onAddServer((AddServer) message, sender);
+ if (message instanceof AddServer addServer) {
+ onAddServer(addServer, sender);
return true;
- } else if (message instanceof RemoveServer) {
- onRemoveServer((RemoveServer) message, sender);
+ } else if (message instanceof RemoveServer removeServer) {
+ onRemoveServer(removeServer, sender);
return true;
- } else if (message instanceof ChangeServersVotingStatus) {
- onChangeServersVotingStatus((ChangeServersVotingStatus) message, sender);
+ } else if (message instanceof ChangeServersVotingStatus changeServersVotingStatus) {
+ onChangeServersVotingStatus(changeServersVotingStatus, sender);
return true;
- } else if (message instanceof ServerOperationTimeout) {
- currentOperationState.onServerOperationTimeout((ServerOperationTimeout) message);
+ } else if (message instanceof ServerOperationTimeout serverOperationTimeout) {
+ currentOperationState.onServerOperationTimeout(serverOperationTimeout);
return true;
- } else if (message instanceof UnInitializedFollowerSnapshotReply) {
- currentOperationState.onUnInitializedFollowerSnapshotReply((UnInitializedFollowerSnapshotReply) message);
+ } else if (message instanceof UnInitializedFollowerSnapshotReply uninitFollowerSnapshotReply) {
+ currentOperationState.onUnInitializedFollowerSnapshotReply(uninitFollowerSnapshotReply);
return true;
- } else if (message instanceof ApplyState) {
- return onApplyState((ApplyState) message);
+ } else if (message instanceof ApplyState applyState) {
+ return onApplyState(applyState);
} else if (message instanceof SnapshotComplete) {
currentOperationState.onSnapshotComplete();
return false;
}
raftContext.updatePeerIds(new ServerConfigurationPayload(newServerInfoList));
- if (raftActor.getCurrentBehavior() instanceof AbstractLeader) {
- AbstractLeader leader = (AbstractLeader) raftActor.getCurrentBehavior();
+ if (raftActor.getCurrentBehavior() instanceof AbstractLeader leader) {
leader.updateMinReplicaCount();
}
}
boolean handleSnapshotMessage(final Object message, final ActorRef sender) {
- if (message instanceof ApplySnapshot) {
- onApplySnapshot((ApplySnapshot) message);
- } else if (message instanceof SaveSnapshotSuccess) {
- onSaveSnapshotSuccess((SaveSnapshotSuccess) message);
- } else if (message instanceof SaveSnapshotFailure) {
- onSaveSnapshotFailure((SaveSnapshotFailure) message);
- } else if (message instanceof CaptureSnapshotReply) {
- onCaptureSnapshotReply((CaptureSnapshotReply) message);
+ if (message instanceof ApplySnapshot applySnapshot) {
+ onApplySnapshot(applySnapshot);
+ } else if (message instanceof SaveSnapshotSuccess saveSnapshotSuccess) {
+ onSaveSnapshotSuccess(saveSnapshotSuccess);
+ } else if (message instanceof SaveSnapshotFailure saveSnapshotFailure) {
+ onSaveSnapshotFailure(saveSnapshotFailure);
+ } else if (message instanceof CaptureSnapshotReply captureSnapshotReply) {
+ onCaptureSnapshotReply(captureSnapshotReply);
} else if (COMMIT_SNAPSHOT.equals(message)) {
context.getSnapshotManager().commit(-1, -1);
- } else if (message instanceof GetSnapshot) {
- onGetSnapshot(sender, (GetSnapshot) message);
+ } else if (message instanceof GetSnapshot getSnapshot) {
+ onGetSnapshot(sender, getSnapshot);
} else if (message instanceof SnapshotComplete) {
log.debug("{}: SnapshotComplete received", context.getId());
} else {
// and became the leader again,. We still want to apply this as a local modification because
// we have resumed leadership with that log entry having been committed.
final Payload payload = entry.getData();
- if (payload instanceof IdentifiablePayload) {
- return new ApplyState(null, ((IdentifiablePayload<?>) payload).getIdentifier(), entry);
+ if (payload instanceof IdentifiablePayload<?> identifiable) {
+ return new ApplyState(null, identifiable.getIdentifier(), entry);
}
return new ApplyState(null, null, entry);
// start a new election due to lack of responses. This case would only occur if there isn't a majority
// of other nodes available that can elect the requesting candidate. Since we're transferring
// leadership, we should make every effort to get the requesting node elected.
- if (rpc instanceof RequestVote && context.getRaftActorLeadershipTransferCohort() != null) {
+ if (rpc instanceof RequestVote requestVote && context.getRaftActorLeadershipTransferCohort() != null) {
log.debug("{}: Leadership transfer in progress - processing RequestVote", logName());
- super.handleMessage(sender, rpc);
+ requestVote(sender, requestVote);
}
return internalSwitchBehavior(RaftState.Follower);
setSnapshotHolder(new SnapshotHolder(sendInstallSnapshot.getSnapshot(),
sendInstallSnapshot.getSnapshotBytes()));
sendInstallSnapshot();
- } else if (message instanceof Replicate) {
- replicate((Replicate) message);
- } else if (message instanceof InstallSnapshotReply) {
- handleInstallSnapshotReply((InstallSnapshotReply) message);
+ } else if (message instanceof Replicate replicate) {
+ replicate(replicate);
+ } else if (message instanceof InstallSnapshotReply installSnapshotReply) {
+ handleInstallSnapshotReply(installSnapshotReply);
} else if (message instanceof CheckConsensusReached) {
possiblyUpdateCommitIndex();
} else {
}
public static RaftActorBehavior createBehavior(final RaftActorContext context, final RaftState state) {
- switch (state) {
- case Candidate:
- return new Candidate(context);
- case Follower:
- return new Follower(context);
- case IsolatedLeader:
- return new IsolatedLeader(context);
- case Leader:
- return new Leader(context);
- case PreLeader:
- return new PreLeader(context);
- default:
- throw new IllegalArgumentException("Unhandled state " + state);
- }
+ return switch (state) {
+ case Candidate -> new Candidate(context);
+ case Follower -> new Follower(context);
+ case IsolatedLeader -> new IsolatedLeader(context);
+ case Leader -> new Leader(context);
+ case PreLeader -> new PreLeader(context);
+ };
}
@Override
@Override
public RaftActorBehavior handleMessage(final ActorRef sender, final Object message) {
- if (message instanceof AppendEntries) {
- return appendEntries(sender, (AppendEntries) message);
- } else if (message instanceof AppendEntriesReply) {
- return handleAppendEntriesReply(sender, (AppendEntriesReply) message);
- } else if (message instanceof RequestVote) {
- return requestVote(sender, (RequestVote) message);
- } else if (message instanceof RequestVoteReply) {
- return handleRequestVoteReply(sender, (RequestVoteReply) message);
+ if (message instanceof AppendEntries appendEntries) {
+ return appendEntries(sender, appendEntries);
+ } else if (message instanceof AppendEntriesReply appendEntriesReply) {
+ return handleAppendEntriesReply(sender, appendEntriesReply);
+ } else if (message instanceof RequestVote requestVote) {
+ return requestVote(sender, requestVote);
+ } else if (message instanceof RequestVoteReply requestVoteReply) {
+ return handleRequestVoteReply(sender, requestVoteReply);
} else {
return null;
}
return this;
}
- log.info("{} :- Switching from behavior {} to {}, election term: {}", logName(), this.state(),
+ log.info("{} :- Switching from behavior {} to {}, election term: {}", logName(), state(),
newBehavior.state(), context.getTermInformation().getCurrentTerm());
try {
close();
} catch (RuntimeException e) {
- log.error("{}: Failed to close behavior : {}", logName(), this.state(), e);
+ log.error("{}: Failed to close behavior : {}", logName(), state(), e);
}
return newBehavior;
}
// Check whether we should update the term. In case of half-connected nodes, we want to ignore RequestVote
// messages, as the candidate is not able to receive our response.
protected boolean shouldUpdateTerm(final RaftRPC rpc) {
- if (!(rpc instanceof RequestVote)) {
+ if (!(rpc instanceof RequestVote requestVote)) {
return true;
}
- final RequestVote requestVote = (RequestVote) rpc;
log.debug("{}: Found higher term in RequestVote rpc, verifying whether it's safe to update term.", logName());
final Optional<Cluster> maybeCluster = context.getCluster();
if (!maybeCluster.isPresent()) {
* @author Thomas Pantelis
*/
public final class ServerInfo {
- private final String id;
+ private final @NonNull String id;
private final boolean isVoting;
- public ServerInfo(@NonNull String id, boolean isVoting) {
+ public ServerInfo(final @NonNull String id, final boolean isVoting) {
this.id = requireNonNull(id);
this.isVoting = isVoting;
}
}
@Override
- public boolean equals(Object obj) {
- if (this == obj) {
- return true;
- }
- if (!(obj instanceof ServerInfo)) {
- return false;
- }
-
- final ServerInfo other = (ServerInfo) obj;
- return isVoting == other.isVoting && id.equals(other.id);
+ public boolean equals(final Object obj) {
+ return this == obj || obj instanceof ServerInfo other && isVoting == other.isVoting && id.equals(other.id);
}
@Override
*/
package org.opendaylight.controller.cluster.raft.persisted;
-import static com.google.common.base.Preconditions.checkArgument;
import static java.util.Objects.requireNonNull;
import akka.actor.ExtendedActorSystem;
}
@Override
- public byte[] toBinary(Object obj) {
- checkArgument(obj instanceof SimpleReplicatedLogEntry, "Unsupported object type %s", obj.getClass());
+ public byte[] toBinary(final Object obj) {
+ if (!(obj instanceof SimpleReplicatedLogEntry replicatedLogEntry)) {
+ throw new IllegalArgumentException("Unsupported object type " + obj.getClass());
+ }
- SimpleReplicatedLogEntry replicatedLogEntry = (SimpleReplicatedLogEntry)obj;
final int estimatedSerializedSize = replicatedLogEntry.serializedSize();
final ByteArrayOutputStream bos = new ByteArrayOutputStream(estimatedSerializedSize);
}
@Override
- public Object fromBinaryJava(byte[] bytes, Class<?> manifest) {
+ public Object fromBinaryJava(final byte[] bytes, final Class<?> manifest) {
try (ClassLoaderObjectInputStream is = new ClassLoaderObjectInputStream(system.dynamicAccess().classLoader(),
new ByteArrayInputStream(bytes))) {
return is.readObject();