<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-checkstyle-plugin</artifactId>
<configuration>
- <excludes>**/protobuff/**/*,**/target/**/*</excludes>
+ <propertyExpansion>checkstyle.violationSeverity=error</propertyExpansion>
</configuration>
</plugin>
* <p/>
* Any component using this implementation might want to provide an implementation of
* this interface to configure
- *
+ * <p/>
* A default implementation will be used if none is provided.
*
* @author Kamal Rameshan
FiniteDuration getElectionTimeOutInterval();
/**
- * Returns the maximum election time variance. The election is scheduled using both the election timeout and variance.
+ * Returns the maximum election time variance. The election is scheduled using both the election timeout
+ * and variance.
*
* @return the election time variance.
*/
/**
* Returns the RaftPolicy used to determine certain Raft behaviors.
*
- * @return an instance of org.opendaylight.controller.cluster.raft.policy.RaftPolicy, if set, or an instance of the
- * DefaultRaftPolicy.
+ * @return an instance of RaftPolicy, if set, or an instance of the DefaultRaftPolicy.
*/
@Nonnull
RaftPolicy getRaftPolicy();
import scala.concurrent.duration.FiniteDuration;
/**
- * Default implementation of the ConfigParams
- *
- * If no implementation is provided for ConfigParams, then this will be used.
+ * Default implementation of the ConfigParams.
*/
public class DefaultConfigParamsImpl implements ConfigParams {
private static final int JOURNAL_RECOVERY_LOG_BATCH_SIZE = 1000;
/**
- * The maximum election time variance
+ * The maximum election time variance.
*/
private static final int ELECTION_TIME_MAX_VARIANCE = 100;
/**
* The interval at which a heart beat message will be sent to the remote
- * RaftActor
+ * RaftActor.
* <p/>
* Since this is set to 100 milliseconds the Election timeout should be
* at least 200 milliseconds
this.snapshotBatchCount = snapshotBatchCount;
}
- public void setSnapshotDataThresholdPercentage(int snapshotDataThresholdPercentage){
+ public void setSnapshotDataThresholdPercentage(int snapshotDataThresholdPercentage) {
this.snapshotDataThresholdPercentage = snapshotDataThresholdPercentage;
}
this.isolatedLeaderCheckInterval = isolatedLeaderCheckInterval.toMillis();
}
- public void setElectionTimeoutFactor(long electionTimeoutFactor){
+ public void setElectionTimeoutFactor(long electionTimeoutFactor) {
this.electionTimeoutFactor = electionTimeoutFactor;
electionTimeOutInterval = null;
}
- public void setCustomRaftPolicyImplementationClass(String customRaftPolicyImplementationClass){
+ public void setCustomRaftPolicyImplementationClass(String customRaftPolicyImplementationClass) {
this.customRaftPolicyImplementationClass = customRaftPolicyImplementationClass;
}
@Override
public FiniteDuration getElectionTimeOutInterval() {
- if(electionTimeOutInterval == null) {
+ if (electionTimeOutInterval == null) {
electionTimeOutInterval = getHeartBeatInterval().$times(electionTimeoutFactor);
}
return policySupplier.get();
}
- private class PolicySupplier implements Supplier<RaftPolicy>{
+ private class PolicySupplier implements Supplier<RaftPolicy> {
@Override
+ @SuppressWarnings("checkstyle:IllegalCatch")
public RaftPolicy get() {
- if(Strings.isNullOrEmpty(DefaultConfigParamsImpl.this.customRaftPolicyImplementationClass)){
+ if (Strings.isNullOrEmpty(DefaultConfigParamsImpl.this.customRaftPolicyImplementationClass)) {
LOG.debug("No custom RaftPolicy specified. Using DefaultRaftPolicy");
return DefaultRaftPolicy.INSTANCE;
}
+
try {
String className = DefaultConfigParamsImpl.this.customRaftPolicyImplementationClass;
LOG.info("Trying to use custom RaftPolicy {}", className);
- Class<?> c = Class.forName(className);
- return (RaftPolicy)c.newInstance();
+ return (RaftPolicy)Class.forName(className).newInstance();
} catch (Exception e) {
- if(LOG.isDebugEnabled()) {
+ if (LOG.isDebugEnabled()) {
LOG.error("Could not create custom raft policy, will stick with default", e);
} else {
- LOG.error("Could not create custom raft policy, will stick with default : cause = {}", e.getMessage());
+ LOG.error("Could not create custom raft policy, will stick with default : cause = {}",
+ e.getMessage());
}
}
return DefaultRaftPolicy.INSTANCE;
/**
* ElectionTerm contains information about a RaftActors election term.
- * <p>
+ * <p/>
* This information includes the last known current term of the RaftActor
* and which candidate was voted for by the RaftActor in that term.
- * <p>
+ * <p/>
* This class ensures that election term information is persisted.
*/
public interface ElectionTerm {
return true;
}
- private void resetLastReplicated(){
+ private void resetLastReplicated() {
lastReplicatedIndex = getNextIndex();
if (lastReplicatedStopwatch.isRunning()) {
lastReplicatedStopwatch.reset();
}
@VisibleForTesting
+ @SuppressWarnings("checkstyle:IllegalCatch")
protected void changeCurrentBehavior(RaftActorBehavior newBehavior) {
final RaftActorBehavior currentBehavior = getCurrentBehavior();
if (currentBehavior != null) {
}
/**
+ * Handles a message.
+ *
* @deprecated This method is not final for testing purposes. DO NOT OVERRIDE IT, override
* {@link #handleNonRaftCommand(Object)} instead.
*/
private void handleBehaviorChange(BehaviorState oldBehaviorState, RaftActorBehavior currentBehavior) {
RaftActorBehavior oldBehavior = oldBehaviorState.getBehavior();
- if (oldBehavior != currentBehavior){
+ if (oldBehavior != currentBehavior) {
onStateChanged();
}
/**
* Returns the reference to the RaftActor.
*
- * @return A reference to the RaftActor itself. This could be used to send messages
- * to the RaftActor
+ * @return the reference to the RaftActor itself. This can be used to send messages to the RaftActor
*/
ActorRef getActor();
/**
* Returns the PeerInfo for the given peer.
*
- * @param peerId
- * @return the PeerInfo or null if not found.
+ * @param peerId the id of the peer
+ * @return the PeerInfo or null if not found
*/
@Nullable
PeerInfo getPeerInfo(String peerId);
this.persistenceProvider = persistenceProvider;
this.log = logger;
- for(Map.Entry<String, String> e: peerAddresses.entrySet()) {
+ for (Map.Entry<String, String> e: peerAddresses.entrySet()) {
peerInfoMap.put(e.getKey(), new PeerInfo(e.getKey(), e.getValue(), VotingState.VOTING));
}
}
}
@Override
- public ActorRef actorOf(Props props){
+ public ActorRef actorOf(Props props) {
return context.actorOf(props);
}
@Override
- public ActorSelection actorSelection(String path){
+ public ActorSelection actorSelection(String path) {
return context.actorSelection(path);
}
}
@Override
+ @SuppressWarnings("checkstyle:IllegalCatch")
public Optional<Cluster> getCluster() {
- if(cluster == null) {
+ if (cluster == null) {
try {
cluster = Optional.of(Cluster.get(getActorSystem()));
- } catch(Exception e) {
+ } catch (Exception e) {
// An exception means there's no cluster configured. This will only happen in unit tests.
log.debug("{}: Could not obtain Cluster: {}", getId(), e);
cluster = Optional.empty();
public String getPeerAddress(String peerId) {
String peerAddress;
PeerInfo peerInfo = peerInfoMap.get(peerId);
- if(peerInfo != null) {
+ if (peerInfo != null) {
peerAddress = peerInfo.getAddress();
- if(peerAddress == null) {
+ if (peerAddress == null) {
peerAddress = configParams.getPeerAddressResolver().resolve(peerId);
peerInfo.setAddress(peerAddress);
}
}
@Override
- public void updatePeerIds(ServerConfigurationPayload serverConfig){
+ public void updatePeerIds(ServerConfigurationPayload serverConfig) {
votingMember = true;
boolean foundSelf = false;
Set<String> currentPeers = new HashSet<>(this.getPeerIds());
- for(ServerInfo server: serverConfig.getServerConfig()) {
- if(getId().equals(server.getId())) {
+ for (ServerInfo server : serverConfig.getServerConfig()) {
+ if (getId().equals(server.getId())) {
foundSelf = true;
- if(!server.isVoting()) {
+ if (!server.isVoting()) {
votingMember = false;
}
} else {
- VotingState votingState = server.isVoting() ? VotingState.VOTING: VotingState.NON_VOTING;
- if(!currentPeers.contains(server.getId())) {
+ VotingState votingState = server.isVoting() ? VotingState.VOTING : VotingState.NON_VOTING;
+ if (!currentPeers.contains(server.getId())) {
this.addToPeers(server.getId(), null, votingState);
} else {
this.getPeerInfo(server.getId()).setVotingState(votingState);
}
}
- for(String peerIdToRemove: currentPeers) {
+ for (String peerIdToRemove : currentPeers) {
this.removePeer(peerIdToRemove);
}
- if(!foundSelf) {
+ if (!foundSelf) {
votingMember = false;
}
@Override
public void removePeer(String name) {
- if(getId().equals(name)) {
+ if (getId().equals(name)) {
votingMember = false;
} else {
peerInfoMap.remove(name);
@Override public ActorSelection getPeerActorSelection(String peerId) {
String peerAddress = getPeerAddress(peerId);
- if(peerAddress != null){
+ if (peerAddress != null) {
return actorSelection(peerAddress);
}
return null;
@Override
public void setPeerAddress(String peerId, String peerAddress) {
PeerInfo peerInfo = peerInfoMap.get(peerId);
- if(peerInfo != null) {
+ if (peerInfo != null) {
log.info("Peer address for peer {} set to {}", peerId, peerAddress);
peerInfo.setAddress(peerAddress);
}
@Override
public SnapshotManager getSnapshotManager() {
- if(snapshotManager == null){
+ if (snapshotManager == null) {
snapshotManager = new SnapshotManager(this, log);
}
return snapshotManager;
}
Collection<PeerInfo> peers = getPeers();
List<ServerInfo> newConfig = new ArrayList<>(peers.size() + 1);
- for(PeerInfo peer: peers) {
+ for (PeerInfo peer: peers) {
newConfig.add(new ServerInfo(peer.getId(), peer.isVoting()));
}
- if(includeSelf) {
+ if (includeSelf) {
newConfig.add(new ServerInfo(getId(), votingMember));
}
@Override
public boolean anyVotingPeers() {
- if(numVotingPeers < 0) {
+ if (numVotingPeers < 0) {
numVotingPeers = 0;
- for(PeerInfo info: getPeers()) {
- if(info.isVoting()) {
+ for (PeerInfo info: getPeers()) {
+ if (info.isVoting()) {
numVotingPeers++;
}
}
this.currentBehavior = Preconditions.checkNotNull(behavior);
}
+ @SuppressWarnings("checkstyle:IllegalCatch")
void close() {
if (currentBehavior != null) {
try {
/**
* A raft actor support class that participates in leadership transfer. An instance is created upon
* initialization of leadership transfer.
- * <p>
+ * <p/>
* The transfer process is as follows:
* <ol>
* <li>Send a LeaderStateChanged message with a null leader Id to the local RoleChangeNotifier to notify
* possibly complete work that was suspended while we were transferring.</li>
* <li>On notification of the new leader from the RaftActor or on time out, notify {@link OnComplete} callbacks.</li>
* </ol>
- * <p>
+ * <p/>
* NOTE: All methods on this class must be called on the actor's thread dispatcher as they may access/modify
* internal state.
*
transferTimer.start();
Optional<ActorRef> roleChangeNotifier = raftActor.getRoleChangeNotifier();
- if(roleChangeNotifier.isPresent()) {
+ if (roleChangeNotifier.isPresent()) {
roleChangeNotifier.get().tell(raftActor.newLeaderStateChanged(context.getId(), null,
currentBehavior.getLeaderPayloadVersion()), raftActor.self());
}
- for(String peerId: context.getPeerIds()) {
+ for (String peerId: context.getPeerIds()) {
ActorSelection followerActor = context.getPeerActorSelection(peerId);
- if(followerActor != null) {
+ if (followerActor != null) {
followerActor.tell(LeaderTransitioning.INSTANCE, context.getActor());
}
}
void doTransfer() {
RaftActorBehavior behavior = raftActor.getCurrentBehavior();
// Sanity check...
- if(behavior instanceof Leader) {
+ if (behavior instanceof Leader) {
isTransferring = true;
((Leader)behavior).transferLeadership(this);
} else {
// safely run on the actor's thread dispatcher.
FiniteDuration timeout = FiniteDuration.create(newLeaderTimeoutInMillis, TimeUnit.MILLISECONDS);
newLeaderTimer = raftActor.getContext().system().scheduler().scheduleOnce(timeout, raftActor.self(),
- (Runnable) () -> {
- LOG.debug("{}: leader not elected in time", raftActor.persistenceId());
- finish(true);
- }, raftActor.getContext().system().dispatcher(), raftActor.self());
+ (Runnable) () -> {
+ LOG.debug("{}: leader not elected in time", raftActor.persistenceId());
+ finish(true);
+ }, raftActor.getContext().system().dispatcher(), raftActor.self());
}
void onNewLeader(String newLeader) {
- if(newLeader != null && newLeaderTimer != null) {
+ if (newLeader != null && newLeaderTimer != null) {
LOG.debug("{}: leader changed to {}", raftActor.persistenceId(), newLeader);
newLeaderTimer.cancel();
finish(true);
private void finish(boolean success) {
isTransferring = false;
- if(transferTimer.isRunning()) {
+ if (transferTimer.isRunning()) {
transferTimer.stop();
- if(success) {
+ if (success) {
LOG.info("{}: Successfully transferred leadership to {} in {}", raftActor.persistenceId(),
raftActor.getLeaderId(), transferTimer);
} else {
}
}
- for(OnComplete onComplete: onCompleteCallbacks) {
- if(success) {
+ for (OnComplete onComplete: onCompleteCallbacks) {
+ if (success) {
onComplete.onSuccess(raftActor.self());
} else {
onComplete.onFailure(raftActor.self());
interface OnComplete {
void onSuccess(ActorRef raftActorRef);
+
void onFailure(ActorRef raftActorRef);
}
}
import org.opendaylight.controller.cluster.raft.persisted.UpdateElectionTerm;
import org.opendaylight.controller.cluster.raft.protobuff.client.messages.PersistentPayload;
import org.slf4j.Logger;
+
/**
* Support class that handles persistence recovery for a RaftActor.
*
* @author Thomas Pantelis
*/
-
class RaftActorRecoverySupport {
private final RaftActorContext context;
private final RaftActorRecoveryCohort cohort;
return recoveryComplete;
}
+ @SuppressWarnings("checkstyle:IllegalCatch")
private void possiblyRestoreFromSnapshot() {
byte[] restoreFromSnapshot = cohort.getRestoreFromSnapshot();
if (restoreFromSnapshot == null) {
class RaftActorServerConfigurationSupport {
private static final Logger LOG = LoggerFactory.getLogger(RaftActorServerConfigurationSupport.class);
+ @SuppressWarnings("checkstyle:MemberName")
private final OperationState IDLE = new Idle();
private final RaftActor raftActor;
}
boolean handleMessage(Object message, ActorRef sender) {
- if(message instanceof AddServer) {
+ if (message instanceof AddServer) {
onAddServer((AddServer) message, sender);
return true;
- } else if(message instanceof RemoveServer) {
+ } else if (message instanceof RemoveServer) {
onRemoveServer((RemoveServer) message, sender);
return true;
- } else if(message instanceof ChangeServersVotingStatus) {
+ } else if (message instanceof ChangeServersVotingStatus) {
onChangeServersVotingStatus((ChangeServersVotingStatus) message, sender);
return true;
} else if (message instanceof ServerOperationTimeout) {
} else if (message instanceof UnInitializedFollowerSnapshotReply) {
currentOperationState.onUnInitializedFollowerSnapshotReply((UnInitializedFollowerSnapshotReply) message);
return true;
- } else if(message instanceof ApplyState) {
+ } else if (message instanceof ApplyState) {
return onApplyState((ApplyState) message);
- } else if(message instanceof SnapshotComplete) {
+ } else if (message instanceof SnapshotComplete) {
currentOperationState.onSnapshotComplete();
return false;
} else {
// Therefore, if the local server is currently non-voting and is to be changed to voting and there is
// no current leader, we will try to elect a leader using the new server config in order to replicate
// the change and progress.
- boolean localServerChangingToVoting = Boolean.TRUE.equals(message.
- getServerVotingStatusMap().get(raftActor.getRaftActorContext().getId()));
+ boolean localServerChangingToVoting = Boolean.TRUE.equals(message
+ .getServerVotingStatusMap().get(raftActor.getRaftActorContext().getId()));
boolean hasNoLeader = raftActor.getLeaderId() == null;
- if(localServerChangingToVoting && !raftContext.isVotingMember() && hasNoLeader) {
+ if (localServerChangingToVoting && !raftContext.isVotingMember() && hasNoLeader) {
currentOperationState.onNewOperation(new ChangeServersVotingStatusContext(message, sender, true));
} else {
onNewOperation(new ChangeServersVotingStatusContext(message, sender, false));
private void onRemoveServer(RemoveServer removeServer, ActorRef sender) {
LOG.debug("{}: onRemoveServer: {}, state: {}", raftContext.getId(), removeServer, currentOperationState);
boolean isSelf = removeServer.getServerId().equals(raftContext.getId());
- if(isSelf && !raftContext.hasFollowers()) {
+ if (isSelf && !raftContext.hasFollowers()) {
sender.tell(new RemoveServerReply(ServerChangeStatus.NOT_SUPPORTED, raftActor.getLeaderId()),
raftActor.getSelf());
- } else if(!isSelf && !raftContext.getPeerIds().contains(removeServer.getServerId())) {
+ } else if (!isSelf && !raftContext.getPeerIds().contains(removeServer.getServerId())) {
sender.tell(new RemoveServerReply(ServerChangeStatus.DOES_NOT_EXIST, raftActor.getLeaderId()),
raftActor.getSelf());
} else {
private boolean onApplyState(ApplyState applyState) {
Payload data = applyState.getReplicatedLogEntry().getData();
- if(data instanceof ServerConfigurationPayload) {
+ if (data instanceof ServerConfigurationPayload) {
currentOperationState.onApplyState(applyState);
return true;
}
void onNewLeader(String newLeader) {
}
- protected void persistNewServerConfiguration(ServerOperationContext<?> operationContext){
+ protected void persistNewServerConfiguration(ServerOperationContext<?> operationContext) {
raftContext.setDynamicServerConfigurationInUse();
ServerConfigurationPayload payload = raftContext.getPeerServerInfo(
sendReply(operationContext, ServerChangeStatus.OK);
}
- protected void operationComplete(ServerOperationContext<?> operationContext, @Nullable ServerChangeStatus replyStatus) {
- if(replyStatus != null) {
+ protected void operationComplete(ServerOperationContext<?> operationContext,
+ @Nullable ServerChangeStatus replyStatus) {
+ if (replyStatus != null) {
sendReply(operationContext, replyStatus);
}
currentOperationState = IDLE;
ServerOperationContext<?> nextOperation = pendingOperationsQueue.poll();
- if(nextOperation != null) {
+ if (nextOperation != null) {
RaftActorServerConfigurationSupport.this.onNewOperation(nextOperation);
}
}
protected void sendReply(ServerOperationContext<?> operationContext, ServerChangeStatus status) {
- LOG.debug("{}: Returning {} for operation {}", raftContext.getId(), status, operationContext.getOperation());
+ LOG.debug("{}: Returning {} for operation {}", raftContext.getId(), status,
+ operationContext.getOperation());
operationContext.getClientRequestor().tell(operationContext.newReply(status, raftActor.getLeaderId()),
raftActor.self());
public void onApplyState(ApplyState applyState) {
// Sanity check - we could get an ApplyState from a previous operation that timed out so make
// sure it's meant for us.
- if(operationContext.getContextId().equals(applyState.getIdentifier())) {
+ if (operationContext.getContextId().equals(applyState.getIdentifier())) {
LOG.info("{}: {} has been successfully replicated to a majority of followers", raftContext.getId(),
applyState.getReplicatedLogEntry().getData());
// Fail any pending operations
ServerOperationContext<?> nextOperation = pendingOperationsQueue.poll();
- while(nextOperation != null) {
+ while (nextOperation != null) {
sendReply(nextOperation, ServerChangeStatus.PRIOR_REQUEST_CONSENSUS_TIMEOUT);
nextOperation = pendingOperationsQueue.poll();
}
@Override
public void onNewOperation(ServerOperationContext<?> newOperationContext) {
- if(timedOut) {
+ if (timedOut) {
sendReply(newOperationContext, ServerChangeStatus.PRIOR_REQUEST_CONSENSUS_TIMEOUT);
} else {
super.onNewOperation(newOperationContext);
raftContext.removePeer(serverId);
boolean isLeader = raftActor.isLeader();
- if(isLeader) {
+ if (isLeader) {
AbstractLeader leader = (AbstractLeader) raftActor.getCurrentBehavior();
leader.removeFollower(serverId);
}
- operationComplete(getAddServerContext(), isLeader ? ServerChangeStatus.TIMEOUT : ServerChangeStatus.NO_LEADER);
+ operationComplete(getAddServerContext(), isLeader ? ServerChangeStatus.TIMEOUT
+ : ServerChangeStatus.NO_LEADER);
}
}
@Override
public void initiate() {
- AbstractLeader leader = (AbstractLeader) raftActor.getCurrentBehavior();
+ final AbstractLeader leader = (AbstractLeader) raftActor.getCurrentBehavior();
AddServer addServer = getAddServerContext().getOperation();
LOG.debug("{}: Initiating {}", raftContext.getId(), addServer);
- if(raftContext.getPeerInfo(addServer.getNewServerId()) != null) {
+ if (raftContext.getPeerInfo(addServer.getNewServerId()) != null) {
operationComplete(getAddServerContext(), ServerChangeStatus.ALREADY_EXISTS);
return;
}
leader.addFollower(addServer.getNewServerId());
- if(votingState == VotingState.VOTING_NOT_INITIALIZED){
+ if (votingState == VotingState.VOTING_NOT_INITIALIZED) {
// schedule the install snapshot timeout timer
Cancellable installSnapshotTimer = newInstallSnapshotTimer();
- if(leader.initiateCaptureSnapshot(addServer.getNewServerId())) {
+ if (leader.initiateCaptureSnapshot(addServer.getNewServerId())) {
LOG.debug("{}: Initiating capture snapshot for new server {}", raftContext.getId(),
addServer.getNewServerId());
// Sanity check to guard against receiving an UnInitializedFollowerSnapshotReply from a prior
// add server operation that timed out.
- if(getAddServerContext().getOperation().getNewServerId().equals(followerId) && raftActor.isLeader()) {
+ if (getAddServerContext().getOperation().getNewServerId().equals(followerId) && raftActor.isLeader()) {
AbstractLeader leader = (AbstractLeader) raftActor.getCurrentBehavior();
raftContext.getPeerInfo(followerId).setVotingState(VotingState.VOTING);
leader.updateMinReplicaCount();
public void onSnapshotComplete() {
LOG.debug("{}: onSnapshotComplete", raftContext.getId());
- if(!raftActor.isLeader()) {
+ if (!raftActor.isLeader()) {
LOG.debug("{}: No longer the leader", raftContext.getId());
return;
}
AbstractLeader leader = (AbstractLeader) raftActor.getCurrentBehavior();
- if(leader.initiateCaptureSnapshot(getAddServerContext().getOperation().getNewServerId())) {
+ if (leader.initiateCaptureSnapshot(getAddServerContext().getOperation().getNewServerId())) {
LOG.debug("{}: Initiating capture snapshot for new server {}", raftContext.getId(),
getAddServerContext().getOperation().getNewServerId());
}
}
- private static final class ServerOperationContextIdentifier extends AbstractUUIDIdentifier<ServerOperationContextIdentifier> {
+ private static final class ServerOperationContextIdentifier
+ extends AbstractUUIDIdentifier<ServerOperationContextIdentifier> {
private static final long serialVersionUID = 1L;
ServerOperationContextIdentifier() {
private final ActorRef clientRequestor;
private final Identifier contextId;
- ServerOperationContext(T operation, ActorRef clientRequestor){
+ ServerOperationContext(T operation, ActorRef clientRequestor) {
this.operation = operation;
this.clientRequestor = clientRequestor;
contextId = new ServerOperationContextIdentifier();
}
}
- private final class InitialRemoveServerState extends RemoveServerState implements InitialOperationState{
+ private final class InitialRemoveServerState extends RemoveServerState implements InitialOperationState {
protected InitialRemoveServerState(RemoveServerContext removeServerContext) {
super(removeServerContext);
@Override
void operationComplete(RaftActor raftActor, boolean succeeded) {
- if(peerAddress != null) {
- raftActor.context().actorSelection(peerAddress).tell(new ServerRemoved(getOperation().getServerId()), raftActor.getSelf());
+ if (peerAddress != null) {
+ raftActor.context().actorSelection(peerAddress).tell(
+ new ServerRemoved(getOperation().getServerId()), raftActor.getSelf());
}
}
void operationComplete(final RaftActor raftActor, boolean succeeded) {
// If this leader changed to non-voting we need to step down as leader so we'll try to transfer
// leadership.
- boolean localServerChangedToNonVoting = Boolean.FALSE.equals(getOperation().
- getServerVotingStatusMap().get(raftActor.getRaftActorContext().getId()));
+ boolean localServerChangedToNonVoting = Boolean.FALSE.equals(getOperation()
+ .getServerVotingStatusMap().get(raftActor.getRaftActorContext().getId()));
if (succeeded && localServerChangedToNonVoting) {
LOG.debug("Leader changed to non-voting - trying leadership transfer");
raftActor.becomeNonVoting();
public void initiate() {
LOG.debug("Initiating ChangeServersVotingStatusState");
- if(tryToElectLeader) {
+ if (tryToElectLeader) {
initiateLocalLeaderElection();
- } else if(updateLocalPeerInfo()) {
+ } else if (updateLocalPeerInfo()) {
persistNewServerConfiguration(changeVotingStatusContext);
}
}
LOG.debug("{}: Sending local ElectionTimeout to start leader election", raftContext.getId());
ServerConfigurationPayload previousServerConfig = raftContext.getPeerServerInfo(true);
- if(!updateLocalPeerInfo()) {
+ if (!updateLocalPeerInfo()) {
return;
}
// Check if new voting state would leave us with no voting members.
boolean atLeastOneVoting = false;
- for(ServerInfo info: newServerInfoList) {
- if(info.isVoting()) {
+ for (ServerInfo info: newServerInfoList) {
+ if (info.isVoting()) {
atLeastOneVoting = true;
break;
}
}
- if(!atLeastOneVoting) {
+ if (!atLeastOneVoting) {
operationComplete(changeVotingStatusContext, ServerChangeStatus.INVALID_REQUEST);
return false;
}
raftContext.updatePeerIds(new ServerConfigurationPayload(newServerInfoList));
- if(raftActor.getCurrentBehavior() instanceof AbstractLeader) {
+ if (raftActor.getCurrentBehavior() instanceof AbstractLeader) {
AbstractLeader leader = (AbstractLeader) raftActor.getCurrentBehavior();
leader.updateMinReplicaCount();
}
}
private List<ServerInfo> newServerInfoList() {
- Map<String, Boolean> serverVotingStatusMap = changeVotingStatusContext.getOperation().getServerVotingStatusMap();
+ Map<String, Boolean> serverVotingStatusMap = changeVotingStatusContext.getOperation()
+ .getServerVotingStatusMap();
List<ServerInfo> newServerInfoList = new ArrayList<>();
- for(String peerId: raftContext.getPeerIds()) {
- newServerInfoList.add(new ServerInfo(peerId, serverVotingStatusMap.containsKey(peerId) ?
- serverVotingStatusMap.get(peerId) : raftContext.getPeerInfo(peerId).isVoting()));
+ for (String peerId: raftContext.getPeerIds()) {
+ newServerInfoList.add(new ServerInfo(peerId, serverVotingStatusMap.containsKey(peerId)
+ ? serverVotingStatusMap.get(peerId) : raftContext.getPeerInfo(peerId).isVoting()));
}
newServerInfoList.add(new ServerInfo(raftContext.getId(), serverVotingStatusMap.containsKey(
- raftContext.getId()) ? serverVotingStatusMap.get(raftContext.getId()) : raftContext.isVotingMember()));
+ raftContext.getId()) ? serverVotingStatusMap.get(raftContext.getId())
+ : raftContext.isVotingMember()));
return newServerInfoList;
}
@Override
void onNewLeader(String newLeader) {
- if(newLeader == null) {
+ if (newLeader == null) {
return;
}
timer.cancel();
- if(raftActor.isLeader()) {
+ if (raftActor.isLeader()) {
persistNewServerConfiguration(operationContext);
} else {
// Edge case - some other node became leader so forward the operation.
// tried yet.
Map<String, Boolean> serverVotingStatusMap = operationContext.getOperation().getServerVotingStatusMap();
ActorSelection forwardToPeerActor = null;
- for(Map.Entry<String, Boolean> e: serverVotingStatusMap.entrySet()) {
+ for (Map.Entry<String, Boolean> e: serverVotingStatusMap.entrySet()) {
Boolean isVoting = e.getValue();
String serverId = e.getKey();
PeerInfo peerInfo = raftContext.getPeerInfo(serverId);
- if(isVoting && peerInfo != null && !peerInfo.isVoting() && !serversVisited.contains(serverId)) {
+ if (isVoting && peerInfo != null && !peerInfo.isVoting() && !serversVisited.contains(serverId)) {
ActorSelection actor = raftContext.getPeerActorSelection(serverId);
- if(actor != null) {
+ if (actor != null) {
forwardToPeerActor = actor;
break;
}
}
}
- if(forwardToPeerActor != null) {
+ if (forwardToPeerActor != null) {
LOG.debug("{}: Found server {} to forward to", raftContext.getId(), forwardToPeerActor);
forwardToPeerActor.tell(new ChangeServersVotingStatus(serverVotingStatusMap, serversVisited),
static class ServerOperationTimeout {
private final String loggingContext;
- ServerOperationTimeout(String loggingContext){
- this.loggingContext = Preconditions.checkNotNull(loggingContext, "loggingContext should not be null");
+ ServerOperationTimeout(String loggingContext) {
+ this.loggingContext = Preconditions.checkNotNull(loggingContext, "loggingContext should not be null");
}
String getLoggingContext() {
import javax.annotation.Nullable;
/**
- * Represents the ReplicatedLog that needs to be kept in sync by the RaftActor
+ * Represents the ReplicatedLog that needs to be kept in sync by the RaftActor.
*/
public interface ReplicatedLog {
long NO_MAX_SIZE = -1;
*
* @param index the index of the log entry
* @return the ReplicatedLogEntry if found, otherwise null if the adjusted index less than 0 or
- * greater than the size of the in-memory journal.
+ * greater than the size of the in-memory journal
*/
@Nullable
ReplicatedLogEntry get(long index);
long removeFrom(long index);
/**
- * Removes entries from the in-memory log a nd the persisted log starting at the given index.
- * <p>
+ * Removes entries from the in-memory log and the persisted log starting at the given index.
+ * <p/>
* The persisted information would then be used during recovery to properly
* reconstruct the state of the in-memory replicated log
*
* @param index the index of the first log entry to remove
- * @return
+ * @return true if entries were removed, false otherwise
*/
boolean removeFromAndPersist(long index);
@Nonnull List<ReplicatedLogEntry> getFrom(long index, int maxEntries, long maxDataSize);
/**
+ * Returns the number of entries in the journal.
*
- * @return the number of entries in the journal
+ * @return the number of entries
*/
long size();
/**
- * Checks if the entry at the specified index is present or not
+ * Checks if the entry at the specified index is present or not.
*
* @param index the index of the log entry
* @return true if the entry is present in the in-memory journal
boolean isPresent(long index);
/**
- * Checks if the entry is present in a snapshot
+ * Checks if the entry is present in a snapshot.
*
* @param index the index of the log entry
- * @return true if the entry is in the snapshot. false if the entry is not
- * in the snapshot even if the entry may be present in the replicated log
+ * @return true if the entry is in the snapshot, false if the entry is not in the snapshot even if the entry may
+ * be present in the replicated log
*/
boolean isInSnapshot(long index);
/**
- * Get the index of the snapshot
+ * Returns the index of the snapshot.
*
* @return the index from which the snapshot was created. -1 otherwise.
*/
long getSnapshotIndex();
/**
- * Get the term of the snapshot
+ * Returns the term of the snapshot.
*
- * @return the term of the index from which the snapshot was created. -1
- * otherwise
+ * @return the term of the index from which the snapshot was created, or -1 otherwise
*/
long getSnapshotTerm();
/**
- * sets the snapshot index in the replicated log
- * @param snapshotIndex
+ * Sets the snapshot index in the replicated log.
+ *
+ * @param snapshotIndex the index to set
*/
void setSnapshotIndex(long snapshotIndex);
/**
- * sets snapshot term
- * @param snapshotTerm
+ * Sets snapshot term.
+ *
+ * @param snapshotTerm the term to set
*/
void setSnapshotTerm(long snapshotTerm);
/**
- * Clears the journal entries with startIndex(inclusive) and endIndex (exclusive)
- * @param startIndex
- * @param endIndex
+ * Clears the journal entries with startIndex (inclusive) and endIndex (exclusive).
+ *
+ * @param startIndex the start index (inclusive)
+ * @param endIndex the end index (exclusive)
*/
void clear(int startIndex, int endIndex);
/**
- * Handles all the bookkeeping in order to perform a rollback in the
- * event of SaveSnapshotFailure
- * @param snapshotCapturedIndex
- * @param snapshotCapturedTerm
+ * Handles all the bookkeeping in order to perform a rollback in the event of SaveSnapshotFailure.
+ *
+ * @param snapshotCapturedIndex the new snapshot index
+ * @param snapshotCapturedTerm the new snapshot term
*/
void snapshotPreCommit(long snapshotCapturedIndex, long snapshotCapturedTerm);
void snapshotCommit();
/**
- * Restores the replicated log to a state in the event of a save snapshot failure
+ * Restores the replicated log to a state in the event of a save snapshot failure.
*/
void snapshotRollback();
/**
- * Returns the size of the data in the log (in bytes)
+ * Returns the size of the data in the log (in bytes).
*
- * @return the size of the data in the log (in bytes).
+ * @return the size of the data in the log (in bytes)
*/
int dataSize();
private final RaftActorContext context;
private long dataSizeSinceLastSnapshot = 0L;
- private ReplicatedLogImpl(final long snapshotIndex, final long snapshotTerm, final List<ReplicatedLogEntry> unAppliedEntries,
+ private ReplicatedLogImpl(final long snapshotIndex, final long snapshotTerm,
+ final List<ReplicatedLogEntry> unAppliedEntries,
final RaftActorContext context) {
super(snapshotIndex, snapshotTerm, unAppliedEntries, context.getId());
this.context = Preconditions.checkNotNull(context);
public boolean removeFromAndPersist(final long logEntryIndex) {
// FIXME: Maybe this should be done after the command is saved
long adjustedIndex = removeFrom(logEntryIndex);
- if(adjustedIndex >= 0) {
+ if (adjustedIndex >= 0) {
context.getPersistenceProvider().persist(new DeleteEntries(adjustedIndex), deleteProcedure);
return true;
}
return false;
}
- @Override
- public void appendAndPersist(final ReplicatedLogEntry replicatedLogEntry) {
- appendAndPersist(replicatedLogEntry, null);
- }
-
@Override
public void captureSnapshotIfReady(final ReplicatedLogEntry replicatedLogEntry) {
final ConfigParams config = context.getConfigParams();
}
}
+ @Override
+ public void appendAndPersist(final ReplicatedLogEntry replicatedLogEntry) {
+ appendAndPersist(replicatedLogEntry, null);
+ }
+
@Override
public void appendAndPersist(final ReplicatedLogEntry replicatedLogEntry,
final Procedure<ReplicatedLogEntry> callback) {
context.getLogger().debug("{}: Append log entry and persist {} ", context.getId(), replicatedLogEntry);
- // FIXME : By adding the replicated log entry to the in-memory journal we are not truly ensuring durability of the logs
- if(!append(replicatedLogEntry)) {
+ // FIXME : By adding the replicated log entry to the in-memory journal we are not truly ensuring durability
+ // of the logs
+ if (!append(replicatedLogEntry)) {
return;
}
public String toString() {
return "Snapshot [lastIndex=" + lastIndex + ", lastTerm=" + lastTerm + ", lastAppliedIndex=" + lastAppliedIndex
+ ", lastAppliedTerm=" + lastAppliedTerm + ", unAppliedEntries size=" + unAppliedEntries.size()
- + ", state size=" + state.length + ", electionTerm=" + electionTerm + ", electionVotedFor=" + electionVotedFor
- + ", ServerConfigPayload=" + serverConfig + "]";
+ + ", state size=" + state.length + ", electionTerm=" + electionTerm + ", electionVotedFor="
+ + electionVotedFor + ", ServerConfigPayload=" + serverConfig + "]";
}
}
*/
public class SnapshotManager implements SnapshotState {
+ @SuppressWarnings("checkstyle:MemberName")
private final SnapshotState IDLE = new Idle();
+
+ @SuppressWarnings({"checkstyle:MemberName", "checkstyle:AbbreviationAsWordInName"})
private final SnapshotState PERSISTING = new Persisting();
+
+ @SuppressWarnings({"checkstyle:MemberName", "checkstyle:AbbreviationAsWordInName"})
private final SnapshotState CREATING = new Creating();
private final Logger log;
lastLogEntryIndex = lastLogEntry.getIndex();
lastLogEntryTerm = lastLogEntry.getTerm();
} else {
- log.debug("{}: Capturing Snapshot : lastLogEntry is null. Using lastAppliedIndex {} and lastAppliedTerm {} instead.",
- persistenceId(), lastAppliedIndex, lastAppliedTerm);
+ log.debug("{}: Capturing Snapshot : lastLogEntry is null. Using lastAppliedIndex {} and "
+ + "lastAppliedTerm {} instead.", persistenceId(), lastAppliedIndex, lastAppliedTerm);
}
return new CaptureSnapshot(lastLogEntryIndex, lastLogEntryTerm, lastAppliedIndex, lastAppliedTerm,
return false;
}
+ @SuppressWarnings("checkstyle:IllegalCatch")
private boolean capture(ReplicatedLogEntry lastLogEntry, long replicatedToAllIndex, String targetFollower) {
captureSnapshot = newCaptureSnapshot(lastLogEntry, replicatedToAllIndex, targetFollower != null);
if (dataSizeThresholdExceeded || logSizeExceededSnapshotBatchCount) {
if (log.isDebugEnabled()) {
if (dataSizeThresholdExceeded) {
- log.debug("{}: log data size {} exceeds the memory threshold {} - doing snapshotPreCommit with index {}",
- context.getId(), context.getReplicatedLog().dataSize(), dataThreshold,
- captureSnapshot.getLastAppliedIndex());
+ log.debug("{}: log data size {} exceeds the memory threshold {} - doing snapshotPreCommit "
+ + "with index {}", context.getId(), context.getReplicatedLog().dataSize(),
+ dataThreshold, captureSnapshot.getLastAppliedIndex());
} else {
- log.debug("{}: log size {} exceeds the snapshot batch count {} - doing snapshotPreCommit with index {}",
- context.getId(), context.getReplicatedLog().size(),
+ log.debug("{}: log size {} exceeds the snapshot batch count {} - doing snapshotPreCommit with "
+ + "index {}", context.getId(), context.getReplicatedLog().size(),
context.getConfigParams().getSnapshotBatchCount(),
captureSnapshot.getLastAppliedIndex());
}
private class Persisting extends AbstractSnapshotState {
@Override
+ @SuppressWarnings("checkstyle:IllegalCatch")
public void commit(final long sequenceNumber, long timeStamp) {
log.debug("{}: Snapshot success - sequence number: {}", persistenceId(), sequenceNumber);
*/
public interface SnapshotState {
/**
- * @return true when a snapshot is being captured
+ * Returns whether or not a capture is in progress.
+ *
+ * @return true when a snapshot is being captured, false otherwise
*/
boolean isCapturing();
/**
- * Initiate capture snapshot
+ * Initiates a capture snapshot.
*
* @param lastLogEntry the last entry in the replicated log
* @param replicatedToAllIndex the current replicatedToAllIndex
- *
* @return true if capture was started
*/
boolean capture(ReplicatedLogEntry lastLogEntry, long replicatedToAllIndex);
/**
- * Initiate capture snapshot for the purposing of installing that snapshot
- *
- * @param lastLogEntry
- * @param replicatedToAllIndex
- * @param targetFollower
+ * Initiates a capture snapshot for the purpose of installing the snapshot on a follower.
*
+ * @param lastLogEntry the last entry in the replicated log
+ * @param replicatedToAllIndex the current replicatedToAllIndex
+ * @param targetFollower the id of the follower on which to install
* @return true if capture was started
*/
boolean captureToInstall(ReplicatedLogEntry lastLogEntry, long replicatedToAllIndex, String targetFollower);
void apply(ApplySnapshot snapshot);
/**
- * Persist the snapshot
+ * Persists a snapshot.
*
- * @param snapshotBytes
- * @param currentBehavior
- * @param totalMemory
+ * @param snapshotBytes the snapshot bytes
+ * @param totalMemory the total memory threshold
*/
void persist(byte[] snapshotBytes, long totalMemory);
/**
- * Commit the snapshot by trimming the log
+ * Commits the snapshot by trimming the log.
*
- * @param sequenceNumber
- * @param timeStamp
+ * @param sequenceNumber the sequence number of the persisted snapshot
+ * @param timeStamp the time stamp of the persisted snapshot
*/
void commit(long sequenceNumber, long timeStamp);
/**
- * Rollback the snapshot
+ * Rolls back the snapshot on failure.
*/
void rollback();
/**
- * Trim the log
+ * Trims the in-memory log.
*
- * @param desiredTrimIndex
+ * @param desiredTrimIndex the desired index to trim from
* @return the actual trim index
*/
long trimLog(long desiredTrimIndex);
}
void setSnapshotBytes(ByteString snapshotBytes) {
- if(this.snapshotBytes != null) {
+ if (this.snapshotBytes != null) {
return;
}
this.snapshotBytes = snapshotBytes;
int size = snapshotBytes.size();
- totalChunks = (size / snapshotChunkSize) + (size % snapshotChunkSize > 0 ? 1 : 0);
+ totalChunks = size / snapshotChunkSize + (size % snapshotChunkSize > 0 ? 1 : 0);
LOG.debug("{}: Snapshot {} bytes, total chunks to send: {}", logName, size, totalChunks);
}
int incrementOffset() {
- if(replyStatus) {
+ if (replyStatus) {
// if prev chunk failed, we would want to sent the same chunk again
offset = offset + snapshotChunkSize;
}
int size = snapshotChunkSize;
if (snapshotChunkSize > snapshotLength) {
size = snapshotLength;
- } else if ((start + snapshotChunkSize) > snapshotLength) {
+ } else if (start + snapshotChunkSize > snapshotLength) {
size = snapshotLength - start;
}
}
/**
- * reset should be called when the Follower needs to be sent the snapshot from the beginning
+ * Reset should be called when the Follower needs to be sent the snapshot from the beginning.
*/
- void reset(){
+ void reset() {
offset = 0;
replyStatus = false;
replyReceivedForOffset = offset;
assertEquals("getEntries size", expected.getEntries().size(), actual.getEntries().size());
Iterator<ReplicatedLogEntry> iter = expected.getEntries().iterator();
- for(ReplicatedLogEntry e: actual.getEntries()) {
+ for (ReplicatedLogEntry e: actual.getEntries()) {
verifyReplicatedLogEntry(iter.next(), e);
}
}