2 * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.cluster.raft;
10 import akka.actor.ActorRef;
11 import akka.actor.ActorSelection;
12 import akka.actor.Cancellable;
13 import com.google.common.base.Preconditions;
14 import java.util.ArrayDeque;
15 import java.util.Queue;
16 import java.util.UUID;
17 import javax.annotation.Nullable;
18 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
19 import org.opendaylight.controller.cluster.raft.base.messages.SnapshotComplete;
20 import org.opendaylight.controller.cluster.raft.behaviors.AbstractLeader;
21 import org.opendaylight.controller.cluster.raft.messages.AddServer;
22 import org.opendaylight.controller.cluster.raft.messages.AddServerReply;
23 import org.opendaylight.controller.cluster.raft.messages.RemoveServer;
24 import org.opendaylight.controller.cluster.raft.messages.RemoveServerReply;
25 import org.opendaylight.controller.cluster.raft.messages.ServerChangeStatus;
26 import org.opendaylight.controller.cluster.raft.messages.ServerRemoved;
27 import org.opendaylight.controller.cluster.raft.messages.UnInitializedFollowerSnapshotReply;
28 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
29 import org.slf4j.Logger;
30 import org.slf4j.LoggerFactory;
33 * Handles server configuration related messages for a RaftActor.
35 * @author Thomas Pantelis
37 class RaftActorServerConfigurationSupport {
38 private static final Logger LOG = LoggerFactory.getLogger(RaftActorServerConfigurationSupport.class);
40 private final OperationState IDLE = new Idle();
42 private final RaftActor raftActor;
44 private final RaftActorContext raftContext;
46 private final Queue<ServerOperationContext<?>> pendingOperationsQueue = new ArrayDeque<>();
48 private OperationState currentOperationState = IDLE;
/**
 * Creates the server configuration support for the given RAFT actor.
 *
 * @param raftActor the actor whose AddServer/RemoveServer handling is delegated to this instance
 */
RaftActorServerConfigurationSupport(RaftActor raftActor) {
    this.raftContext = raftActor.getRaftActorContext();
    this.raftActor = raftActor;
}
55 boolean handleMessage(Object message, ActorRef sender) {
56 if(message instanceof AddServer) {
57 onAddServer((AddServer) message, sender);
59 } else if(message instanceof RemoveServer) {
60 onRemoveServer((RemoveServer) message, sender);
62 } else if (message instanceof ServerOperationTimeout) {
63 currentOperationState.onServerOperationTimeout((ServerOperationTimeout) message);
65 } else if (message instanceof UnInitializedFollowerSnapshotReply) {
66 currentOperationState.onUnInitializedFollowerSnapshotReply((UnInitializedFollowerSnapshotReply) message);
68 } else if(message instanceof ApplyState) {
69 return onApplyState((ApplyState) message);
70 } else if(message instanceof SnapshotComplete) {
71 currentOperationState.onSnapshotComplete();
78 private void onRemoveServer(RemoveServer removeServer, ActorRef sender) {
79 LOG.debug("{}: onRemoveServer: {}, state: {}", raftContext.getId(), removeServer, currentOperationState);
80 boolean isSelf = removeServer.getServerId().equals(raftActor.getId());
81 if(isSelf && !raftContext.hasFollowers()) {
82 sender.tell(new RemoveServerReply(ServerChangeStatus.NOT_SUPPORTED, raftActor.getLeaderId()),
84 } else if(!isSelf && !raftContext.getPeerIds().contains(removeServer.getServerId())) {
85 sender.tell(new RemoveServerReply(ServerChangeStatus.DOES_NOT_EXIST, raftActor.getLeaderId()),
88 String serverAddress = isSelf ? raftActor.self().path().toString() :
89 raftContext.getPeerAddress(removeServer.getServerId());
90 onNewOperation(new RemoveServerContext(removeServer, serverAddress, sender));
94 private boolean onApplyState(ApplyState applyState) {
95 Payload data = applyState.getReplicatedLogEntry().getData();
96 if(data instanceof ServerConfigurationPayload) {
97 currentOperationState.onApplyState(applyState);
105 * The algorithm for AddServer is as follows:
107 * <li>Add the new server as a peer.</li>
108 * <li>Add the new follower to the leader.</li>
109 * <li>If new server should be voting member</li>
111 * <li>Initialize FollowerState to VOTING_NOT_INITIALIZED.</li>
112 * <li>Initiate install snapshot to the new follower.</li>
113 * <li>When install snapshot complete, mark the follower as VOTING and re-calculate majority vote count.</li>
115 * <li>Persist and replicate ServerConfigurationPayload with the new server list.</li>
116 * <li>On replication consensus, respond to caller with OK.</li>
118 * If the install snapshot times out after a period of 2 * election time out
120 * <li>Remove the new server as a peer.</li>
121 * <li>Remove the new follower from the leader.</li>
122 * <li>Respond to caller with TIMEOUT.</li>
125 private void onAddServer(AddServer addServer, ActorRef sender) {
126 LOG.debug("{}: onAddServer: {}, state: {}", raftContext.getId(), addServer, currentOperationState);
128 onNewOperation(new AddServerContext(addServer, sender));
131 private void onNewOperation(ServerOperationContext<?> operationContext) {
132 if (raftActor.isLeader()) {
133 currentOperationState.onNewOperation(operationContext);
135 ActorSelection leader = raftActor.getLeader();
136 if (leader != null) {
137 LOG.debug("{}: Not leader - forwarding to leader {}", raftContext.getId(), leader);
138 leader.forward(operationContext.getOperation(), raftActor.getContext());
140 LOG.debug("{}: No leader - returning NO_LEADER reply", raftContext.getId());
141 operationContext.getClientRequestor().tell(operationContext.newReply(
142 ServerChangeStatus.NO_LEADER, null), raftActor.self());
148 * Interface for a server operation FSM state.
150 private interface OperationState {
151 void onNewOperation(ServerOperationContext<?> operationContext);
153 void onServerOperationTimeout(ServerOperationTimeout timeout);
155 void onUnInitializedFollowerSnapshotReply(UnInitializedFollowerSnapshotReply reply);
157 void onApplyState(ApplyState applyState);
159 void onSnapshotComplete();
163 * Interface for the initial state for a server operation.
165 private interface InitialOperationState {
170 * Abstract base class for a server operation FSM state. Handles common behavior for all states.
172 private abstract class AbstractOperationState implements OperationState {
174 public void onNewOperation(ServerOperationContext<?> operationContext) {
175 // We're currently processing another operation so queue it to be processed later.
177 LOG.debug("{}: Server operation already in progress - queueing {}", raftContext.getId(),
178 operationContext.getOperation());
180 pendingOperationsQueue.add(operationContext);
184 public void onServerOperationTimeout(ServerOperationTimeout timeout) {
185 LOG.debug("onServerOperationTimeout should not be called in state {}", this);
189 public void onUnInitializedFollowerSnapshotReply(UnInitializedFollowerSnapshotReply reply) {
190 LOG.debug("onUnInitializedFollowerSnapshotReply was called in state {}", this);
194 public void onApplyState(ApplyState applyState) {
195 LOG.debug("onApplyState was called in state {}", this);
199 public void onSnapshotComplete() {
202 protected void persistNewServerConfiguration(ServerOperationContext<?> operationContext){
203 raftContext.setDynamicServerConfigurationInUse();
205 boolean includeSelf = !operationContext.getServerId().equals(raftActor.getId());
206 ServerConfigurationPayload payload = raftContext.getPeerServerInfo(includeSelf);
207 LOG.debug("{}: New server configuration : {}", raftContext.getId(), payload.getServerConfig());
209 raftActor.persistData(operationContext.getClientRequestor(), operationContext.getContextId(), payload);
211 currentOperationState = new Persisting(operationContext, newTimer(new ServerOperationTimeout(operationContext.getServerId())));
213 sendReply(operationContext, ServerChangeStatus.OK);
216 protected void operationComplete(ServerOperationContext<?> operationContext, @Nullable ServerChangeStatus replyStatus) {
217 if(replyStatus != null) {
218 sendReply(operationContext, replyStatus);
221 operationContext.operationComplete(raftActor, replyStatus);
223 currentOperationState = IDLE;
225 ServerOperationContext<?> nextOperation = pendingOperationsQueue.poll();
226 if(nextOperation != null) {
227 RaftActorServerConfigurationSupport.this.onNewOperation(nextOperation);
231 protected void sendReply(ServerOperationContext<?> operationContext, ServerChangeStatus status) {
232 LOG.debug("{}: Returning {} for operation {}", raftContext.getId(), status, operationContext.getOperation());
234 operationContext.getClientRequestor().tell(operationContext.newReply(status, raftActor.getLeaderId()),
238 Cancellable newTimer(Object message) {
239 return raftContext.getActorSystem().scheduler().scheduleOnce(
240 raftContext.getConfigParams().getElectionTimeOutInterval().$times(2), raftContext.getActor(), message,
241 raftContext.getActorSystem().dispatcher(), raftContext.getActor());
245 public String toString() {
246 return getClass().getSimpleName();
251 * The state when no server operation is in progress. It immediately initiates new server operations.
253 private class Idle extends AbstractOperationState {
255 public void onNewOperation(ServerOperationContext<?> operationContext) {
256 operationContext.newInitialOperationState(RaftActorServerConfigurationSupport.this).initiate();
260 public void onApplyState(ApplyState applyState) {
261 // Noop - we override b/c ApplyState is called normally for followers in the idle state.
266 * The state when a new server configuration is being persisted and replicated.
268 private class Persisting extends AbstractOperationState {
269 private final ServerOperationContext<?> operationContext;
270 private final Cancellable timer;
271 private boolean timedOut = false;
273 Persisting(ServerOperationContext<?> operationContext, Cancellable timer) {
274 this.operationContext = operationContext;
279 public void onApplyState(ApplyState applyState) {
280 // Sanity check - we could get an ApplyState from a previous operation that timed out so make
281 // sure it's meant for us.
282 if(operationContext.getContextId().equals(applyState.getIdentifier())) {
283 LOG.info("{}: {} has been successfully replicated to a majority of followers", raftActor.getId(),
284 applyState.getReplicatedLogEntry().getData());
287 operationComplete(operationContext, null);
292 public void onServerOperationTimeout(ServerOperationTimeout timeout) {
293 LOG.warn("{}: Timeout occured while replicating the new server configuration for {}", raftContext.getId(),
294 timeout.getServerId());
298 // Fail any pending operations
299 ServerOperationContext<?> nextOperation = pendingOperationsQueue.poll();
300 while(nextOperation != null) {
301 sendReply(nextOperation, ServerChangeStatus.PRIOR_REQUEST_CONSENSUS_TIMEOUT);
302 nextOperation = pendingOperationsQueue.poll();
307 public void onNewOperation(ServerOperationContext<?> operationContext) {
309 sendReply(operationContext, ServerChangeStatus.PRIOR_REQUEST_CONSENSUS_TIMEOUT);
311 super.onNewOperation(operationContext);
317 * Abstract base class for an AddServer operation state.
319 private abstract class AddServerState extends AbstractOperationState {
320 private final AddServerContext addServerContext;
322 AddServerState(AddServerContext addServerContext) {
323 this.addServerContext = addServerContext;
326 AddServerContext getAddServerContext() {
327 return addServerContext;
330 Cancellable newInstallSnapshotTimer() {
331 return newTimer(new ServerOperationTimeout(addServerContext.getOperation().getNewServerId()));
334 void handleInstallSnapshotTimeout(ServerOperationTimeout timeout) {
335 String serverId = timeout.getServerId();
337 LOG.debug("{}: handleInstallSnapshotTimeout for new server {}", raftContext.getId(), serverId);
340 raftContext.removePeer(serverId);
342 boolean isLeader = raftActor.isLeader();
344 AbstractLeader leader = (AbstractLeader) raftActor.getCurrentBehavior();
345 leader.removeFollower(serverId);
348 operationComplete(getAddServerContext(), isLeader ? ServerChangeStatus.TIMEOUT : ServerChangeStatus.NO_LEADER);
354 * The initial state for the AddServer operation. It adds the new follower as a peer and initiates
355 * snapshot capture, if necessary.
357 private class InitialAddServerState extends AddServerState implements InitialOperationState {
358 InitialAddServerState(AddServerContext addServerContext) {
359 super(addServerContext);
363 public void initiate() {
364 AbstractLeader leader = (AbstractLeader) raftActor.getCurrentBehavior();
365 AddServer addServer = getAddServerContext().getOperation();
367 LOG.debug("{}: Initiating {}", raftContext.getId(), addServer);
369 if(raftContext.getPeerInfo(addServer.getNewServerId()) != null) {
370 operationComplete(getAddServerContext(), ServerChangeStatus.ALREADY_EXISTS);
374 VotingState votingState = addServer.isVotingMember() ? VotingState.VOTING_NOT_INITIALIZED :
375 VotingState.NON_VOTING;
376 raftContext.addToPeers(addServer.getNewServerId(), addServer.getNewServerAddress(), votingState);
378 leader.addFollower(addServer.getNewServerId());
380 if(votingState == VotingState.VOTING_NOT_INITIALIZED){
381 // schedule the install snapshot timeout timer
382 Cancellable installSnapshotTimer = newInstallSnapshotTimer();
383 if(leader.initiateCaptureSnapshot(addServer.getNewServerId())) {
384 LOG.debug("{}: Initiating capture snapshot for new server {}", raftContext.getId(),
385 addServer.getNewServerId());
387 currentOperationState = new InstallingSnapshot(getAddServerContext(), installSnapshotTimer);
389 LOG.debug("{}: Snapshot already in progress - waiting for completion", raftContext.getId());
391 currentOperationState = new WaitingForPriorSnapshotComplete(getAddServerContext(),
392 installSnapshotTimer);
395 LOG.debug("{}: New follower is non-voting - directly persisting new server configuration",
396 raftContext.getId());
398 persistNewServerConfiguration(getAddServerContext());
404 * The AddServer operation state for when the catch-up snapshot is being installed. It handles successful
407 private class InstallingSnapshot extends AddServerState {
408 private final Cancellable installSnapshotTimer;
410 InstallingSnapshot(AddServerContext addServerContext, Cancellable installSnapshotTimer) {
411 super(addServerContext);
412 this.installSnapshotTimer = Preconditions.checkNotNull(installSnapshotTimer);
416 public void onServerOperationTimeout(ServerOperationTimeout timeout) {
417 handleInstallSnapshotTimeout(timeout);
419 LOG.warn("{}: Timeout occured for new server {} while installing snapshot", raftContext.getId(),
420 timeout.getServerId());
424 public void onUnInitializedFollowerSnapshotReply(UnInitializedFollowerSnapshotReply reply) {
425 LOG.debug("{}: onUnInitializedFollowerSnapshotReply: {}", raftContext.getId(), reply);
427 String followerId = reply.getFollowerId();
429 // Sanity check to guard against receiving an UnInitializedFollowerSnapshotReply from a prior
430 // add server operation that timed out.
431 if(getAddServerContext().getOperation().getNewServerId().equals(followerId) && raftActor.isLeader()) {
432 AbstractLeader leader = (AbstractLeader) raftActor.getCurrentBehavior();
433 raftContext.getPeerInfo(followerId).setVotingState(VotingState.VOTING);
434 leader.updateMinReplicaCount();
436 persistNewServerConfiguration(getAddServerContext());
438 installSnapshotTimer.cancel();
440 LOG.debug("{}: Dropping UnInitializedFollowerSnapshotReply for server {}: {}",
441 raftContext.getId(), followerId,
442 !raftActor.isLeader() ? "not leader" : "server Id doesn't match");
448 * The AddServer operation state for when there is a snapshot already in progress. When the current
449 * snapshot completes, it initiates an install snapshot.
451 private class WaitingForPriorSnapshotComplete extends AddServerState {
452 private final Cancellable snapshotTimer;
454 WaitingForPriorSnapshotComplete(AddServerContext addServerContext, Cancellable snapshotTimer) {
455 super(addServerContext);
456 this.snapshotTimer = Preconditions.checkNotNull(snapshotTimer);
460 public void onSnapshotComplete() {
461 LOG.debug("{}: onSnapshotComplete", raftContext.getId());
463 if(!raftActor.isLeader()) {
464 LOG.debug("{}: No longer the leader", raftContext.getId());
468 AbstractLeader leader = (AbstractLeader) raftActor.getCurrentBehavior();
469 if(leader.initiateCaptureSnapshot(getAddServerContext().getOperation().getNewServerId())) {
470 LOG.debug("{}: Initiating capture snapshot for new server {}", raftContext.getId(),
471 getAddServerContext().getOperation().getNewServerId());
473 currentOperationState = new InstallingSnapshot(getAddServerContext(),
474 newInstallSnapshotTimer());
476 snapshotTimer.cancel();
481 public void onServerOperationTimeout(ServerOperationTimeout timeout) {
482 handleInstallSnapshotTimeout(timeout);
484 LOG.warn("{}: Timeout occured for new server {} while waiting for prior snapshot to complete",
485 raftContext.getId(), timeout.getServerId());
490 * Stores context information for a server operation.
492 * @param <T> the operation type
494 private static abstract class ServerOperationContext<T> {
495 private final T operation;
496 private final ActorRef clientRequestor;
497 private final String contextId;
499 ServerOperationContext(T operation, ActorRef clientRequestor){
500 this.operation = operation;
501 this.clientRequestor = clientRequestor;
502 contextId = UUID.randomUUID().toString();
505 String getContextId() {
513 ActorRef getClientRequestor() {
514 return clientRequestor;
517 abstract Object newReply(ServerChangeStatus status, String leaderId);
519 abstract InitialOperationState newInitialOperationState(RaftActorServerConfigurationSupport support);
521 abstract void operationComplete(RaftActor raftActor, ServerChangeStatus serverChangeStatus);
523 abstract String getServerId();
527 * Stores context information for an AddServer operation.
529 private static class AddServerContext extends ServerOperationContext<AddServer> {
530 AddServerContext(AddServer addServer, ActorRef clientRequestor) {
531 super(addServer, clientRequestor);
535 Object newReply(ServerChangeStatus status, String leaderId) {
536 return new AddServerReply(status, leaderId);
540 InitialOperationState newInitialOperationState(RaftActorServerConfigurationSupport support) {
541 return support.new InitialAddServerState(this);
545 void operationComplete(RaftActor raftActor, ServerChangeStatus serverChangeStatus) {
550 String getServerId() {
551 return getOperation().getNewServerId();
555 private abstract class RemoveServerState extends AbstractOperationState {
556 private final RemoveServerContext removeServerContext;
559 protected RemoveServerState(RemoveServerContext removeServerContext) {
560 this.removeServerContext = Preconditions.checkNotNull(removeServerContext);
564 public RemoveServerContext getRemoveServerContext() {
565 return removeServerContext;
569 private class InitialRemoveServerState extends RemoveServerState implements InitialOperationState{
571 protected InitialRemoveServerState(RemoveServerContext removeServerContext) {
572 super(removeServerContext);
576 public void initiate() {
577 String serverId = getRemoveServerContext().getOperation().getServerId();
578 raftContext.removePeer(serverId);
579 ((AbstractLeader)raftActor.getCurrentBehavior()).removeFollower(serverId);
581 persistNewServerConfiguration(getRemoveServerContext());
585 private static class RemoveServerContext extends ServerOperationContext<RemoveServer> {
586 private final String peerAddress;
588 RemoveServerContext(RemoveServer operation, String peerAddress, ActorRef clientRequestor) {
589 super(operation, clientRequestor);
590 this.peerAddress = peerAddress;
594 Object newReply(ServerChangeStatus status, String leaderId) {
595 return new RemoveServerReply(status, leaderId);
599 InitialOperationState newInitialOperationState(RaftActorServerConfigurationSupport support) {
600 return support.new InitialRemoveServerState(this);
604 void operationComplete(RaftActor raftActor, ServerChangeStatus serverChangeStatus) {
605 if(peerAddress != null) {
606 raftActor.context().actorSelection(peerAddress).tell(new ServerRemoved(getOperation().getServerId()), raftActor.getSelf());
611 String getServerId() {
612 return getOperation().getServerId();
616 static class ServerOperationTimeout {
617 private final String serverId;
619 ServerOperationTimeout(String serverId){
620 this.serverId = Preconditions.checkNotNull(serverId, "serverId should not be null");
623 String getServerId() {