/*
 * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
 * and is available at http://www.eclipse.org/legal/epl-v10.html
 */
8 package org.opendaylight.controller.cluster.raft;
import akka.actor.ActorRef;
import akka.actor.ActorSelection;
import akka.actor.Cancellable;
import com.google.common.base.Preconditions;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.UUID;
import javax.annotation.Nullable;
import org.opendaylight.controller.cluster.raft.ServerConfigurationPayload.ServerInfo;
import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
import org.opendaylight.controller.cluster.raft.base.messages.SnapshotComplete;
import org.opendaylight.controller.cluster.raft.behaviors.AbstractLeader;
import org.opendaylight.controller.cluster.raft.messages.AddServer;
import org.opendaylight.controller.cluster.raft.messages.AddServerReply;
import org.opendaylight.controller.cluster.raft.messages.ChangeServersVotingStatus;
import org.opendaylight.controller.cluster.raft.messages.RemoveServer;
import org.opendaylight.controller.cluster.raft.messages.RemoveServerReply;
import org.opendaylight.controller.cluster.raft.messages.ServerChangeReply;
import org.opendaylight.controller.cluster.raft.messages.ServerChangeStatus;
import org.opendaylight.controller.cluster.raft.messages.ServerRemoved;
import org.opendaylight.controller.cluster.raft.messages.UnInitializedFollowerSnapshotReply;
import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.concurrent.duration.FiniteDuration;
/**
 * Handles server configuration related messages for a RaftActor.
 *
 * @author Thomas Pantelis
 */
47 class RaftActorServerConfigurationSupport {
/** Logger shared by this class and its nested operation states. */
private static final Logger LOG = LoggerFactory.getLogger(RaftActorServerConfigurationSupport.class);

/** The state instance installed whenever no server operation is in progress. */
private final OperationState IDLE = new Idle();

/** The actor on whose behalf server configuration messages are handled. */
private final RaftActor raftActor;

/** The context obtained from the actor at construction time. */
private final RaftActorContext raftContext;

/** Operations received while another operation was in progress, in arrival order. */
private final Queue<ServerOperationContext<?>> pendingOperationsQueue = new ArrayDeque<>();

/** The state currently receiving state-machine callbacks; starts out as {@link #IDLE}. */
private OperationState currentOperationState = IDLE;

/**
 * Creates the support object for the given actor and caches the actor's context.
 *
 * @param raftActor the owning actor
 */
RaftActorServerConfigurationSupport(RaftActor raftActor) {
    this.raftActor = raftActor;
    raftContext = raftActor.getRaftActorContext();
}
65 boolean handleMessage(Object message, ActorRef sender) {
66 if(message instanceof AddServer) {
67 onAddServer((AddServer) message, sender);
69 } else if(message instanceof RemoveServer) {
70 onRemoveServer((RemoveServer) message, sender);
72 } else if(message instanceof ChangeServersVotingStatus) {
73 onChangeServersVotingStatus((ChangeServersVotingStatus) message, sender);
75 } else if (message instanceof ServerOperationTimeout) {
76 currentOperationState.onServerOperationTimeout((ServerOperationTimeout) message);
78 } else if (message instanceof UnInitializedFollowerSnapshotReply) {
79 currentOperationState.onUnInitializedFollowerSnapshotReply((UnInitializedFollowerSnapshotReply) message);
81 } else if(message instanceof ApplyState) {
82 return onApplyState((ApplyState) message);
83 } else if(message instanceof SnapshotComplete) {
84 currentOperationState.onSnapshotComplete();
91 void onNewLeader(String leaderId) {
92 currentOperationState.onNewLeader(leaderId);
95 private void onChangeServersVotingStatus(ChangeServersVotingStatus message, ActorRef sender) {
96 LOG.debug("{}: onChangeServersVotingStatus: {}, state: {}", raftContext.getId(), message,
97 currentOperationState);
99 // The following check is a special case. Normally we fail an operation if there's no leader.
100 // Consider a scenario where one has 2 geographically-separated 3-node clusters, one a primary and
101 // the other a backup such that if the primary cluster is lost, the backup can take over. In this
102 // scenario, we have a logical 6-node cluster where the primary sub-cluster is configured as voting
103 // and the backup sub-cluster as non-voting such that the primary cluster can make progress without
104 // consensus from the backup cluster while still replicating to the backup. On fail-over to the backup,
105 // a request would be sent to a member of the backup cluster to flip the voting states, ie make the
106 // backup sub-cluster voting and the lost primary non-voting. However since the primary majority
107 // cluster is lost, there would be no leader to apply, persist and replicate the server config change.
108 // Therefore, if the local server is currently non-voting and is to be changed to voting and there is
109 // no current leader, we will try to elect a leader using the new server config in order to replicate
110 // the change and progress.
111 boolean localServerChangingToVoting = Boolean.TRUE.equals(message.
112 getServerVotingStatusMap().get(raftActor.getRaftActorContext().getId()));
113 boolean hasNoLeader = raftActor.getLeaderId() == null;
114 if(localServerChangingToVoting && !raftContext.isVotingMember() && hasNoLeader) {
115 currentOperationState.onNewOperation(new ChangeServersVotingStatusContext(message, sender, true));
117 onNewOperation(new ChangeServersVotingStatusContext(message, sender, false));
121 private void onRemoveServer(RemoveServer removeServer, ActorRef sender) {
122 LOG.debug("{}: onRemoveServer: {}, state: {}", raftContext.getId(), removeServer, currentOperationState);
123 boolean isSelf = removeServer.getServerId().equals(raftContext.getId());
124 if(isSelf && !raftContext.hasFollowers()) {
125 sender.tell(new RemoveServerReply(ServerChangeStatus.NOT_SUPPORTED, raftActor.getLeaderId()),
126 raftActor.getSelf());
127 } else if(!isSelf && !raftContext.getPeerIds().contains(removeServer.getServerId())) {
128 sender.tell(new RemoveServerReply(ServerChangeStatus.DOES_NOT_EXIST, raftActor.getLeaderId()),
129 raftActor.getSelf());
131 String serverAddress = isSelf ? raftActor.self().path().toString() :
132 raftContext.getPeerAddress(removeServer.getServerId());
133 onNewOperation(new RemoveServerContext(removeServer, serverAddress, sender));
137 private boolean onApplyState(ApplyState applyState) {
138 Payload data = applyState.getReplicatedLogEntry().getData();
139 if(data instanceof ServerConfigurationPayload) {
140 currentOperationState.onApplyState(applyState);
148 * The algorithm for AddServer is as follows:
150 * <li>Add the new server as a peer.</li>
151 * <li>Add the new follower to the leader.</li>
152 * <li>If new server should be voting member</li>
154 * <li>Initialize FollowerState to VOTING_NOT_INITIALIZED.</li>
155 * <li>Initiate install snapshot to the new follower.</li>
156 * <li>When install snapshot complete, mark the follower as VOTING and re-calculate majority vote count.</li>
158 * <li>Persist and replicate ServerConfigurationPayload with the new server list.</li>
159 * <li>On replication consensus, respond to caller with OK.</li>
161 * If the install snapshot times out after a period of 2 * election time out
163 * <li>Remove the new server as a peer.</li>
164 * <li>Remove the new follower from the leader.</li>
165 * <li>Respond to caller with TIMEOUT.</li>
168 private void onAddServer(AddServer addServer, ActorRef sender) {
169 LOG.debug("{}: onAddServer: {}, state: {}", raftContext.getId(), addServer, currentOperationState);
171 onNewOperation(new AddServerContext(addServer, sender));
174 private void onNewOperation(ServerOperationContext<?> operationContext) {
175 if (raftActor.isLeader()) {
176 currentOperationState.onNewOperation(operationContext);
178 ActorSelection leader = raftActor.getLeader();
179 if (leader != null) {
180 LOG.debug("{}: Not leader - forwarding to leader {}", raftContext.getId(), leader);
181 leader.tell(operationContext.getOperation(), operationContext.getClientRequestor());
183 LOG.debug("{}: No leader - returning NO_LEADER reply", raftContext.getId());
184 operationContext.getClientRequestor().tell(operationContext.newReply(
185 ServerChangeStatus.NO_LEADER, null), raftActor.self());
191 * Interface for the initial state for a server operation.
193 private interface InitialOperationState {
198 * Abstract base class for a server operation FSM state. Handles common behavior for all states.
200 private abstract class OperationState {
201 void onNewOperation(ServerOperationContext<?> operationContext) {
202 // We're currently processing another operation so queue it to be processed later.
204 LOG.debug("{}: Server operation already in progress - queueing {}", raftContext.getId(),
205 operationContext.getOperation());
207 pendingOperationsQueue.add(operationContext);
210 void onServerOperationTimeout(ServerOperationTimeout timeout) {
211 LOG.debug("onServerOperationTimeout should not be called in state {}", this);
214 void onUnInitializedFollowerSnapshotReply(UnInitializedFollowerSnapshotReply reply) {
215 LOG.debug("onUnInitializedFollowerSnapshotReply was called in state {}", this);
218 void onApplyState(ApplyState applyState) {
219 LOG.debug("onApplyState was called in state {}", this);
222 void onSnapshotComplete() {
226 void onNewLeader(String newLeader) {
229 protected void persistNewServerConfiguration(ServerOperationContext<?> operationContext){
230 raftContext.setDynamicServerConfigurationInUse();
232 ServerConfigurationPayload payload = raftContext.getPeerServerInfo(
233 operationContext.includeSelfInNewConfiguration(raftActor));
234 LOG.debug("{}: New server configuration : {}", raftContext.getId(), payload.getServerConfig());
236 raftActor.persistData(operationContext.getClientRequestor(), operationContext.getContextId(), payload);
238 currentOperationState = new Persisting(operationContext, newTimer(new ServerOperationTimeout(
239 operationContext.getLoggingContext())));
241 sendReply(operationContext, ServerChangeStatus.OK);
244 protected void operationComplete(ServerOperationContext<?> operationContext, @Nullable ServerChangeStatus replyStatus) {
245 if(replyStatus != null) {
246 sendReply(operationContext, replyStatus);
249 operationContext.operationComplete(raftActor, replyStatus == null || replyStatus == ServerChangeStatus.OK);
254 protected void changeToIdleState() {
255 currentOperationState = IDLE;
257 ServerOperationContext<?> nextOperation = pendingOperationsQueue.poll();
258 if(nextOperation != null) {
259 RaftActorServerConfigurationSupport.this.onNewOperation(nextOperation);
263 protected void sendReply(ServerOperationContext<?> operationContext, ServerChangeStatus status) {
264 LOG.debug("{}: Returning {} for operation {}", raftContext.getId(), status, operationContext.getOperation());
266 operationContext.getClientRequestor().tell(operationContext.newReply(status, raftActor.getLeaderId()),
270 Cancellable newTimer(Object message) {
271 return newTimer(raftContext.getConfigParams().getElectionTimeOutInterval().$times(2), message);
274 Cancellable newTimer(FiniteDuration timeout, Object message) {
275 return raftContext.getActorSystem().scheduler().scheduleOnce(
276 timeout, raftContext.getActor(), message,
277 raftContext.getActorSystem().dispatcher(), raftContext.getActor());
281 public String toString() {
282 return getClass().getSimpleName();
287 * The state when no server operation is in progress. It immediately initiates new server operations.
289 private final class Idle extends OperationState {
291 public void onNewOperation(ServerOperationContext<?> operationContext) {
292 operationContext.newInitialOperationState(RaftActorServerConfigurationSupport.this).initiate();
296 public void onApplyState(ApplyState applyState) {
297 // Noop - we override b/c ApplyState is called normally for followers in the idle state.
302 * The state when a new server configuration is being persisted and replicated.
304 private final class Persisting extends OperationState {
305 private final ServerOperationContext<?> operationContext;
306 private final Cancellable timer;
307 private boolean timedOut = false;
309 Persisting(ServerOperationContext<?> operationContext, Cancellable timer) {
310 this.operationContext = operationContext;
315 public void onApplyState(ApplyState applyState) {
316 // Sanity check - we could get an ApplyState from a previous operation that timed out so make
317 // sure it's meant for us.
318 if(operationContext.getContextId().equals(applyState.getIdentifier())) {
319 LOG.info("{}: {} has been successfully replicated to a majority of followers", raftContext.getId(),
320 applyState.getReplicatedLogEntry().getData());
323 operationComplete(operationContext, null);
328 public void onServerOperationTimeout(ServerOperationTimeout timeout) {
329 LOG.warn("{}: Timeout occured while replicating the new server configuration for {}", raftContext.getId(),
330 timeout.getLoggingContext());
334 // Fail any pending operations
335 ServerOperationContext<?> nextOperation = pendingOperationsQueue.poll();
336 while(nextOperation != null) {
337 sendReply(nextOperation, ServerChangeStatus.PRIOR_REQUEST_CONSENSUS_TIMEOUT);
338 nextOperation = pendingOperationsQueue.poll();
343 public void onNewOperation(ServerOperationContext<?> operationContext) {
345 sendReply(operationContext, ServerChangeStatus.PRIOR_REQUEST_CONSENSUS_TIMEOUT);
347 super.onNewOperation(operationContext);
353 * Abstract base class for an AddServer operation state.
355 private abstract class AddServerState extends OperationState {
356 private final AddServerContext addServerContext;
358 AddServerState(AddServerContext addServerContext) {
359 this.addServerContext = addServerContext;
362 AddServerContext getAddServerContext() {
363 return addServerContext;
366 Cancellable newInstallSnapshotTimer() {
367 return newTimer(new ServerOperationTimeout(addServerContext.getOperation().getNewServerId()));
370 void handleInstallSnapshotTimeout(ServerOperationTimeout timeout) {
371 String serverId = timeout.getLoggingContext();
373 LOG.debug("{}: handleInstallSnapshotTimeout for new server {}", raftContext.getId(), serverId);
376 raftContext.removePeer(serverId);
378 boolean isLeader = raftActor.isLeader();
380 AbstractLeader leader = (AbstractLeader) raftActor.getCurrentBehavior();
381 leader.removeFollower(serverId);
384 operationComplete(getAddServerContext(), isLeader ? ServerChangeStatus.TIMEOUT : ServerChangeStatus.NO_LEADER);
390 * The initial state for the AddServer operation. It adds the new follower as a peer and initiates
391 * snapshot capture, if necessary.
393 private final class InitialAddServerState extends AddServerState implements InitialOperationState {
394 InitialAddServerState(AddServerContext addServerContext) {
395 super(addServerContext);
399 public void initiate() {
400 AbstractLeader leader = (AbstractLeader) raftActor.getCurrentBehavior();
401 AddServer addServer = getAddServerContext().getOperation();
403 LOG.debug("{}: Initiating {}", raftContext.getId(), addServer);
405 if(raftContext.getPeerInfo(addServer.getNewServerId()) != null) {
406 operationComplete(getAddServerContext(), ServerChangeStatus.ALREADY_EXISTS);
410 VotingState votingState = addServer.isVotingMember() ? VotingState.VOTING_NOT_INITIALIZED :
411 VotingState.NON_VOTING;
412 raftContext.addToPeers(addServer.getNewServerId(), addServer.getNewServerAddress(), votingState);
414 leader.addFollower(addServer.getNewServerId());
416 if(votingState == VotingState.VOTING_NOT_INITIALIZED){
417 // schedule the install snapshot timeout timer
418 Cancellable installSnapshotTimer = newInstallSnapshotTimer();
419 if(leader.initiateCaptureSnapshot(addServer.getNewServerId())) {
420 LOG.debug("{}: Initiating capture snapshot for new server {}", raftContext.getId(),
421 addServer.getNewServerId());
423 currentOperationState = new InstallingSnapshot(getAddServerContext(), installSnapshotTimer);
425 LOG.debug("{}: Snapshot already in progress - waiting for completion", raftContext.getId());
427 currentOperationState = new WaitingForPriorSnapshotComplete(getAddServerContext(),
428 installSnapshotTimer);
431 LOG.debug("{}: New follower is non-voting - directly persisting new server configuration",
432 raftContext.getId());
434 persistNewServerConfiguration(getAddServerContext());
440 * The AddServer operation state for when the catch-up snapshot is being installed. It handles successful
443 private final class InstallingSnapshot extends AddServerState {
444 private final Cancellable installSnapshotTimer;
446 InstallingSnapshot(AddServerContext addServerContext, Cancellable installSnapshotTimer) {
447 super(addServerContext);
448 this.installSnapshotTimer = Preconditions.checkNotNull(installSnapshotTimer);
452 public void onServerOperationTimeout(ServerOperationTimeout timeout) {
453 handleInstallSnapshotTimeout(timeout);
455 LOG.warn("{}: Timeout occured for new server {} while installing snapshot", raftContext.getId(),
456 timeout.getLoggingContext());
460 public void onUnInitializedFollowerSnapshotReply(UnInitializedFollowerSnapshotReply reply) {
461 LOG.debug("{}: onUnInitializedFollowerSnapshotReply: {}", raftContext.getId(), reply);
463 String followerId = reply.getFollowerId();
465 // Sanity check to guard against receiving an UnInitializedFollowerSnapshotReply from a prior
466 // add server operation that timed out.
467 if(getAddServerContext().getOperation().getNewServerId().equals(followerId) && raftActor.isLeader()) {
468 AbstractLeader leader = (AbstractLeader) raftActor.getCurrentBehavior();
469 raftContext.getPeerInfo(followerId).setVotingState(VotingState.VOTING);
470 leader.updateMinReplicaCount();
472 persistNewServerConfiguration(getAddServerContext());
474 installSnapshotTimer.cancel();
476 LOG.debug("{}: Dropping UnInitializedFollowerSnapshotReply for server {}: {}",
477 raftContext.getId(), followerId,
478 !raftActor.isLeader() ? "not leader" : "server Id doesn't match");
484 * The AddServer operation state for when there is a snapshot already in progress. When the current
485 * snapshot completes, it initiates an install snapshot.
487 private final class WaitingForPriorSnapshotComplete extends AddServerState {
488 private final Cancellable snapshotTimer;
490 WaitingForPriorSnapshotComplete(AddServerContext addServerContext, Cancellable snapshotTimer) {
491 super(addServerContext);
492 this.snapshotTimer = Preconditions.checkNotNull(snapshotTimer);
496 public void onSnapshotComplete() {
497 LOG.debug("{}: onSnapshotComplete", raftContext.getId());
499 if(!raftActor.isLeader()) {
500 LOG.debug("{}: No longer the leader", raftContext.getId());
504 AbstractLeader leader = (AbstractLeader) raftActor.getCurrentBehavior();
505 if(leader.initiateCaptureSnapshot(getAddServerContext().getOperation().getNewServerId())) {
506 LOG.debug("{}: Initiating capture snapshot for new server {}", raftContext.getId(),
507 getAddServerContext().getOperation().getNewServerId());
509 currentOperationState = new InstallingSnapshot(getAddServerContext(),
510 newInstallSnapshotTimer());
512 snapshotTimer.cancel();
517 public void onServerOperationTimeout(ServerOperationTimeout timeout) {
518 handleInstallSnapshotTimeout(timeout);
520 LOG.warn("{}: Timeout occured for new server {} while waiting for prior snapshot to complete",
521 raftContext.getId(), timeout.getLoggingContext());
526 * Stores context information for a server operation.
528 * @param <T> the operation type
530 private static abstract class ServerOperationContext<T> {
531 private final T operation;
532 private final ActorRef clientRequestor;
533 private final String contextId;
535 ServerOperationContext(T operation, ActorRef clientRequestor){
536 this.operation = operation;
537 this.clientRequestor = clientRequestor;
538 contextId = UUID.randomUUID().toString();
541 String getContextId() {
549 ActorRef getClientRequestor() {
550 return clientRequestor;
553 void operationComplete(RaftActor raftActor, boolean succeeded) {
556 boolean includeSelfInNewConfiguration(RaftActor raftActor) {
560 abstract Object newReply(ServerChangeStatus status, String leaderId);
562 abstract InitialOperationState newInitialOperationState(RaftActorServerConfigurationSupport support);
564 abstract String getLoggingContext();
568 * Stores context information for an AddServer operation.
570 private static class AddServerContext extends ServerOperationContext<AddServer> {
571 AddServerContext(AddServer addServer, ActorRef clientRequestor) {
572 super(addServer, clientRequestor);
576 Object newReply(ServerChangeStatus status, String leaderId) {
577 return new AddServerReply(status, leaderId);
581 InitialOperationState newInitialOperationState(RaftActorServerConfigurationSupport support) {
582 return support.new InitialAddServerState(this);
586 String getLoggingContext() {
587 return getOperation().getNewServerId();
591 private abstract class RemoveServerState extends OperationState {
592 private final RemoveServerContext removeServerContext;
594 protected RemoveServerState(RemoveServerContext removeServerContext) {
595 this.removeServerContext = Preconditions.checkNotNull(removeServerContext);
599 public RemoveServerContext getRemoveServerContext() {
600 return removeServerContext;
604 private final class InitialRemoveServerState extends RemoveServerState implements InitialOperationState{
606 protected InitialRemoveServerState(RemoveServerContext removeServerContext) {
607 super(removeServerContext);
611 public void initiate() {
612 String serverId = getRemoveServerContext().getOperation().getServerId();
613 raftContext.removePeer(serverId);
614 ((AbstractLeader)raftActor.getCurrentBehavior()).removeFollower(serverId);
616 persistNewServerConfiguration(getRemoveServerContext());
620 private static class RemoveServerContext extends ServerOperationContext<RemoveServer> {
621 private final String peerAddress;
623 RemoveServerContext(RemoveServer operation, String peerAddress, ActorRef clientRequestor) {
624 super(operation, clientRequestor);
625 this.peerAddress = peerAddress;
629 Object newReply(ServerChangeStatus status, String leaderId) {
630 return new RemoveServerReply(status, leaderId);
634 InitialOperationState newInitialOperationState(RaftActorServerConfigurationSupport support) {
635 return support.new InitialRemoveServerState(this);
639 void operationComplete(RaftActor raftActor, boolean succeeded) {
640 if(peerAddress != null) {
641 raftActor.context().actorSelection(peerAddress).tell(new ServerRemoved(getOperation().getServerId()), raftActor.getSelf());
646 boolean includeSelfInNewConfiguration(RaftActor raftActor) {
647 return !getOperation().getServerId().equals(raftActor.getId());
651 String getLoggingContext() {
652 return getOperation().getServerId();
656 private static class ChangeServersVotingStatusContext extends ServerOperationContext<ChangeServersVotingStatus> {
657 private final boolean tryToElectLeader;
659 ChangeServersVotingStatusContext(ChangeServersVotingStatus convertMessage, ActorRef clientRequestor,
660 boolean tryToElectLeader) {
661 super(convertMessage, clientRequestor);
662 this.tryToElectLeader = tryToElectLeader;
666 InitialOperationState newInitialOperationState(RaftActorServerConfigurationSupport support) {
667 return support.new ChangeServersVotingStatusState(this, tryToElectLeader);
671 Object newReply(ServerChangeStatus status, String leaderId) {
672 return new ServerChangeReply(status, leaderId);
676 void operationComplete(final RaftActor raftActor, boolean succeeded) {
677 // If this leader changed to non-voting we need to step down as leader so we'll try to transfer
679 boolean localServerChangedToNonVoting = Boolean.FALSE.equals(getOperation().
680 getServerVotingStatusMap().get(raftActor.getRaftActorContext().getId()));
681 if (succeeded && localServerChangedToNonVoting) {
682 raftActor.becomeNonVoting();
687 String getLoggingContext() {
688 return getOperation().toString();
692 private class ChangeServersVotingStatusState extends OperationState implements InitialOperationState {
693 private final ChangeServersVotingStatusContext changeVotingStatusContext;
694 private final boolean tryToElectLeader;
696 ChangeServersVotingStatusState(ChangeServersVotingStatusContext changeVotingStatusContext,
697 boolean tryToElectLeader) {
698 this.changeVotingStatusContext = changeVotingStatusContext;
699 this.tryToElectLeader = tryToElectLeader;
703 public void initiate() {
704 LOG.debug("Initiating ChangeServersVotingStatusState");
706 if(tryToElectLeader) {
707 initiateLocalLeaderElection();
709 updateLocalPeerInfo();
711 persistNewServerConfiguration(changeVotingStatusContext);
715 private void initiateLocalLeaderElection() {
716 LOG.debug("{}: Sending local ElectionTimeout to start leader election", raftContext.getId());
718 ServerConfigurationPayload previousServerConfig = raftContext.getPeerServerInfo(true);
719 updateLocalPeerInfo();
721 raftContext.getActor().tell(ElectionTimeout.INSTANCE, raftContext.getActor());
723 currentOperationState = new WaitingForLeaderElected(changeVotingStatusContext, previousServerConfig);
726 private void updateLocalPeerInfo() {
727 List<ServerInfo> newServerInfoList = newServerInfoList();
729 raftContext.updatePeerIds(new ServerConfigurationPayload(newServerInfoList));
730 if(raftActor.getCurrentBehavior() instanceof AbstractLeader) {
731 AbstractLeader leader = (AbstractLeader) raftActor.getCurrentBehavior();
732 leader.updateMinReplicaCount();
736 private List<ServerInfo> newServerInfoList() {
737 Map<String, Boolean> serverVotingStatusMap = changeVotingStatusContext.getOperation().getServerVotingStatusMap();
738 List<ServerInfo> newServerInfoList = new ArrayList<>();
739 for(String peerId: raftContext.getPeerIds()) {
740 newServerInfoList.add(new ServerInfo(peerId, serverVotingStatusMap.containsKey(peerId) ?
741 serverVotingStatusMap.get(peerId) : raftContext.getPeerInfo(peerId).isVoting()));
744 newServerInfoList.add(new ServerInfo(raftContext.getId(), serverVotingStatusMap.containsKey(
745 raftContext.getId()) ? serverVotingStatusMap.get(raftContext.getId()) : raftContext.isVotingMember()));
747 return newServerInfoList;
751 private class WaitingForLeaderElected extends OperationState {
752 private final ServerConfigurationPayload previousServerConfig;
753 private final ChangeServersVotingStatusContext operationContext;
754 private final Cancellable timer;
756 WaitingForLeaderElected(ChangeServersVotingStatusContext operationContext,
757 ServerConfigurationPayload previousServerConfig) {
758 this.operationContext = operationContext;
759 this.previousServerConfig = previousServerConfig;
761 timer = newTimer(raftContext.getConfigParams().getElectionTimeOutInterval(),
762 new ServerOperationTimeout(operationContext.getLoggingContext()));
766 void onNewLeader(String newLeader) {
767 LOG.debug("{}: New leader {} elected", raftContext.getId(), newLeader);
771 if(raftActor.isLeader()) {
772 persistNewServerConfiguration(operationContext);
774 // Edge case - some other node became leader so forward the operation.
775 LOG.debug("{}: Forwarding {} to new leader", raftContext.getId(), operationContext.getOperation());
777 // Revert the local server config change.
778 raftContext.updatePeerIds(previousServerConfig);
781 RaftActorServerConfigurationSupport.this.onNewOperation(operationContext);
786 void onServerOperationTimeout(ServerOperationTimeout timeout) {
787 LOG.warn("{}: Leader election timed out - cannot apply operation {}",
788 raftContext.getId(), timeout.getLoggingContext());
790 // Revert the local server config change.
791 raftContext.updatePeerIds(previousServerConfig);
792 raftActor.initializeBehavior();
794 tryToForwardOperationToAnotherServer();
797 private void tryToForwardOperationToAnotherServer() {
798 Collection<String> serversVisited = new HashSet<>(operationContext.getOperation().getServersVisited());
800 LOG.debug("{}: tryToForwardOperationToAnotherServer - servers already visited {}", raftContext.getId(),
803 serversVisited.add(raftContext.getId());
805 // Try to find another whose state is being changed from non-voting to voting and that we haven't
807 Map<String, Boolean> serverVotingStatusMap = operationContext.getOperation().getServerVotingStatusMap();
808 ActorSelection forwardToPeerActor = null;
809 for(Map.Entry<String, Boolean> e: serverVotingStatusMap.entrySet()) {
810 Boolean isVoting = e.getValue();
811 String serverId = e.getKey();
812 PeerInfo peerInfo = raftContext.getPeerInfo(serverId);
813 if(isVoting && peerInfo != null && !peerInfo.isVoting() && !serversVisited.contains(serverId)) {
814 ActorSelection actor = raftContext.getPeerActorSelection(serverId);
816 forwardToPeerActor = actor;
822 if(forwardToPeerActor != null) {
823 LOG.debug("{}: Found server {} to forward to", raftContext.getId(), forwardToPeerActor);
825 forwardToPeerActor.tell(new ChangeServersVotingStatus(serverVotingStatusMap, serversVisited),
826 operationContext.getClientRequestor());
829 operationComplete(operationContext, ServerChangeStatus.NO_LEADER);
834 static class ServerOperationTimeout {
835 private final String loggingContext;
837 ServerOperationTimeout(String loggingContext){
838 this.loggingContext = Preconditions.checkNotNull(loggingContext, "loggingContext should not be null");
841 String getLoggingContext() {
842 return loggingContext;