/*
 * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved.
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
 * and is available at http://www.eclipse.org/legal/epl-v10.html
 */
package org.opendaylight.controller.cluster.raft;

import static java.util.Objects.requireNonNull;

import akka.actor.ActorRef;
import akka.actor.ActorSelection;
import akka.actor.Cancellable;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.UUID;
import org.eclipse.jdt.annotation.Nullable;
import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
import org.opendaylight.controller.cluster.raft.base.messages.SnapshotComplete;
import org.opendaylight.controller.cluster.raft.base.messages.TimeoutNow;
import org.opendaylight.controller.cluster.raft.behaviors.AbstractLeader;
import org.opendaylight.controller.cluster.raft.messages.AddServer;
import org.opendaylight.controller.cluster.raft.messages.AddServerReply;
import org.opendaylight.controller.cluster.raft.messages.ChangeServersVotingStatus;
import org.opendaylight.controller.cluster.raft.messages.Payload;
import org.opendaylight.controller.cluster.raft.messages.RemoveServer;
import org.opendaylight.controller.cluster.raft.messages.RemoveServerReply;
import org.opendaylight.controller.cluster.raft.messages.ServerChangeReply;
import org.opendaylight.controller.cluster.raft.messages.ServerChangeStatus;
import org.opendaylight.controller.cluster.raft.messages.ServerRemoved;
import org.opendaylight.controller.cluster.raft.messages.UnInitializedFollowerSnapshotReply;
import org.opendaylight.controller.cluster.raft.persisted.ServerConfigurationPayload;
import org.opendaylight.controller.cluster.raft.persisted.ServerInfo;
import org.opendaylight.yangtools.concepts.Identifier;
import org.opendaylight.yangtools.util.AbstractUUIDIdentifier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.concurrent.duration.FiniteDuration;
/**
 * Handles server configuration related messages for a RaftActor.
 *
 * @author Thomas Pantelis
 */
51 class RaftActorServerConfigurationSupport {
52 private static final Logger LOG = LoggerFactory.getLogger(RaftActorServerConfigurationSupport.class);
54 @SuppressWarnings("checkstyle:MemberName")
55 private final OperationState IDLE = new Idle();
57 private final RaftActor raftActor;
59 private final RaftActorContext raftContext;
61 private final Queue<ServerOperationContext<?>> pendingOperationsQueue = new ArrayDeque<>();
63 private OperationState currentOperationState = IDLE;
/**
 * Creates the server configuration support for the given actor and caches its {@link RaftActorContext}.
 *
 * @param raftActor the owning RaftActor
 */
RaftActorServerConfigurationSupport(final RaftActor raftActor) {
    this.raftActor = raftActor;
    raftContext = raftActor.getRaftActorContext();
}
70 boolean handleMessage(final Object message, final ActorRef sender) {
71 if (message instanceof AddServer addServer) {
72 onAddServer(addServer, sender);
74 } else if (message instanceof RemoveServer removeServer) {
75 onRemoveServer(removeServer, sender);
77 } else if (message instanceof ChangeServersVotingStatus changeServersVotingStatus) {
78 onChangeServersVotingStatus(changeServersVotingStatus, sender);
80 } else if (message instanceof ServerOperationTimeout serverOperationTimeout) {
81 currentOperationState.onServerOperationTimeout(serverOperationTimeout);
83 } else if (message instanceof UnInitializedFollowerSnapshotReply uninitFollowerSnapshotReply) {
84 currentOperationState.onUnInitializedFollowerSnapshotReply(uninitFollowerSnapshotReply);
86 } else if (message instanceof ApplyState applyState) {
87 return onApplyState(applyState);
88 } else if (message instanceof SnapshotComplete) {
89 currentOperationState.onSnapshotComplete();
96 void onNewLeader(final String leaderId) {
97 currentOperationState.onNewLeader(leaderId);
100 private void onChangeServersVotingStatus(final ChangeServersVotingStatus message, final ActorRef sender) {
101 LOG.debug("{}: onChangeServersVotingStatus: {}, state: {}", raftContext.getId(), message,
102 currentOperationState);
104 // The following check is a special case. Normally we fail an operation if there's no leader.
105 // Consider a scenario where one has 2 geographically-separated 3-node clusters, one a primary and
106 // the other a backup such that if the primary cluster is lost, the backup can take over. In this
107 // scenario, we have a logical 6-node cluster where the primary sub-cluster is configured as voting
108 // and the backup sub-cluster as non-voting such that the primary cluster can make progress without
109 // consensus from the backup cluster while still replicating to the backup. On fail-over to the backup,
110 // a request would be sent to a member of the backup cluster to flip the voting states, ie make the
111 // backup sub-cluster voting and the lost primary non-voting. However since the primary majority
112 // cluster is lost, there would be no leader to apply, persist and replicate the server config change.
113 // Therefore, if the local server is currently non-voting and is to be changed to voting and there is
114 // no current leader, we will try to elect a leader using the new server config in order to replicate
115 // the change and progress.
116 boolean localServerChangingToVoting = Boolean.TRUE.equals(message
117 .getServerVotingStatusMap().get(raftActor.getRaftActorContext().getId()));
118 boolean hasNoLeader = raftActor.getLeaderId() == null;
119 if (localServerChangingToVoting && !raftContext.isVotingMember() && hasNoLeader) {
120 currentOperationState.onNewOperation(new ChangeServersVotingStatusContext(message, sender, true));
122 onNewOperation(new ChangeServersVotingStatusContext(message, sender, false));
126 private void onRemoveServer(final RemoveServer removeServer, final ActorRef sender) {
127 LOG.debug("{}: onRemoveServer: {}, state: {}", raftContext.getId(), removeServer, currentOperationState);
128 boolean isSelf = removeServer.getServerId().equals(raftContext.getId());
129 if (isSelf && !raftContext.hasFollowers()) {
130 sender.tell(new RemoveServerReply(ServerChangeStatus.NOT_SUPPORTED, raftActor.getLeaderId()),
131 raftActor.getSelf());
132 } else if (!isSelf && !raftContext.getPeerIds().contains(removeServer.getServerId())) {
133 sender.tell(new RemoveServerReply(ServerChangeStatus.DOES_NOT_EXIST, raftActor.getLeaderId()),
134 raftActor.getSelf());
136 String serverAddress = isSelf ? raftActor.self().path().toString() :
137 raftContext.getPeerAddress(removeServer.getServerId());
138 onNewOperation(new RemoveServerContext(removeServer, serverAddress, sender));
142 private boolean onApplyState(final ApplyState applyState) {
143 Payload data = applyState.getReplicatedLogEntry().getData();
144 if (data instanceof ServerConfigurationPayload) {
145 currentOperationState.onApplyState(applyState);
153 * Add a server. The algorithm for AddServer is as follows:
155 * <li>Add the new server as a peer.</li>
156 * <li>Add the new follower to the leader.</li>
157 * <li>If new server should be voting member</li>
159 * <li>Initialize FollowerState to VOTING_NOT_INITIALIZED.</li>
160 * <li>Initiate install snapshot to the new follower.</li>
161 * <li>When install snapshot complete, mark the follower as VOTING and re-calculate majority vote count.</li>
163 * <li>Persist and replicate ServerConfigurationPayload with the new server list.</li>
164 * <li>On replication consensus, respond to caller with OK.</li>
166 * If the install snapshot times out after a period of 2 * election time out
168 * <li>Remove the new server as a peer.</li>
169 * <li>Remove the new follower from the leader.</li>
170 * <li>Respond to caller with TIMEOUT.</li>
173 private void onAddServer(final AddServer addServer, final ActorRef sender) {
174 LOG.debug("{}: onAddServer: {}, state: {}", raftContext.getId(), addServer, currentOperationState);
176 onNewOperation(new AddServerContext(addServer, sender));
179 private void onNewOperation(final ServerOperationContext<?> operationContext) {
180 if (raftActor.isLeader()) {
181 currentOperationState.onNewOperation(operationContext);
183 ActorSelection leader = raftActor.getLeader();
184 if (leader != null) {
185 LOG.debug("{}: Not leader - forwarding to leader {}", raftContext.getId(), leader);
186 leader.tell(operationContext.getOperation(), operationContext.getClientRequestor());
188 LOG.debug("{}: No leader - returning NO_LEADER reply", raftContext.getId());
189 operationContext.getClientRequestor().tell(operationContext.newReply(
190 ServerChangeStatus.NO_LEADER, null), raftActor.self());
196 * Interface for the initial state for a server operation.
198 private interface InitialOperationState {
203 * Abstract base class for a server operation FSM state. Handles common behavior for all states.
205 private abstract class OperationState {
206 void onNewOperation(final ServerOperationContext<?> operationContext) {
207 // We're currently processing another operation so queue it to be processed later.
209 LOG.debug("{}: Server operation already in progress - queueing {}", raftContext.getId(),
210 operationContext.getOperation());
212 pendingOperationsQueue.add(operationContext);
215 void onServerOperationTimeout(final ServerOperationTimeout timeout) {
216 LOG.debug("onServerOperationTimeout should not be called in state {}", this);
219 void onUnInitializedFollowerSnapshotReply(final UnInitializedFollowerSnapshotReply reply) {
220 LOG.debug("onUnInitializedFollowerSnapshotReply was called in state {}", this);
223 void onApplyState(final ApplyState applyState) {
224 LOG.debug("onApplyState was called in state {}", this);
227 void onSnapshotComplete() {
231 void onNewLeader(final String newLeader) {
234 protected void persistNewServerConfiguration(final ServerOperationContext<?> operationContext) {
235 raftContext.setDynamicServerConfigurationInUse();
237 ServerConfigurationPayload payload = raftContext.getPeerServerInfo(
238 operationContext.includeSelfInNewConfiguration(raftActor));
239 LOG.debug("{}: New server configuration : {}", raftContext.getId(), payload.getServerConfig());
241 raftActor.persistData(operationContext.getClientRequestor(), operationContext.getContextId(),
244 currentOperationState = new Persisting(operationContext, newTimer(new ServerOperationTimeout(
245 operationContext.getLoggingContext())));
247 sendReply(operationContext, ServerChangeStatus.OK);
250 protected void operationComplete(final ServerOperationContext<?> operationContext,
251 final @Nullable ServerChangeStatus replyStatus) {
252 if (replyStatus != null) {
253 sendReply(operationContext, replyStatus);
256 operationContext.operationComplete(raftActor, replyStatus == null || replyStatus == ServerChangeStatus.OK);
261 protected void changeToIdleState() {
262 currentOperationState = IDLE;
264 ServerOperationContext<?> nextOperation = pendingOperationsQueue.poll();
265 if (nextOperation != null) {
266 RaftActorServerConfigurationSupport.this.onNewOperation(nextOperation);
270 protected void sendReply(final ServerOperationContext<?> operationContext, final ServerChangeStatus status) {
271 LOG.debug("{}: Returning {} for operation {}", raftContext.getId(), status,
272 operationContext.getOperation());
274 operationContext.getClientRequestor().tell(operationContext.newReply(status, raftActor.getLeaderId()),
278 Cancellable newTimer(final Object message) {
279 return newTimer(raftContext.getConfigParams().getElectionTimeOutInterval().$times(2), message);
282 Cancellable newTimer(final FiniteDuration timeout, final Object message) {
283 return raftContext.getActorSystem().scheduler().scheduleOnce(
284 timeout, raftContext.getActor(), message,
285 raftContext.getActorSystem().dispatcher(), raftContext.getActor());
289 public String toString() {
290 return getClass().getSimpleName();
295 * The state when no server operation is in progress. It immediately initiates new server operations.
297 private final class Idle extends OperationState {
299 public void onNewOperation(final ServerOperationContext<?> operationContext) {
300 operationContext.newInitialOperationState(RaftActorServerConfigurationSupport.this).initiate();
304 public void onApplyState(final ApplyState applyState) {
305 // Noop - we override b/c ApplyState is called normally for followers in the idle state.
310 * The state when a new server configuration is being persisted and replicated.
312 private final class Persisting extends OperationState {
313 private final ServerOperationContext<?> operationContext;
314 private final Cancellable timer;
315 private boolean timedOut = false;
317 Persisting(final ServerOperationContext<?> operationContext, final Cancellable timer) {
318 this.operationContext = operationContext;
323 public void onApplyState(final ApplyState applyState) {
324 // Sanity check - we could get an ApplyState from a previous operation that timed out so make
325 // sure it's meant for us.
326 if (operationContext.getContextId().equals(applyState.getIdentifier())) {
327 LOG.info("{}: {} has been successfully replicated to a majority of followers", raftContext.getId(),
328 applyState.getReplicatedLogEntry().getData());
331 operationComplete(operationContext, null);
336 public void onServerOperationTimeout(final ServerOperationTimeout timeout) {
337 LOG.warn("{}: Timeout occured while replicating the new server configuration for {}", raftContext.getId(),
338 timeout.getLoggingContext());
342 // Fail any pending operations
343 ServerOperationContext<?> nextOperation = pendingOperationsQueue.poll();
344 while (nextOperation != null) {
345 sendReply(nextOperation, ServerChangeStatus.PRIOR_REQUEST_CONSENSUS_TIMEOUT);
346 nextOperation = pendingOperationsQueue.poll();
351 public void onNewOperation(final ServerOperationContext<?> newOperationContext) {
353 sendReply(newOperationContext, ServerChangeStatus.PRIOR_REQUEST_CONSENSUS_TIMEOUT);
355 super.onNewOperation(newOperationContext);
361 * Abstract base class for an AddServer operation state.
363 private abstract class AddServerState extends OperationState {
364 private final AddServerContext addServerContext;
366 AddServerState(final AddServerContext addServerContext) {
367 this.addServerContext = addServerContext;
370 AddServerContext getAddServerContext() {
371 return addServerContext;
374 Cancellable newInstallSnapshotTimer() {
375 return newTimer(new ServerOperationTimeout(addServerContext.getOperation().getNewServerId()));
378 void handleInstallSnapshotTimeout(final ServerOperationTimeout timeout) {
379 String serverId = timeout.getLoggingContext();
381 LOG.debug("{}: handleInstallSnapshotTimeout for new server {}", raftContext.getId(), serverId);
384 raftContext.removePeer(serverId);
386 boolean isLeader = raftActor.isLeader();
388 AbstractLeader leader = (AbstractLeader) raftActor.getCurrentBehavior();
389 leader.removeFollower(serverId);
392 operationComplete(getAddServerContext(), isLeader ? ServerChangeStatus.TIMEOUT
393 : ServerChangeStatus.NO_LEADER);
399 * The initial state for the AddServer operation. It adds the new follower as a peer and initiates
400 * snapshot capture, if necessary.
402 private final class InitialAddServerState extends AddServerState implements InitialOperationState {
403 InitialAddServerState(final AddServerContext addServerContext) {
404 super(addServerContext);
408 public void initiate() {
409 final AbstractLeader leader = (AbstractLeader) raftActor.getCurrentBehavior();
410 AddServer addServer = getAddServerContext().getOperation();
412 LOG.debug("{}: Initiating {}", raftContext.getId(), addServer);
414 if (raftContext.getPeerInfo(addServer.getNewServerId()) != null) {
415 operationComplete(getAddServerContext(), ServerChangeStatus.ALREADY_EXISTS);
419 VotingState votingState = addServer.isVotingMember() ? VotingState.VOTING_NOT_INITIALIZED :
420 VotingState.NON_VOTING;
421 raftContext.addToPeers(addServer.getNewServerId(), addServer.getNewServerAddress(), votingState);
423 leader.addFollower(addServer.getNewServerId());
425 if (votingState == VotingState.VOTING_NOT_INITIALIZED) {
426 // schedule the install snapshot timeout timer
427 Cancellable installSnapshotTimer = newInstallSnapshotTimer();
428 if (leader.initiateCaptureSnapshot(addServer.getNewServerId())) {
429 LOG.debug("{}: Initiating capture snapshot for new server {}", raftContext.getId(),
430 addServer.getNewServerId());
432 currentOperationState = new InstallingSnapshot(getAddServerContext(), installSnapshotTimer);
434 LOG.debug("{}: Snapshot already in progress - waiting for completion", raftContext.getId());
436 currentOperationState = new WaitingForPriorSnapshotComplete(getAddServerContext(),
437 installSnapshotTimer);
440 LOG.debug("{}: New follower is non-voting - directly persisting new server configuration",
441 raftContext.getId());
443 persistNewServerConfiguration(getAddServerContext());
449 * The AddServer operation state for when the catch-up snapshot is being installed. It handles successful
452 private final class InstallingSnapshot extends AddServerState {
453 private final Cancellable installSnapshotTimer;
455 InstallingSnapshot(final AddServerContext addServerContext, final Cancellable installSnapshotTimer) {
456 super(addServerContext);
457 this.installSnapshotTimer = requireNonNull(installSnapshotTimer);
461 public void onServerOperationTimeout(final ServerOperationTimeout timeout) {
462 handleInstallSnapshotTimeout(timeout);
464 LOG.warn("{}: Timeout occured for new server {} while installing snapshot", raftContext.getId(),
465 timeout.getLoggingContext());
469 public void onUnInitializedFollowerSnapshotReply(final UnInitializedFollowerSnapshotReply reply) {
470 LOG.debug("{}: onUnInitializedFollowerSnapshotReply: {}", raftContext.getId(), reply);
472 String followerId = reply.getFollowerId();
474 // Sanity check to guard against receiving an UnInitializedFollowerSnapshotReply from a prior
475 // add server operation that timed out.
476 if (getAddServerContext().getOperation().getNewServerId().equals(followerId) && raftActor.isLeader()) {
477 AbstractLeader leader = (AbstractLeader) raftActor.getCurrentBehavior();
478 raftContext.getPeerInfo(followerId).setVotingState(VotingState.VOTING);
479 leader.updateMinReplicaCount();
481 persistNewServerConfiguration(getAddServerContext());
483 installSnapshotTimer.cancel();
485 LOG.debug("{}: Dropping UnInitializedFollowerSnapshotReply for server {}: {}",
486 raftContext.getId(), followerId,
487 !raftActor.isLeader() ? "not leader" : "server Id doesn't match");
493 * The AddServer operation state for when there is a snapshot already in progress. When the current
494 * snapshot completes, it initiates an install snapshot.
496 private final class WaitingForPriorSnapshotComplete extends AddServerState {
497 private final Cancellable snapshotTimer;
499 WaitingForPriorSnapshotComplete(final AddServerContext addServerContext, final Cancellable snapshotTimer) {
500 super(addServerContext);
501 this.snapshotTimer = requireNonNull(snapshotTimer);
505 public void onSnapshotComplete() {
506 LOG.debug("{}: onSnapshotComplete", raftContext.getId());
508 if (!raftActor.isLeader()) {
509 LOG.debug("{}: No longer the leader", raftContext.getId());
513 AbstractLeader leader = (AbstractLeader) raftActor.getCurrentBehavior();
514 if (leader.initiateCaptureSnapshot(getAddServerContext().getOperation().getNewServerId())) {
515 LOG.debug("{}: Initiating capture snapshot for new server {}", raftContext.getId(),
516 getAddServerContext().getOperation().getNewServerId());
518 currentOperationState = new InstallingSnapshot(getAddServerContext(),
519 newInstallSnapshotTimer());
521 snapshotTimer.cancel();
526 public void onServerOperationTimeout(final ServerOperationTimeout timeout) {
527 handleInstallSnapshotTimeout(timeout);
529 LOG.warn("{}: Timeout occured for new server {} while waiting for prior snapshot to complete",
530 raftContext.getId(), timeout.getLoggingContext());
534 private static final class ServerOperationContextIdentifier
535 extends AbstractUUIDIdentifier<ServerOperationContextIdentifier> {
536 private static final long serialVersionUID = 1L;
538 ServerOperationContextIdentifier() {
539 super(UUID.randomUUID());
544 * Stores context information for a server operation.
546 * @param <T> the operation type
548 private abstract static class ServerOperationContext<T> {
549 private final T operation;
550 private final ActorRef clientRequestor;
551 private final Identifier contextId;
553 ServerOperationContext(final T operation, final ActorRef clientRequestor) {
554 this.operation = operation;
555 this.clientRequestor = clientRequestor;
556 contextId = new ServerOperationContextIdentifier();
559 Identifier getContextId() {
567 ActorRef getClientRequestor() {
568 return clientRequestor;
571 void operationComplete(final RaftActor raftActor, final boolean succeeded) {
574 boolean includeSelfInNewConfiguration(final RaftActor raftActor) {
578 abstract Object newReply(ServerChangeStatus status, String leaderId);
580 abstract InitialOperationState newInitialOperationState(RaftActorServerConfigurationSupport support);
582 abstract String getLoggingContext();
586 * Stores context information for an AddServer operation.
588 private static class AddServerContext extends ServerOperationContext<AddServer> {
589 AddServerContext(final AddServer addServer, final ActorRef clientRequestor) {
590 super(addServer, clientRequestor);
594 Object newReply(final ServerChangeStatus status, final String leaderId) {
595 return new AddServerReply(status, leaderId);
599 InitialOperationState newInitialOperationState(final RaftActorServerConfigurationSupport support) {
600 return support.new InitialAddServerState(this);
604 String getLoggingContext() {
605 return getOperation().getNewServerId();
609 private abstract class RemoveServerState extends OperationState {
610 private final RemoveServerContext removeServerContext;
612 protected RemoveServerState(final RemoveServerContext removeServerContext) {
613 this.removeServerContext = requireNonNull(removeServerContext);
617 public RemoveServerContext getRemoveServerContext() {
618 return removeServerContext;
622 private final class InitialRemoveServerState extends RemoveServerState implements InitialOperationState {
624 protected InitialRemoveServerState(final RemoveServerContext removeServerContext) {
625 super(removeServerContext);
629 public void initiate() {
630 String serverId = getRemoveServerContext().getOperation().getServerId();
631 raftContext.removePeer(serverId);
632 AbstractLeader leader = (AbstractLeader)raftActor.getCurrentBehavior();
633 leader.removeFollower(serverId);
634 leader.updateMinReplicaCount();
636 persistNewServerConfiguration(getRemoveServerContext());
640 private static class RemoveServerContext extends ServerOperationContext<RemoveServer> {
641 private final String peerAddress;
643 RemoveServerContext(final RemoveServer operation, final String peerAddress, final ActorRef clientRequestor) {
644 super(operation, clientRequestor);
645 this.peerAddress = peerAddress;
649 Object newReply(final ServerChangeStatus status, final String leaderId) {
650 return new RemoveServerReply(status, leaderId);
654 InitialOperationState newInitialOperationState(final RaftActorServerConfigurationSupport support) {
655 return support.new InitialRemoveServerState(this);
659 void operationComplete(final RaftActor raftActor, final boolean succeeded) {
660 if (peerAddress != null) {
661 raftActor.context().actorSelection(peerAddress).tell(
662 new ServerRemoved(getOperation().getServerId()), raftActor.getSelf());
667 boolean includeSelfInNewConfiguration(final RaftActor raftActor) {
668 return !getOperation().getServerId().equals(raftActor.getId());
672 String getLoggingContext() {
673 return getOperation().getServerId();
677 private static class ChangeServersVotingStatusContext extends ServerOperationContext<ChangeServersVotingStatus> {
678 private final boolean tryToElectLeader;
680 ChangeServersVotingStatusContext(final ChangeServersVotingStatus convertMessage, final ActorRef clientRequestor,
681 final boolean tryToElectLeader) {
682 super(convertMessage, clientRequestor);
683 this.tryToElectLeader = tryToElectLeader;
687 InitialOperationState newInitialOperationState(final RaftActorServerConfigurationSupport support) {
688 return support.new ChangeServersVotingStatusState(this, tryToElectLeader);
692 Object newReply(final ServerChangeStatus status, final String leaderId) {
693 return new ServerChangeReply(status, leaderId);
697 void operationComplete(final RaftActor raftActor, final boolean succeeded) {
698 // If this leader changed to non-voting we need to step down as leader so we'll try to transfer
700 boolean localServerChangedToNonVoting = Boolean.FALSE.equals(getOperation()
701 .getServerVotingStatusMap().get(raftActor.getRaftActorContext().getId()));
702 if (succeeded && localServerChangedToNonVoting) {
703 LOG.debug("Leader changed to non-voting - trying leadership transfer");
704 raftActor.becomeNonVoting();
705 } else if (raftActor.isLeader()) {
706 raftActor.onVotingStateChangeComplete();
711 String getLoggingContext() {
712 return getOperation().toString();
716 private class ChangeServersVotingStatusState extends OperationState implements InitialOperationState {
717 private final ChangeServersVotingStatusContext changeVotingStatusContext;
718 private final boolean tryToElectLeader;
720 ChangeServersVotingStatusState(final ChangeServersVotingStatusContext changeVotingStatusContext,
721 final boolean tryToElectLeader) {
722 this.changeVotingStatusContext = changeVotingStatusContext;
723 this.tryToElectLeader = tryToElectLeader;
727 public void initiate() {
728 LOG.debug("Initiating ChangeServersVotingStatusState");
730 if (tryToElectLeader) {
731 initiateLocalLeaderElection();
732 } else if (updateLocalPeerInfo()) {
733 persistNewServerConfiguration(changeVotingStatusContext);
737 private void initiateLocalLeaderElection() {
738 LOG.debug("{}: Sending local ElectionTimeout to start leader election", raftContext.getId());
740 ServerConfigurationPayload previousServerConfig = raftContext.getPeerServerInfo(true);
741 if (!updateLocalPeerInfo()) {
745 raftContext.getActor().tell(TimeoutNow.INSTANCE, raftContext.getActor());
747 currentOperationState = new WaitingForLeaderElected(changeVotingStatusContext, previousServerConfig);
750 private boolean updateLocalPeerInfo() {
751 List<ServerInfo> newServerInfoList = newServerInfoList();
753 // Check if new voting state would leave us with no voting members.
754 boolean atLeastOneVoting = false;
755 for (ServerInfo info: newServerInfoList) {
756 if (info.isVoting()) {
757 atLeastOneVoting = true;
762 if (!atLeastOneVoting) {
763 operationComplete(changeVotingStatusContext, ServerChangeStatus.INVALID_REQUEST);
767 raftContext.updatePeerIds(new ServerConfigurationPayload(newServerInfoList));
768 if (raftActor.getCurrentBehavior() instanceof AbstractLeader leader) {
769 leader.updateMinReplicaCount();
775 private List<ServerInfo> newServerInfoList() {
776 Map<String, Boolean> serverVotingStatusMap = changeVotingStatusContext.getOperation()
777 .getServerVotingStatusMap();
778 List<ServerInfo> newServerInfoList = new ArrayList<>();
779 for (String peerId: raftContext.getPeerIds()) {
780 newServerInfoList.add(new ServerInfo(peerId, serverVotingStatusMap.containsKey(peerId)
781 ? serverVotingStatusMap.get(peerId) : raftContext.getPeerInfo(peerId).isVoting()));
784 newServerInfoList.add(new ServerInfo(raftContext.getId(), serverVotingStatusMap.containsKey(
785 raftContext.getId()) ? serverVotingStatusMap.get(raftContext.getId())
786 : raftContext.isVotingMember()));
788 return newServerInfoList;
792 private class WaitingForLeaderElected extends OperationState {
793 private final ServerConfigurationPayload previousServerConfig;
794 private final ChangeServersVotingStatusContext operationContext;
795 private final Cancellable timer;
797 WaitingForLeaderElected(final ChangeServersVotingStatusContext operationContext,
798 final ServerConfigurationPayload previousServerConfig) {
799 this.operationContext = operationContext;
800 this.previousServerConfig = previousServerConfig;
802 timer = newTimer(raftContext.getConfigParams().getElectionTimeOutInterval(),
803 new ServerOperationTimeout(operationContext.getLoggingContext()));
807 void onNewLeader(final String newLeader) {
808 if (newLeader == null) {
812 LOG.debug("{}: New leader {} elected", raftContext.getId(), newLeader);
816 if (raftActor.isLeader()) {
817 persistNewServerConfiguration(operationContext);
819 // Edge case - some other node became leader so forward the operation.
820 LOG.debug("{}: Forwarding {} to new leader", raftContext.getId(), operationContext.getOperation());
822 // Revert the local server config change.
823 raftContext.updatePeerIds(previousServerConfig);
826 RaftActorServerConfigurationSupport.this.onNewOperation(operationContext);
831 void onServerOperationTimeout(final ServerOperationTimeout timeout) {
832 LOG.warn("{}: Leader election timed out - cannot apply operation {}",
833 raftContext.getId(), timeout.getLoggingContext());
835 // Revert the local server config change.
836 raftContext.updatePeerIds(previousServerConfig);
837 raftActor.initializeBehavior();
839 tryToForwardOperationToAnotherServer();
842 private void tryToForwardOperationToAnotherServer() {
843 Collection<String> serversVisited = new HashSet<>(operationContext.getOperation().getServersVisited());
845 LOG.debug("{}: tryToForwardOperationToAnotherServer - servers already visited {}", raftContext.getId(),
848 serversVisited.add(raftContext.getId());
850 // Try to find another whose state is being changed from non-voting to voting and that we haven't
852 Map<String, Boolean> serverVotingStatusMap = operationContext.getOperation().getServerVotingStatusMap();
853 ActorSelection forwardToPeerActor = null;
854 for (Map.Entry<String, Boolean> e: serverVotingStatusMap.entrySet()) {
855 Boolean isVoting = e.getValue();
856 String serverId = e.getKey();
857 PeerInfo peerInfo = raftContext.getPeerInfo(serverId);
858 if (isVoting && peerInfo != null && !peerInfo.isVoting() && !serversVisited.contains(serverId)) {
859 ActorSelection actor = raftContext.getPeerActorSelection(serverId);
861 forwardToPeerActor = actor;
867 if (forwardToPeerActor != null) {
868 LOG.debug("{}: Found server {} to forward to", raftContext.getId(), forwardToPeerActor);
870 forwardToPeerActor.tell(new ChangeServersVotingStatus(serverVotingStatusMap, serversVisited),
871 operationContext.getClientRequestor());
874 operationComplete(operationContext, ServerChangeStatus.NO_LEADER);
879 static class ServerOperationTimeout {
880 private final String loggingContext;
882 ServerOperationTimeout(final String loggingContext) {
883 this.loggingContext = requireNonNull(loggingContext, "loggingContext should not be null");
886 String getLoggingContext() {
887 return loggingContext;