X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-akka-raft%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fraft%2FRaftActorServerConfigurationSupport.java;h=3a3511668c4bb82c7c216b56b5a89aa356521375;hp=16c091a4c3490c56ff7e10c3038d32f86cca2f84;hb=29c8203015aa8f2891c305e82f0cf70c3de3f281;hpb=c9943f5bc72d4cde9356d3bd4cf73d36f4b2f754 diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorServerConfigurationSupport.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorServerConfigurationSupport.java index 16c091a4c3..3a3511668c 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorServerConfigurationSupport.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorServerConfigurationSupport.java @@ -50,6 +50,7 @@ import scala.concurrent.duration.FiniteDuration; class RaftActorServerConfigurationSupport { private static final Logger LOG = LoggerFactory.getLogger(RaftActorServerConfigurationSupport.class); + @SuppressWarnings("checkstyle:MemberName") private final OperationState IDLE = new Idle(); private final RaftActor raftActor; @@ -60,19 +61,19 @@ class RaftActorServerConfigurationSupport { private OperationState currentOperationState = IDLE; - RaftActorServerConfigurationSupport(RaftActor raftActor) { + RaftActorServerConfigurationSupport(final RaftActor raftActor) { this.raftActor = raftActor; this.raftContext = raftActor.getRaftActorContext(); } - boolean handleMessage(Object message, ActorRef sender) { - if(message instanceof AddServer) { + boolean handleMessage(final Object message, final ActorRef sender) { + if (message instanceof AddServer) { onAddServer((AddServer) message, sender); return true; - } else if(message instanceof RemoveServer) { + } else if (message instanceof RemoveServer) { onRemoveServer((RemoveServer) message, sender); return true; - } else if(message instanceof ChangeServersVotingStatus) { + } else if (message instanceof ChangeServersVotingStatus) { onChangeServersVotingStatus((ChangeServersVotingStatus) message, sender); return true; } else if (message instanceof ServerOperationTimeout) { @@ -81,9 +82,9 @@ class RaftActorServerConfigurationSupport { } else if (message instanceof UnInitializedFollowerSnapshotReply) { currentOperationState.onUnInitializedFollowerSnapshotReply((UnInitializedFollowerSnapshotReply) message); return true; - } else if(message instanceof ApplyState) { + } else if (message instanceof ApplyState) { return onApplyState((ApplyState) message); - } else if(message instanceof SnapshotComplete) { + } else if (message instanceof SnapshotComplete) { currentOperationState.onSnapshotComplete(); return false; } else { @@ -91,11 +92,11 @@ class RaftActorServerConfigurationSupport { } } - void onNewLeader(String leaderId) { + void onNewLeader(final String leaderId) { currentOperationState.onNewLeader(leaderId); } - private void onChangeServersVotingStatus(ChangeServersVotingStatus message, ActorRef sender) { + private void onChangeServersVotingStatus(final ChangeServersVotingStatus message, final ActorRef sender) { LOG.debug("{}: onChangeServersVotingStatus: {}, state: {}", raftContext.getId(), message, currentOperationState); @@ -111,23 +112,23 @@ class RaftActorServerConfigurationSupport { // Therefore, if the local server is currently non-voting and is to be changed to voting and there is // no current leader, we will try to elect a leader using the new server config in order to replicate // the change and progress. - boolean localServerChangingToVoting = Boolean.TRUE.equals(message. - getServerVotingStatusMap().get(raftActor.getRaftActorContext().getId())); + boolean localServerChangingToVoting = Boolean.TRUE.equals(message + .getServerVotingStatusMap().get(raftActor.getRaftActorContext().getId())); boolean hasNoLeader = raftActor.getLeaderId() == null; - if(localServerChangingToVoting && !raftContext.isVotingMember() && hasNoLeader) { + if (localServerChangingToVoting && !raftContext.isVotingMember() && hasNoLeader) { currentOperationState.onNewOperation(new ChangeServersVotingStatusContext(message, sender, true)); } else { onNewOperation(new ChangeServersVotingStatusContext(message, sender, false)); } } - private void onRemoveServer(RemoveServer removeServer, ActorRef sender) { + private void onRemoveServer(final RemoveServer removeServer, final ActorRef sender) { LOG.debug("{}: onRemoveServer: {}, state: {}", raftContext.getId(), removeServer, currentOperationState); boolean isSelf = removeServer.getServerId().equals(raftContext.getId()); - if(isSelf && !raftContext.hasFollowers()) { + if (isSelf && !raftContext.hasFollowers()) { sender.tell(new RemoveServerReply(ServerChangeStatus.NOT_SUPPORTED, raftActor.getLeaderId()), raftActor.getSelf()); - } else if(!isSelf && !raftContext.getPeerIds().contains(removeServer.getServerId())) { + } else if (!isSelf && !raftContext.getPeerIds().contains(removeServer.getServerId())) { sender.tell(new RemoveServerReply(ServerChangeStatus.DOES_NOT_EXIST, raftActor.getLeaderId()), raftActor.getSelf()); } else { @@ -137,9 +138,9 @@ class RaftActorServerConfigurationSupport { } } - private boolean onApplyState(ApplyState applyState) { + private boolean onApplyState(final ApplyState applyState) { Payload data = applyState.getReplicatedLogEntry().getData(); - if(data instanceof ServerConfigurationPayload) { + if (data instanceof ServerConfigurationPayload) { currentOperationState.onApplyState(applyState); return true; } @@ -148,7 +149,7 @@ class RaftActorServerConfigurationSupport { } /** - * The algorithm for AddServer is as follows: + * Add a server. The algorithm for AddServer is as follows: * */ - private void onAddServer(AddServer addServer, ActorRef sender) { + private void onAddServer(final AddServer addServer, final ActorRef sender) { LOG.debug("{}: onAddServer: {}, state: {}", raftContext.getId(), addServer, currentOperationState); onNewOperation(new AddServerContext(addServer, sender)); } - private void onNewOperation(ServerOperationContext operationContext) { + private void onNewOperation(final ServerOperationContext operationContext) { if (raftActor.isLeader()) { currentOperationState.onNewOperation(operationContext); } else { @@ -201,7 +202,7 @@ class RaftActorServerConfigurationSupport { * Abstract base class for a server operation FSM state. Handles common behavior for all states. */ private abstract class OperationState { - void onNewOperation(ServerOperationContext operationContext) { + void onNewOperation(final ServerOperationContext operationContext) { // We're currently processing another operation so queue it to be processed later. LOG.debug("{}: Server operation already in progress - queueing {}", raftContext.getId(), @@ -210,15 +211,15 @@ class RaftActorServerConfigurationSupport { pendingOperationsQueue.add(operationContext); } - void onServerOperationTimeout(ServerOperationTimeout timeout) { + void onServerOperationTimeout(final ServerOperationTimeout timeout) { LOG.debug("onServerOperationTimeout should not be called in state {}", this); } - void onUnInitializedFollowerSnapshotReply(UnInitializedFollowerSnapshotReply reply) { + void onUnInitializedFollowerSnapshotReply(final UnInitializedFollowerSnapshotReply reply) { LOG.debug("onUnInitializedFollowerSnapshotReply was called in state {}", this); } - void onApplyState(ApplyState applyState) { + void onApplyState(final ApplyState applyState) { LOG.debug("onApplyState was called in state {}", this); } @@ -226,17 +227,18 @@ class RaftActorServerConfigurationSupport { } - void onNewLeader(String newLeader) { + void onNewLeader(final String newLeader) { } - protected void persistNewServerConfiguration(ServerOperationContext operationContext){ + protected void persistNewServerConfiguration(final ServerOperationContext operationContext) { raftContext.setDynamicServerConfigurationInUse(); ServerConfigurationPayload payload = raftContext.getPeerServerInfo( operationContext.includeSelfInNewConfiguration(raftActor)); LOG.debug("{}: New server configuration : {}", raftContext.getId(), payload.getServerConfig()); - raftActor.persistData(operationContext.getClientRequestor(), operationContext.getContextId(), payload); + raftActor.persistData(operationContext.getClientRequestor(), operationContext.getContextId(), + payload, false); currentOperationState = new Persisting(operationContext, newTimer(new ServerOperationTimeout( operationContext.getLoggingContext()))); @@ -244,8 +246,9 @@ class RaftActorServerConfigurationSupport { sendReply(operationContext, ServerChangeStatus.OK); } - protected void operationComplete(ServerOperationContext operationContext, @Nullable ServerChangeStatus replyStatus) { - if(replyStatus != null) { + protected void operationComplete(final ServerOperationContext operationContext, + @Nullable final ServerChangeStatus replyStatus) { + if (replyStatus != null) { sendReply(operationContext, replyStatus); } @@ -258,23 +261,24 @@ class RaftActorServerConfigurationSupport { currentOperationState = IDLE; ServerOperationContext nextOperation = pendingOperationsQueue.poll(); - if(nextOperation != null) { + if (nextOperation != null) { RaftActorServerConfigurationSupport.this.onNewOperation(nextOperation); } } - protected void sendReply(ServerOperationContext operationContext, ServerChangeStatus status) { - LOG.debug("{}: Returning {} for operation {}", raftContext.getId(), status, operationContext.getOperation()); + protected void sendReply(final ServerOperationContext operationContext, final ServerChangeStatus status) { + LOG.debug("{}: Returning {} for operation {}", raftContext.getId(), status, + operationContext.getOperation()); operationContext.getClientRequestor().tell(operationContext.newReply(status, raftActor.getLeaderId()), raftActor.self()); } - Cancellable newTimer(Object message) { + Cancellable newTimer(final Object message) { return newTimer(raftContext.getConfigParams().getElectionTimeOutInterval().$times(2), message); } - Cancellable newTimer(FiniteDuration timeout, Object message) { + Cancellable newTimer(final FiniteDuration timeout, final Object message) { return raftContext.getActorSystem().scheduler().scheduleOnce( timeout, raftContext.getActor(), message, raftContext.getActorSystem().dispatcher(), raftContext.getActor()); @@ -291,12 +295,12 @@ class RaftActorServerConfigurationSupport { */ private final class Idle extends OperationState { @Override - public void onNewOperation(ServerOperationContext operationContext) { + public void onNewOperation(final ServerOperationContext operationContext) { operationContext.newInitialOperationState(RaftActorServerConfigurationSupport.this).initiate(); } @Override - public void onApplyState(ApplyState applyState) { + public void onApplyState(final ApplyState applyState) { // Noop - we override b/c ApplyState is called normally for followers in the idle state. } } @@ -309,16 +313,16 @@ class RaftActorServerConfigurationSupport { private final Cancellable timer; private boolean timedOut = false; - Persisting(ServerOperationContext operationContext, Cancellable timer) { + Persisting(final ServerOperationContext operationContext, final Cancellable timer) { this.operationContext = operationContext; this.timer = timer; } @Override - public void onApplyState(ApplyState applyState) { + public void onApplyState(final ApplyState applyState) { // Sanity check - we could get an ApplyState from a previous operation that timed out so make // sure it's meant for us. - if(operationContext.getContextId().equals(applyState.getIdentifier())) { + if (operationContext.getContextId().equals(applyState.getIdentifier())) { LOG.info("{}: {} has been successfully replicated to a majority of followers", raftContext.getId(), applyState.getReplicatedLogEntry().getData()); @@ -328,7 +332,7 @@ class RaftActorServerConfigurationSupport { } @Override - public void onServerOperationTimeout(ServerOperationTimeout timeout) { + public void onServerOperationTimeout(final ServerOperationTimeout timeout) { LOG.warn("{}: Timeout occured while replicating the new server configuration for {}", raftContext.getId(), timeout.getLoggingContext()); @@ -336,15 +340,15 @@ class RaftActorServerConfigurationSupport { // Fail any pending operations ServerOperationContext nextOperation = pendingOperationsQueue.poll(); - while(nextOperation != null) { + while (nextOperation != null) { sendReply(nextOperation, ServerChangeStatus.PRIOR_REQUEST_CONSENSUS_TIMEOUT); nextOperation = pendingOperationsQueue.poll(); } } @Override - public void onNewOperation(ServerOperationContext newOperationContext) { - if(timedOut) { + public void onNewOperation(final ServerOperationContext newOperationContext) { + if (timedOut) { sendReply(newOperationContext, ServerChangeStatus.PRIOR_REQUEST_CONSENSUS_TIMEOUT); } else { super.onNewOperation(newOperationContext); @@ -358,7 +362,7 @@ class RaftActorServerConfigurationSupport { private abstract class AddServerState extends OperationState { private final AddServerContext addServerContext; - AddServerState(AddServerContext addServerContext) { + AddServerState(final AddServerContext addServerContext) { this.addServerContext = addServerContext; } @@ -370,7 +374,7 @@ class RaftActorServerConfigurationSupport { return newTimer(new ServerOperationTimeout(addServerContext.getOperation().getNewServerId())); } - void handleInstallSnapshotTimeout(ServerOperationTimeout timeout) { + void handleInstallSnapshotTimeout(final ServerOperationTimeout timeout) { String serverId = timeout.getLoggingContext(); LOG.debug("{}: handleInstallSnapshotTimeout for new server {}", raftContext.getId(), serverId); @@ -379,12 +383,13 @@ class RaftActorServerConfigurationSupport { raftContext.removePeer(serverId); boolean isLeader = raftActor.isLeader(); - if(isLeader) { + if (isLeader) { AbstractLeader leader = (AbstractLeader) raftActor.getCurrentBehavior(); leader.removeFollower(serverId); } - operationComplete(getAddServerContext(), isLeader ? ServerChangeStatus.TIMEOUT : ServerChangeStatus.NO_LEADER); + operationComplete(getAddServerContext(), isLeader ? ServerChangeStatus.TIMEOUT + : ServerChangeStatus.NO_LEADER); } } @@ -394,18 +399,18 @@ class RaftActorServerConfigurationSupport { * snapshot capture, if necessary. */ private final class InitialAddServerState extends AddServerState implements InitialOperationState { - InitialAddServerState(AddServerContext addServerContext) { + InitialAddServerState(final AddServerContext addServerContext) { super(addServerContext); } @Override public void initiate() { - AbstractLeader leader = (AbstractLeader) raftActor.getCurrentBehavior(); + final AbstractLeader leader = (AbstractLeader) raftActor.getCurrentBehavior(); AddServer addServer = getAddServerContext().getOperation(); LOG.debug("{}: Initiating {}", raftContext.getId(), addServer); - if(raftContext.getPeerInfo(addServer.getNewServerId()) != null) { + if (raftContext.getPeerInfo(addServer.getNewServerId()) != null) { operationComplete(getAddServerContext(), ServerChangeStatus.ALREADY_EXISTS); return; } @@ -416,10 +421,10 @@ class RaftActorServerConfigurationSupport { leader.addFollower(addServer.getNewServerId()); - if(votingState == VotingState.VOTING_NOT_INITIALIZED){ + if (votingState == VotingState.VOTING_NOT_INITIALIZED) { // schedule the install snapshot timeout timer Cancellable installSnapshotTimer = newInstallSnapshotTimer(); - if(leader.initiateCaptureSnapshot(addServer.getNewServerId())) { + if (leader.initiateCaptureSnapshot(addServer.getNewServerId())) { LOG.debug("{}: Initiating capture snapshot for new server {}", raftContext.getId(), addServer.getNewServerId()); @@ -446,13 +451,13 @@ class RaftActorServerConfigurationSupport { private final class InstallingSnapshot extends AddServerState { private final Cancellable installSnapshotTimer; - InstallingSnapshot(AddServerContext addServerContext, Cancellable installSnapshotTimer) { + InstallingSnapshot(final AddServerContext addServerContext, final Cancellable installSnapshotTimer) { super(addServerContext); this.installSnapshotTimer = Preconditions.checkNotNull(installSnapshotTimer); } @Override - public void onServerOperationTimeout(ServerOperationTimeout timeout) { + public void onServerOperationTimeout(final ServerOperationTimeout timeout) { handleInstallSnapshotTimeout(timeout); LOG.warn("{}: Timeout occured for new server {} while installing snapshot", raftContext.getId(), @@ -460,14 +465,14 @@ class RaftActorServerConfigurationSupport { } @Override - public void onUnInitializedFollowerSnapshotReply(UnInitializedFollowerSnapshotReply reply) { + public void onUnInitializedFollowerSnapshotReply(final UnInitializedFollowerSnapshotReply reply) { LOG.debug("{}: onUnInitializedFollowerSnapshotReply: {}", raftContext.getId(), reply); String followerId = reply.getFollowerId(); // Sanity check to guard against receiving an UnInitializedFollowerSnapshotReply from a prior // add server operation that timed out. - if(getAddServerContext().getOperation().getNewServerId().equals(followerId) && raftActor.isLeader()) { + if (getAddServerContext().getOperation().getNewServerId().equals(followerId) && raftActor.isLeader()) { AbstractLeader leader = (AbstractLeader) raftActor.getCurrentBehavior(); raftContext.getPeerInfo(followerId).setVotingState(VotingState.VOTING); leader.updateMinReplicaCount(); @@ -490,7 +495,7 @@ class RaftActorServerConfigurationSupport { private final class WaitingForPriorSnapshotComplete extends AddServerState { private final Cancellable snapshotTimer; - WaitingForPriorSnapshotComplete(AddServerContext addServerContext, Cancellable snapshotTimer) { + WaitingForPriorSnapshotComplete(final AddServerContext addServerContext, final Cancellable snapshotTimer) { super(addServerContext); this.snapshotTimer = Preconditions.checkNotNull(snapshotTimer); } @@ -499,13 +504,13 @@ class RaftActorServerConfigurationSupport { public void onSnapshotComplete() { LOG.debug("{}: onSnapshotComplete", raftContext.getId()); - if(!raftActor.isLeader()) { + if (!raftActor.isLeader()) { LOG.debug("{}: No longer the leader", raftContext.getId()); return; } AbstractLeader leader = (AbstractLeader) raftActor.getCurrentBehavior(); - if(leader.initiateCaptureSnapshot(getAddServerContext().getOperation().getNewServerId())) { + if (leader.initiateCaptureSnapshot(getAddServerContext().getOperation().getNewServerId())) { LOG.debug("{}: Initiating capture snapshot for new server {}", raftContext.getId(), getAddServerContext().getOperation().getNewServerId()); @@ -517,7 +522,7 @@ class RaftActorServerConfigurationSupport { } @Override - public void onServerOperationTimeout(ServerOperationTimeout timeout) { + public void onServerOperationTimeout(final ServerOperationTimeout timeout) { handleInstallSnapshotTimeout(timeout); LOG.warn("{}: Timeout occured for new server {} while waiting for prior snapshot to complete", @@ -525,7 +530,8 @@ class RaftActorServerConfigurationSupport { } } - private static final class ServerOperationContextIdentifier extends AbstractUUIDIdentifier { + private static final class ServerOperationContextIdentifier + extends AbstractUUIDIdentifier { private static final long serialVersionUID = 1L; ServerOperationContextIdentifier() { @@ -543,7 +549,7 @@ class RaftActorServerConfigurationSupport { private final ActorRef clientRequestor; private final Identifier contextId; - ServerOperationContext(T operation, ActorRef clientRequestor){ + ServerOperationContext(final T operation, final ActorRef clientRequestor) { this.operation = operation; this.clientRequestor = clientRequestor; contextId = new ServerOperationContextIdentifier(); @@ -561,10 +567,10 @@ class RaftActorServerConfigurationSupport { return clientRequestor; } - void operationComplete(RaftActor raftActor, boolean succeeded) { + void operationComplete(final RaftActor raftActor, final boolean succeeded) { } - boolean includeSelfInNewConfiguration(RaftActor raftActor) { + boolean includeSelfInNewConfiguration(final RaftActor raftActor) { return true; } @@ -579,17 +585,17 @@ class RaftActorServerConfigurationSupport { * Stores context information for an AddServer operation. */ private static class AddServerContext extends ServerOperationContext { - AddServerContext(AddServer addServer, ActorRef clientRequestor) { + AddServerContext(final AddServer addServer, final ActorRef clientRequestor) { super(addServer, clientRequestor); } @Override - Object newReply(ServerChangeStatus status, String leaderId) { + Object newReply(final ServerChangeStatus status, final String leaderId) { return new AddServerReply(status, leaderId); } @Override - InitialOperationState newInitialOperationState(RaftActorServerConfigurationSupport support) { + InitialOperationState newInitialOperationState(final RaftActorServerConfigurationSupport support) { return support.new InitialAddServerState(this); } @@ -602,7 +608,7 @@ class RaftActorServerConfigurationSupport { private abstract class RemoveServerState extends OperationState { private final RemoveServerContext removeServerContext; - protected RemoveServerState(RemoveServerContext removeServerContext) { + protected RemoveServerState(final RemoveServerContext removeServerContext) { this.removeServerContext = Preconditions.checkNotNull(removeServerContext); } @@ -612,9 +618,9 @@ class RaftActorServerConfigurationSupport { } } - private final class InitialRemoveServerState extends RemoveServerState implements InitialOperationState{ + private final class InitialRemoveServerState extends RemoveServerState implements InitialOperationState { - protected InitialRemoveServerState(RemoveServerContext removeServerContext) { + protected InitialRemoveServerState(final RemoveServerContext removeServerContext) { super(removeServerContext); } @@ -622,7 +628,9 @@ class RaftActorServerConfigurationSupport { public void initiate() { String serverId = getRemoveServerContext().getOperation().getServerId(); raftContext.removePeer(serverId); - ((AbstractLeader)raftActor.getCurrentBehavior()).removeFollower(serverId); + AbstractLeader leader = (AbstractLeader)raftActor.getCurrentBehavior(); + leader.removeFollower(serverId); + leader.updateMinReplicaCount(); persistNewServerConfiguration(getRemoveServerContext()); } @@ -631,30 +639,31 @@ class RaftActorServerConfigurationSupport { private static class RemoveServerContext extends ServerOperationContext { private final String peerAddress; - RemoveServerContext(RemoveServer operation, String peerAddress, ActorRef clientRequestor) { + RemoveServerContext(final RemoveServer operation, final String peerAddress, final ActorRef clientRequestor) { super(operation, clientRequestor); this.peerAddress = peerAddress; } @Override - Object newReply(ServerChangeStatus status, String leaderId) { + Object newReply(final ServerChangeStatus status, final String leaderId) { return new RemoveServerReply(status, leaderId); } @Override - InitialOperationState newInitialOperationState(RaftActorServerConfigurationSupport support) { + InitialOperationState newInitialOperationState(final RaftActorServerConfigurationSupport support) { return support.new InitialRemoveServerState(this); } @Override - void operationComplete(RaftActor raftActor, boolean succeeded) { - if(peerAddress != null) { - raftActor.context().actorSelection(peerAddress).tell(new ServerRemoved(getOperation().getServerId()), raftActor.getSelf()); + void operationComplete(final RaftActor raftActor, final boolean succeeded) { + if (peerAddress != null) { + raftActor.context().actorSelection(peerAddress).tell( + new ServerRemoved(getOperation().getServerId()), raftActor.getSelf()); } } @Override - boolean includeSelfInNewConfiguration(RaftActor raftActor) { + boolean includeSelfInNewConfiguration(final RaftActor raftActor) { return !getOperation().getServerId().equals(raftActor.getId()); } @@ -667,31 +676,33 @@ class RaftActorServerConfigurationSupport { private static class ChangeServersVotingStatusContext extends ServerOperationContext { private final boolean tryToElectLeader; - ChangeServersVotingStatusContext(ChangeServersVotingStatus convertMessage, ActorRef clientRequestor, - boolean tryToElectLeader) { + ChangeServersVotingStatusContext(final ChangeServersVotingStatus convertMessage, final ActorRef clientRequestor, + final boolean tryToElectLeader) { super(convertMessage, clientRequestor); this.tryToElectLeader = tryToElectLeader; } @Override - InitialOperationState newInitialOperationState(RaftActorServerConfigurationSupport support) { + InitialOperationState newInitialOperationState(final RaftActorServerConfigurationSupport support) { return support.new ChangeServersVotingStatusState(this, tryToElectLeader); } @Override - Object newReply(ServerChangeStatus status, String leaderId) { + Object newReply(final ServerChangeStatus status, final String leaderId) { return new ServerChangeReply(status, leaderId); } @Override - void operationComplete(final RaftActor raftActor, boolean succeeded) { + void operationComplete(final RaftActor raftActor, final boolean succeeded) { // If this leader changed to non-voting we need to step down as leader so we'll try to transfer // leadership. - boolean localServerChangedToNonVoting = Boolean.FALSE.equals(getOperation(). - getServerVotingStatusMap().get(raftActor.getRaftActorContext().getId())); + boolean localServerChangedToNonVoting = Boolean.FALSE.equals(getOperation() + .getServerVotingStatusMap().get(raftActor.getRaftActorContext().getId())); if (succeeded && localServerChangedToNonVoting) { LOG.debug("Leader changed to non-voting - trying leadership transfer"); raftActor.becomeNonVoting(); + } else if (raftActor.isLeader()) { + raftActor.onVotingStateChangeComplete(); } } @@ -705,8 +716,8 @@ class RaftActorServerConfigurationSupport { private final ChangeServersVotingStatusContext changeVotingStatusContext; private final boolean tryToElectLeader; - ChangeServersVotingStatusState(ChangeServersVotingStatusContext changeVotingStatusContext, - boolean tryToElectLeader) { + ChangeServersVotingStatusState(final ChangeServersVotingStatusContext changeVotingStatusContext, + final boolean tryToElectLeader) { this.changeVotingStatusContext = changeVotingStatusContext; this.tryToElectLeader = tryToElectLeader; } @@ -715,9 +726,9 @@ class RaftActorServerConfigurationSupport { public void initiate() { LOG.debug("Initiating ChangeServersVotingStatusState"); - if(tryToElectLeader) { + if (tryToElectLeader) { initiateLocalLeaderElection(); - } else if(updateLocalPeerInfo()) { + } else if (updateLocalPeerInfo()) { persistNewServerConfiguration(changeVotingStatusContext); } } @@ -726,7 +737,7 @@ class RaftActorServerConfigurationSupport { LOG.debug("{}: Sending local ElectionTimeout to start leader election", raftContext.getId()); ServerConfigurationPayload previousServerConfig = raftContext.getPeerServerInfo(true); - if(!updateLocalPeerInfo()) { + if (!updateLocalPeerInfo()) { return; } @@ -740,20 +751,20 @@ class RaftActorServerConfigurationSupport { // Check if new voting state would leave us with no voting members. boolean atLeastOneVoting = false; - for(ServerInfo info: newServerInfoList) { - if(info.isVoting()) { + for (ServerInfo info: newServerInfoList) { + if (info.isVoting()) { atLeastOneVoting = true; break; } } - if(!atLeastOneVoting) { + if (!atLeastOneVoting) { operationComplete(changeVotingStatusContext, ServerChangeStatus.INVALID_REQUEST); return false; } raftContext.updatePeerIds(new ServerConfigurationPayload(newServerInfoList)); - if(raftActor.getCurrentBehavior() instanceof AbstractLeader) { + if (raftActor.getCurrentBehavior() instanceof AbstractLeader) { AbstractLeader leader = (AbstractLeader) raftActor.getCurrentBehavior(); leader.updateMinReplicaCount(); } @@ -762,15 +773,17 @@ class RaftActorServerConfigurationSupport { } private List newServerInfoList() { - Map serverVotingStatusMap = changeVotingStatusContext.getOperation().getServerVotingStatusMap(); + Map serverVotingStatusMap = changeVotingStatusContext.getOperation() + .getServerVotingStatusMap(); List newServerInfoList = new ArrayList<>(); - for(String peerId: raftContext.getPeerIds()) { - newServerInfoList.add(new ServerInfo(peerId, serverVotingStatusMap.containsKey(peerId) ? - serverVotingStatusMap.get(peerId) : raftContext.getPeerInfo(peerId).isVoting())); + for (String peerId: raftContext.getPeerIds()) { + newServerInfoList.add(new ServerInfo(peerId, serverVotingStatusMap.containsKey(peerId) + ? serverVotingStatusMap.get(peerId) : raftContext.getPeerInfo(peerId).isVoting())); } newServerInfoList.add(new ServerInfo(raftContext.getId(), serverVotingStatusMap.containsKey( - raftContext.getId()) ? serverVotingStatusMap.get(raftContext.getId()) : raftContext.isVotingMember())); + raftContext.getId()) ? serverVotingStatusMap.get(raftContext.getId()) + : raftContext.isVotingMember())); return newServerInfoList; } @@ -781,8 +794,8 @@ class RaftActorServerConfigurationSupport { private final ChangeServersVotingStatusContext operationContext; private final Cancellable timer; - WaitingForLeaderElected(ChangeServersVotingStatusContext operationContext, - ServerConfigurationPayload previousServerConfig) { + WaitingForLeaderElected(final ChangeServersVotingStatusContext operationContext, + final ServerConfigurationPayload previousServerConfig) { this.operationContext = operationContext; this.previousServerConfig = previousServerConfig; @@ -791,8 +804,8 @@ class RaftActorServerConfigurationSupport { } @Override - void onNewLeader(String newLeader) { - if(newLeader == null) { + void onNewLeader(final String newLeader) { + if (newLeader == null) { return; } @@ -800,7 +813,7 @@ class RaftActorServerConfigurationSupport { timer.cancel(); - if(raftActor.isLeader()) { + if (raftActor.isLeader()) { persistNewServerConfiguration(operationContext); } else { // Edge case - some other node became leader so forward the operation. @@ -815,7 +828,7 @@ class RaftActorServerConfigurationSupport { } @Override - void onServerOperationTimeout(ServerOperationTimeout timeout) { + void onServerOperationTimeout(final ServerOperationTimeout timeout) { LOG.warn("{}: Leader election timed out - cannot apply operation {}", raftContext.getId(), timeout.getLoggingContext()); @@ -838,20 +851,20 @@ class RaftActorServerConfigurationSupport { // tried yet. Map serverVotingStatusMap = operationContext.getOperation().getServerVotingStatusMap(); ActorSelection forwardToPeerActor = null; - for(Map.Entry e: serverVotingStatusMap.entrySet()) { + for (Map.Entry e: serverVotingStatusMap.entrySet()) { Boolean isVoting = e.getValue(); String serverId = e.getKey(); PeerInfo peerInfo = raftContext.getPeerInfo(serverId); - if(isVoting && peerInfo != null && !peerInfo.isVoting() && !serversVisited.contains(serverId)) { + if (isVoting && peerInfo != null && !peerInfo.isVoting() && !serversVisited.contains(serverId)) { ActorSelection actor = raftContext.getPeerActorSelection(serverId); - if(actor != null) { + if (actor != null) { forwardToPeerActor = actor; break; } } } - if(forwardToPeerActor != null) { + if (forwardToPeerActor != null) { LOG.debug("{}: Found server {} to forward to", raftContext.getId(), forwardToPeerActor); forwardToPeerActor.tell(new ChangeServersVotingStatus(serverVotingStatusMap, serversVisited), @@ -866,8 +879,8 @@ class RaftActorServerConfigurationSupport { static class ServerOperationTimeout { private final String loggingContext; - ServerOperationTimeout(String loggingContext){ - this.loggingContext = Preconditions.checkNotNull(loggingContext, "loggingContext should not be null"); + ServerOperationTimeout(final String loggingContext) { + this.loggingContext = Preconditions.checkNotNull(loggingContext, "loggingContext should not be null"); } String getLoggingContext() {