/*
 * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
 * and is available at http://www.eclipse.org/legal/epl-v10.html
 */
8 package org.opendaylight.controller.cluster.raft;
import akka.actor.ActorRef;
import akka.actor.ActorSelection;
import akka.actor.Cancellable;
import com.google.common.base.Preconditions;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.UUID;
import javax.annotation.Nullable;
import org.opendaylight.controller.cluster.raft.RaftActorLeadershipTransferCohort.OnComplete;
import org.opendaylight.controller.cluster.raft.ServerConfigurationPayload.ServerInfo;
import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
import org.opendaylight.controller.cluster.raft.base.messages.SnapshotComplete;
import org.opendaylight.controller.cluster.raft.behaviors.AbstractLeader;
import org.opendaylight.controller.cluster.raft.messages.AddServer;
import org.opendaylight.controller.cluster.raft.messages.AddServerReply;
import org.opendaylight.controller.cluster.raft.messages.ChangeServersVotingStatus;
import org.opendaylight.controller.cluster.raft.messages.RemoveServer;
import org.opendaylight.controller.cluster.raft.messages.RemoveServerReply;
import org.opendaylight.controller.cluster.raft.messages.ServerChangeReply;
import org.opendaylight.controller.cluster.raft.messages.ServerChangeStatus;
import org.opendaylight.controller.cluster.raft.messages.ServerRemoved;
import org.opendaylight.controller.cluster.raft.messages.UnInitializedFollowerSnapshotReply;
import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
40 * Handles server configuration related messages for a RaftActor.
42 * @author Thomas Pantelis
44 class RaftActorServerConfigurationSupport {
45 private static final Logger LOG = LoggerFactory.getLogger(RaftActorServerConfigurationSupport.class);
47 private final OperationState IDLE = new Idle();
49 private final RaftActor raftActor;
51 private final RaftActorContext raftContext;
53 private final Queue<ServerOperationContext<?>> pendingOperationsQueue = new ArrayDeque<>();
55 private OperationState currentOperationState = IDLE;
/**
 * Creates the server configuration support for the given actor.
 *
 * @param raftActor the actor whose server configuration messages are handled; its RaftActorContext is
 *        captured once, at construction time
 */
RaftActorServerConfigurationSupport(RaftActor raftActor) {
    this.raftContext = raftActor.getRaftActorContext();
    this.raftActor = raftActor;
}
62 boolean handleMessage(Object message, ActorRef sender) {
63 if(message instanceof AddServer) {
64 onAddServer((AddServer) message, sender);
66 } else if(message instanceof RemoveServer) {
67 onRemoveServer((RemoveServer) message, sender);
69 } else if(message instanceof ChangeServersVotingStatus) {
70 onChangeServersVotingStatus((ChangeServersVotingStatus) message, sender);
72 } else if (message instanceof ServerOperationTimeout) {
73 currentOperationState.onServerOperationTimeout((ServerOperationTimeout) message);
75 } else if (message instanceof UnInitializedFollowerSnapshotReply) {
76 currentOperationState.onUnInitializedFollowerSnapshotReply((UnInitializedFollowerSnapshotReply) message);
78 } else if(message instanceof ApplyState) {
79 return onApplyState((ApplyState) message);
80 } else if(message instanceof SnapshotComplete) {
81 currentOperationState.onSnapshotComplete();
88 private void onChangeServersVotingStatus(ChangeServersVotingStatus message, ActorRef sender) {
89 LOG.debug("{}: onChangeServersVotingStatus: {}, state: {}", raftContext.getId(), message,
90 currentOperationState);
92 onNewOperation(new ChangeServersVotingStatusContext(message, sender));
95 private void onRemoveServer(RemoveServer removeServer, ActorRef sender) {
96 LOG.debug("{}: onRemoveServer: {}, state: {}", raftContext.getId(), removeServer, currentOperationState);
97 boolean isSelf = removeServer.getServerId().equals(raftActor.getId());
98 if(isSelf && !raftContext.hasFollowers()) {
99 sender.tell(new RemoveServerReply(ServerChangeStatus.NOT_SUPPORTED, raftActor.getLeaderId()),
100 raftActor.getSelf());
101 } else if(!isSelf && !raftContext.getPeerIds().contains(removeServer.getServerId())) {
102 sender.tell(new RemoveServerReply(ServerChangeStatus.DOES_NOT_EXIST, raftActor.getLeaderId()),
103 raftActor.getSelf());
105 String serverAddress = isSelf ? raftActor.self().path().toString() :
106 raftContext.getPeerAddress(removeServer.getServerId());
107 onNewOperation(new RemoveServerContext(removeServer, serverAddress, sender));
111 private boolean onApplyState(ApplyState applyState) {
112 Payload data = applyState.getReplicatedLogEntry().getData();
113 if(data instanceof ServerConfigurationPayload) {
114 currentOperationState.onApplyState(applyState);
122 * The algorithm for AddServer is as follows:
124 * <li>Add the new server as a peer.</li>
125 * <li>Add the new follower to the leader.</li>
126 * <li>If new server should be voting member</li>
128 * <li>Initialize FollowerState to VOTING_NOT_INITIALIZED.</li>
129 * <li>Initiate install snapshot to the new follower.</li>
130 * <li>When install snapshot complete, mark the follower as VOTING and re-calculate majority vote count.</li>
132 * <li>Persist and replicate ServerConfigurationPayload with the new server list.</li>
133 * <li>On replication consensus, respond to caller with OK.</li>
135 * If the install snapshot times out after a period of 2 * election time out
137 * <li>Remove the new server as a peer.</li>
138 * <li>Remove the new follower from the leader.</li>
139 * <li>Respond to caller with TIMEOUT.</li>
142 private void onAddServer(AddServer addServer, ActorRef sender) {
143 LOG.debug("{}: onAddServer: {}, state: {}", raftContext.getId(), addServer, currentOperationState);
145 onNewOperation(new AddServerContext(addServer, sender));
148 private void onNewOperation(ServerOperationContext<?> operationContext) {
149 if (raftActor.isLeader()) {
150 currentOperationState.onNewOperation(operationContext);
152 ActorSelection leader = raftActor.getLeader();
153 if (leader != null) {
154 LOG.debug("{}: Not leader - forwarding to leader {}", raftContext.getId(), leader);
155 leader.forward(operationContext.getOperation(), raftActor.getContext());
157 LOG.debug("{}: No leader - returning NO_LEADER reply", raftContext.getId());
158 operationContext.getClientRequestor().tell(operationContext.newReply(
159 ServerChangeStatus.NO_LEADER, null), raftActor.self());
165 * Interface for the initial state for a server operation.
167 private interface InitialOperationState {
172 * Abstract base class for a server operation FSM state. Handles common behavior for all states.
174 private abstract class OperationState {
175 void onNewOperation(ServerOperationContext<?> operationContext) {
176 // We're currently processing another operation so queue it to be processed later.
178 LOG.debug("{}: Server operation already in progress - queueing {}", raftContext.getId(),
179 operationContext.getOperation());
181 pendingOperationsQueue.add(operationContext);
184 void onServerOperationTimeout(ServerOperationTimeout timeout) {
185 LOG.debug("onServerOperationTimeout should not be called in state {}", this);
188 void onUnInitializedFollowerSnapshotReply(UnInitializedFollowerSnapshotReply reply) {
189 LOG.debug("onUnInitializedFollowerSnapshotReply was called in state {}", this);
192 void onApplyState(ApplyState applyState) {
193 LOG.debug("onApplyState was called in state {}", this);
196 void onSnapshotComplete() {
200 protected void persistNewServerConfiguration(ServerOperationContext<?> operationContext){
201 raftContext.setDynamicServerConfigurationInUse();
203 ServerConfigurationPayload payload = raftContext.getPeerServerInfo(
204 operationContext.includeSelfInNewConfiguration(raftActor));
205 LOG.debug("{}: New server configuration : {}", raftContext.getId(), payload.getServerConfig());
207 raftActor.persistData(operationContext.getClientRequestor(), operationContext.getContextId(), payload);
209 currentOperationState = new Persisting(operationContext, newTimer(new ServerOperationTimeout(
210 operationContext.getLoggingContext())));
212 sendReply(operationContext, ServerChangeStatus.OK);
215 protected void operationComplete(ServerOperationContext<?> operationContext, @Nullable ServerChangeStatus replyStatus) {
216 if(replyStatus != null) {
217 sendReply(operationContext, replyStatus);
220 operationContext.operationComplete(raftActor, replyStatus == null || replyStatus == ServerChangeStatus.OK);
222 currentOperationState = IDLE;
224 ServerOperationContext<?> nextOperation = pendingOperationsQueue.poll();
225 if(nextOperation != null) {
226 RaftActorServerConfigurationSupport.this.onNewOperation(nextOperation);
230 protected void sendReply(ServerOperationContext<?> operationContext, ServerChangeStatus status) {
231 LOG.debug("{}: Returning {} for operation {}", raftContext.getId(), status, operationContext.getOperation());
233 operationContext.getClientRequestor().tell(operationContext.newReply(status, raftActor.getLeaderId()),
237 Cancellable newTimer(Object message) {
238 return raftContext.getActorSystem().scheduler().scheduleOnce(
239 raftContext.getConfigParams().getElectionTimeOutInterval().$times(2), raftContext.getActor(), message,
240 raftContext.getActorSystem().dispatcher(), raftContext.getActor());
244 public String toString() {
245 return getClass().getSimpleName();
250 * The state when no server operation is in progress. It immediately initiates new server operations.
252 private final class Idle extends OperationState {
254 public void onNewOperation(ServerOperationContext<?> operationContext) {
255 operationContext.newInitialOperationState(RaftActorServerConfigurationSupport.this).initiate();
259 public void onApplyState(ApplyState applyState) {
260 // Noop - we override b/c ApplyState is called normally for followers in the idle state.
265 * The state when a new server configuration is being persisted and replicated.
267 private final class Persisting extends OperationState {
268 private final ServerOperationContext<?> operationContext;
269 private final Cancellable timer;
270 private boolean timedOut = false;
272 Persisting(ServerOperationContext<?> operationContext, Cancellable timer) {
273 this.operationContext = operationContext;
278 public void onApplyState(ApplyState applyState) {
279 // Sanity check - we could get an ApplyState from a previous operation that timed out so make
280 // sure it's meant for us.
281 if(operationContext.getContextId().equals(applyState.getIdentifier())) {
282 LOG.info("{}: {} has been successfully replicated to a majority of followers", raftActor.getId(),
283 applyState.getReplicatedLogEntry().getData());
286 operationComplete(operationContext, null);
291 public void onServerOperationTimeout(ServerOperationTimeout timeout) {
292 LOG.warn("{}: Timeout occured while replicating the new server configuration for {}", raftContext.getId(),
293 timeout.getLoggingContext());
297 // Fail any pending operations
298 ServerOperationContext<?> nextOperation = pendingOperationsQueue.poll();
299 while(nextOperation != null) {
300 sendReply(nextOperation, ServerChangeStatus.PRIOR_REQUEST_CONSENSUS_TIMEOUT);
301 nextOperation = pendingOperationsQueue.poll();
306 public void onNewOperation(ServerOperationContext<?> operationContext) {
308 sendReply(operationContext, ServerChangeStatus.PRIOR_REQUEST_CONSENSUS_TIMEOUT);
310 super.onNewOperation(operationContext);
316 * Abstract base class for an AddServer operation state.
318 private abstract class AddServerState extends OperationState {
319 private final AddServerContext addServerContext;
321 AddServerState(AddServerContext addServerContext) {
322 this.addServerContext = addServerContext;
325 AddServerContext getAddServerContext() {
326 return addServerContext;
329 Cancellable newInstallSnapshotTimer() {
330 return newTimer(new ServerOperationTimeout(addServerContext.getOperation().getNewServerId()));
333 void handleInstallSnapshotTimeout(ServerOperationTimeout timeout) {
334 String serverId = timeout.getLoggingContext();
336 LOG.debug("{}: handleInstallSnapshotTimeout for new server {}", raftContext.getId(), serverId);
339 raftContext.removePeer(serverId);
341 boolean isLeader = raftActor.isLeader();
343 AbstractLeader leader = (AbstractLeader) raftActor.getCurrentBehavior();
344 leader.removeFollower(serverId);
347 operationComplete(getAddServerContext(), isLeader ? ServerChangeStatus.TIMEOUT : ServerChangeStatus.NO_LEADER);
353 * The initial state for the AddServer operation. It adds the new follower as a peer and initiates
354 * snapshot capture, if necessary.
356 private final class InitialAddServerState extends AddServerState implements InitialOperationState {
357 InitialAddServerState(AddServerContext addServerContext) {
358 super(addServerContext);
362 public void initiate() {
363 AbstractLeader leader = (AbstractLeader) raftActor.getCurrentBehavior();
364 AddServer addServer = getAddServerContext().getOperation();
366 LOG.debug("{}: Initiating {}", raftContext.getId(), addServer);
368 if(raftContext.getPeerInfo(addServer.getNewServerId()) != null) {
369 operationComplete(getAddServerContext(), ServerChangeStatus.ALREADY_EXISTS);
373 VotingState votingState = addServer.isVotingMember() ? VotingState.VOTING_NOT_INITIALIZED :
374 VotingState.NON_VOTING;
375 raftContext.addToPeers(addServer.getNewServerId(), addServer.getNewServerAddress(), votingState);
377 leader.addFollower(addServer.getNewServerId());
379 if(votingState == VotingState.VOTING_NOT_INITIALIZED){
380 // schedule the install snapshot timeout timer
381 Cancellable installSnapshotTimer = newInstallSnapshotTimer();
382 if(leader.initiateCaptureSnapshot(addServer.getNewServerId())) {
383 LOG.debug("{}: Initiating capture snapshot for new server {}", raftContext.getId(),
384 addServer.getNewServerId());
386 currentOperationState = new InstallingSnapshot(getAddServerContext(), installSnapshotTimer);
388 LOG.debug("{}: Snapshot already in progress - waiting for completion", raftContext.getId());
390 currentOperationState = new WaitingForPriorSnapshotComplete(getAddServerContext(),
391 installSnapshotTimer);
394 LOG.debug("{}: New follower is non-voting - directly persisting new server configuration",
395 raftContext.getId());
397 persistNewServerConfiguration(getAddServerContext());
403 * The AddServer operation state for when the catch-up snapshot is being installed. It handles successful
406 private final class InstallingSnapshot extends AddServerState {
407 private final Cancellable installSnapshotTimer;
409 InstallingSnapshot(AddServerContext addServerContext, Cancellable installSnapshotTimer) {
410 super(addServerContext);
411 this.installSnapshotTimer = Preconditions.checkNotNull(installSnapshotTimer);
415 public void onServerOperationTimeout(ServerOperationTimeout timeout) {
416 handleInstallSnapshotTimeout(timeout);
418 LOG.warn("{}: Timeout occured for new server {} while installing snapshot", raftContext.getId(),
419 timeout.getLoggingContext());
423 public void onUnInitializedFollowerSnapshotReply(UnInitializedFollowerSnapshotReply reply) {
424 LOG.debug("{}: onUnInitializedFollowerSnapshotReply: {}", raftContext.getId(), reply);
426 String followerId = reply.getFollowerId();
428 // Sanity check to guard against receiving an UnInitializedFollowerSnapshotReply from a prior
429 // add server operation that timed out.
430 if(getAddServerContext().getOperation().getNewServerId().equals(followerId) && raftActor.isLeader()) {
431 AbstractLeader leader = (AbstractLeader) raftActor.getCurrentBehavior();
432 raftContext.getPeerInfo(followerId).setVotingState(VotingState.VOTING);
433 leader.updateMinReplicaCount();
435 persistNewServerConfiguration(getAddServerContext());
437 installSnapshotTimer.cancel();
439 LOG.debug("{}: Dropping UnInitializedFollowerSnapshotReply for server {}: {}",
440 raftContext.getId(), followerId,
441 !raftActor.isLeader() ? "not leader" : "server Id doesn't match");
447 * The AddServer operation state for when there is a snapshot already in progress. When the current
448 * snapshot completes, it initiates an install snapshot.
450 private final class WaitingForPriorSnapshotComplete extends AddServerState {
451 private final Cancellable snapshotTimer;
453 WaitingForPriorSnapshotComplete(AddServerContext addServerContext, Cancellable snapshotTimer) {
454 super(addServerContext);
455 this.snapshotTimer = Preconditions.checkNotNull(snapshotTimer);
459 public void onSnapshotComplete() {
460 LOG.debug("{}: onSnapshotComplete", raftContext.getId());
462 if(!raftActor.isLeader()) {
463 LOG.debug("{}: No longer the leader", raftContext.getId());
467 AbstractLeader leader = (AbstractLeader) raftActor.getCurrentBehavior();
468 if(leader.initiateCaptureSnapshot(getAddServerContext().getOperation().getNewServerId())) {
469 LOG.debug("{}: Initiating capture snapshot for new server {}", raftContext.getId(),
470 getAddServerContext().getOperation().getNewServerId());
472 currentOperationState = new InstallingSnapshot(getAddServerContext(),
473 newInstallSnapshotTimer());
475 snapshotTimer.cancel();
480 public void onServerOperationTimeout(ServerOperationTimeout timeout) {
481 handleInstallSnapshotTimeout(timeout);
483 LOG.warn("{}: Timeout occured for new server {} while waiting for prior snapshot to complete",
484 raftContext.getId(), timeout.getLoggingContext());
489 * Stores context information for a server operation.
491 * @param <T> the operation type
493 private static abstract class ServerOperationContext<T> {
494 private final T operation;
495 private final ActorRef clientRequestor;
496 private final String contextId;
498 ServerOperationContext(T operation, ActorRef clientRequestor){
499 this.operation = operation;
500 this.clientRequestor = clientRequestor;
501 contextId = UUID.randomUUID().toString();
504 String getContextId() {
512 ActorRef getClientRequestor() {
513 return clientRequestor;
516 void operationComplete(RaftActor raftActor, boolean succeeded) {
519 boolean includeSelfInNewConfiguration(RaftActor raftActor) {
523 abstract Object newReply(ServerChangeStatus status, String leaderId);
525 abstract InitialOperationState newInitialOperationState(RaftActorServerConfigurationSupport support);
527 abstract String getLoggingContext();
531 * Stores context information for an AddServer operation.
533 private static class AddServerContext extends ServerOperationContext<AddServer> {
534 AddServerContext(AddServer addServer, ActorRef clientRequestor) {
535 super(addServer, clientRequestor);
539 Object newReply(ServerChangeStatus status, String leaderId) {
540 return new AddServerReply(status, leaderId);
544 InitialOperationState newInitialOperationState(RaftActorServerConfigurationSupport support) {
545 return support.new InitialAddServerState(this);
549 String getLoggingContext() {
550 return getOperation().getNewServerId();
554 private abstract class RemoveServerState extends OperationState {
555 private final RemoveServerContext removeServerContext;
557 protected RemoveServerState(RemoveServerContext removeServerContext) {
558 this.removeServerContext = Preconditions.checkNotNull(removeServerContext);
562 public RemoveServerContext getRemoveServerContext() {
563 return removeServerContext;
567 private final class InitialRemoveServerState extends RemoveServerState implements InitialOperationState{
569 protected InitialRemoveServerState(RemoveServerContext removeServerContext) {
570 super(removeServerContext);
574 public void initiate() {
575 String serverId = getRemoveServerContext().getOperation().getServerId();
576 raftContext.removePeer(serverId);
577 ((AbstractLeader)raftActor.getCurrentBehavior()).removeFollower(serverId);
579 persistNewServerConfiguration(getRemoveServerContext());
583 private static class RemoveServerContext extends ServerOperationContext<RemoveServer> {
584 private final String peerAddress;
586 RemoveServerContext(RemoveServer operation, String peerAddress, ActorRef clientRequestor) {
587 super(operation, clientRequestor);
588 this.peerAddress = peerAddress;
592 Object newReply(ServerChangeStatus status, String leaderId) {
593 return new RemoveServerReply(status, leaderId);
597 InitialOperationState newInitialOperationState(RaftActorServerConfigurationSupport support) {
598 return support.new InitialRemoveServerState(this);
602 void operationComplete(RaftActor raftActor, boolean succeeded) {
603 if(peerAddress != null) {
604 raftActor.context().actorSelection(peerAddress).tell(new ServerRemoved(getOperation().getServerId()), raftActor.getSelf());
609 boolean includeSelfInNewConfiguration(RaftActor raftActor) {
610 return !getOperation().getServerId().equals(raftActor.getId());
614 String getLoggingContext() {
615 return getOperation().getServerId();
619 private static class ChangeServersVotingStatusContext extends ServerOperationContext<ChangeServersVotingStatus> {
620 ChangeServersVotingStatusContext(ChangeServersVotingStatus convertMessage, ActorRef clientRequestor) {
621 super(convertMessage, clientRequestor);
625 InitialOperationState newInitialOperationState(RaftActorServerConfigurationSupport support) {
626 return support.new ChangeServersVotingStatusState(this);
630 Object newReply(ServerChangeStatus status, String leaderId) {
631 return new ServerChangeReply(status, leaderId);
635 void operationComplete(final RaftActor raftActor, boolean succeeded) {
636 // If this leader changed to non-voting we need to step down as leader so we'll try to transfer
638 boolean localServerChangedToNonVoting = Boolean.FALSE.equals(getOperation().
639 getServerVotingStatusMap().get(raftActor.getRaftActorContext().getId()));
640 if(succeeded && localServerChangedToNonVoting && raftActor.isLeader()) {
641 raftActor.initiateLeadershipTransfer(new OnComplete() {
643 public void onSuccess(ActorRef raftActorRef, ActorRef replyTo) {
644 LOG.debug("{}: leader transfer succeeded after change to non-voting", raftActor.persistenceId());
645 ensureFollowerState(raftActor);
649 public void onFailure(ActorRef raftActorRef, ActorRef replyTo) {
650 LOG.debug("{}: leader transfer failed after change to non-voting", raftActor.persistenceId());
651 ensureFollowerState(raftActor);
654 private void ensureFollowerState(RaftActor raftActor) {
655 // Whether or not leadership transfer succeeded, we have to step down as leader and
656 // switch to Follower so ensure that.
657 if(raftActor.getRaftState() != RaftState.Follower) {
658 raftActor.initializeBehavior();
666 String getLoggingContext() {
667 return getOperation().getServerVotingStatusMap().toString();
671 private class ChangeServersVotingStatusState extends OperationState implements InitialOperationState {
672 private final ChangeServersVotingStatusContext changeVotingStatusContext;
674 ChangeServersVotingStatusState(ChangeServersVotingStatusContext changeVotingStatusContext) {
675 this.changeVotingStatusContext = changeVotingStatusContext;
679 public void initiate() {
680 LOG.debug("Initiating ChangeServersVotingStatusState");
682 Map<String, Boolean> serverVotingStatusMap = changeVotingStatusContext.getOperation().getServerVotingStatusMap();
683 List<ServerInfo> newServerInfoList = new ArrayList<>();
684 for(String peerId: raftContext.getPeerIds()) {
685 newServerInfoList.add(new ServerInfo(peerId, serverVotingStatusMap.containsKey(peerId) ?
686 serverVotingStatusMap.get(peerId) : raftContext.getPeerInfo(peerId).isVoting()));
689 newServerInfoList.add(new ServerInfo(raftContext.getId(), serverVotingStatusMap.containsKey(
690 raftContext.getId()) ? serverVotingStatusMap.get(raftContext.getId()) : raftContext.isVotingMember()));
692 raftContext.updatePeerIds(new ServerConfigurationPayload(newServerInfoList));
693 AbstractLeader leader = (AbstractLeader) raftActor.getCurrentBehavior();
694 leader.updateMinReplicaCount();
696 persistNewServerConfiguration(changeVotingStatusContext);
700 static class ServerOperationTimeout {
701 private final String loggingContext;
703 ServerOperationTimeout(String loggingContext){
704 this.loggingContext = Preconditions.checkNotNull(loggingContext, "loggingContext should not be null");
707 String getLoggingContext() {
708 return loggingContext;