X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-akka-raft%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fraft%2FRaftActorServerConfigurationSupport.java;h=7012e0db86f8847b3660f78de8cefb3a1f8c7a6a;hp=e78f39cdb1be0db11a39b1a91255effa5c3dc0aa;hb=24ace09aacc620fd9768e0a7004e802f9385bcfc;hpb=f2b5692224570e7ecccb139594ed55237efeec03 diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorServerConfigurationSupport.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorServerConfigurationSupport.java index e78f39cdb1..7012e0db86 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorServerConfigurationSupport.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorServerConfigurationSupport.java @@ -13,14 +13,16 @@ import akka.actor.Cancellable; import com.google.common.base.Preconditions; import java.util.ArrayDeque; import java.util.ArrayList; +import java.util.Collection; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Queue; import java.util.UUID; import javax.annotation.Nullable; -import org.opendaylight.controller.cluster.raft.RaftActorLeadershipTransferCohort.OnComplete; import org.opendaylight.controller.cluster.raft.ServerConfigurationPayload.ServerInfo; import org.opendaylight.controller.cluster.raft.base.messages.ApplyState; +import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout; import org.opendaylight.controller.cluster.raft.base.messages.SnapshotComplete; import org.opendaylight.controller.cluster.raft.behaviors.AbstractLeader; import org.opendaylight.controller.cluster.raft.messages.AddServer; @@ -35,6 +37,7 @@ import org.opendaylight.controller.cluster.raft.messages.UnInitializedFollowerSn import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import scala.concurrent.duration.FiniteDuration; /** * Handles server configuration related messages for a RaftActor. @@ -85,16 +88,39 @@ class RaftActorServerConfigurationSupport { } } + void onNewLeader(String leaderId) { + currentOperationState.onNewLeader(leaderId); + } + private void onChangeServersVotingStatus(ChangeServersVotingStatus message, ActorRef sender) { LOG.debug("{}: onChangeServersVotingStatus: {}, state: {}", raftContext.getId(), message, currentOperationState); - onNewOperation(new ChangeServersVotingStatusContext(message, sender)); + // The following check is a special case. Normally we fail an operation if there's no leader. + // Consider a scenario where one has 2 geographically-separated 3-node clusters, one a primary and + // the other a backup such that if the primary cluster is lost, the backup can take over. In this + // scenario, we have a logical 6-node cluster where the primary sub-cluster is configured as voting + // and the backup sub-cluster as non-voting such that the primary cluster can make progress without + // consensus from the backup cluster while still replicating to the backup. On fail-over to the backup, + // a request would be sent to a member of the backup cluster to flip the voting states, ie make the + // backup sub-cluster voting and the lost primary non-voting. However since the primary majority + // cluster is lost, there would be no leader to apply, persist and replicate the server config change. + // Therefore, if the local server is currently non-voting and is to be changed to voting and there is + // no current leader, we will try to elect a leader using the new server config in order to replicate + // the change and progress. + boolean localServerChangingToVoting = Boolean.TRUE.equals(message. + getServerVotingStatusMap().get(raftActor.getRaftActorContext().getId())); + boolean hasNoLeader = raftActor.getLeaderId() == null; + if(localServerChangingToVoting && !raftContext.isVotingMember() && hasNoLeader) { + currentOperationState.onNewOperation(new ChangeServersVotingStatusContext(message, sender, true)); + } else { + onNewOperation(new ChangeServersVotingStatusContext(message, sender, false)); + } } private void onRemoveServer(RemoveServer removeServer, ActorRef sender) { LOG.debug("{}: onRemoveServer: {}, state: {}", raftContext.getId(), removeServer, currentOperationState); - boolean isSelf = removeServer.getServerId().equals(raftActor.getId()); + boolean isSelf = removeServer.getServerId().equals(raftContext.getId()); if(isSelf && !raftContext.hasFollowers()) { sender.tell(new RemoveServerReply(ServerChangeStatus.NOT_SUPPORTED, raftActor.getLeaderId()), raftActor.getSelf()); @@ -152,7 +178,7 @@ class RaftActorServerConfigurationSupport { ActorSelection leader = raftActor.getLeader(); if (leader != null) { LOG.debug("{}: Not leader - forwarding to leader {}", raftContext.getId(), leader); - leader.forward(operationContext.getOperation(), raftActor.getContext()); + leader.tell(operationContext.getOperation(), operationContext.getClientRequestor()); } else { LOG.debug("{}: No leader - returning NO_LEADER reply", raftContext.getId()); operationContext.getClientRequestor().tell(operationContext.newReply( @@ -197,6 +223,9 @@ class RaftActorServerConfigurationSupport { } + void onNewLeader(String newLeader) { + } + protected void persistNewServerConfiguration(ServerOperationContext operationContext){ raftContext.setDynamicServerConfigurationInUse(); @@ -219,6 +248,10 @@ class RaftActorServerConfigurationSupport { operationContext.operationComplete(raftActor, replyStatus == null || replyStatus == ServerChangeStatus.OK); + changeToIdleState(); + } + + protected void changeToIdleState() { currentOperationState = IDLE; ServerOperationContext nextOperation = pendingOperationsQueue.poll(); @@ -235,8 +268,12 @@ class RaftActorServerConfigurationSupport { } Cancellable newTimer(Object message) { + return newTimer(raftContext.getConfigParams().getElectionTimeOutInterval().$times(2), message); + } + + Cancellable newTimer(FiniteDuration timeout, Object message) { return raftContext.getActorSystem().scheduler().scheduleOnce( - raftContext.getConfigParams().getElectionTimeOutInterval().$times(2), raftContext.getActor(), message, + timeout, raftContext.getActor(), message, raftContext.getActorSystem().dispatcher(), raftContext.getActor()); } @@ -279,7 +316,7 @@ class RaftActorServerConfigurationSupport { // Sanity check - we could get an ApplyState from a previous operation that timed out so make // sure it's meant for us. if(operationContext.getContextId().equals(applyState.getIdentifier())) { - LOG.info("{}: {} has been successfully replicated to a majority of followers", raftActor.getId(), + LOG.info("{}: {} has been successfully replicated to a majority of followers", raftContext.getId(), applyState.getReplicatedLogEntry().getData()); timer.cancel(); @@ -617,13 +654,17 @@ class RaftActorServerConfigurationSupport { } private static class ChangeServersVotingStatusContext extends ServerOperationContext { - ChangeServersVotingStatusContext(ChangeServersVotingStatus convertMessage, ActorRef clientRequestor) { + private final boolean tryToElectLeader; + + ChangeServersVotingStatusContext(ChangeServersVotingStatus convertMessage, ActorRef clientRequestor, + boolean tryToElectLeader) { super(convertMessage, clientRequestor); + this.tryToElectLeader = tryToElectLeader; } @Override InitialOperationState newInitialOperationState(RaftActorServerConfigurationSupport support) { - return support.new ChangeServersVotingStatusState(this); + return support.new ChangeServersVotingStatusState(this, tryToElectLeader); } @Override @@ -638,7 +679,7 @@ class RaftActorServerConfigurationSupport { boolean localServerChangedToNonVoting = Boolean.FALSE.equals(getOperation(). getServerVotingStatusMap().get(raftActor.getRaftActorContext().getId())); if(succeeded && localServerChangedToNonVoting && raftActor.isLeader()) { - raftActor.initiateLeadershipTransfer(new OnComplete() { + raftActor.initiateLeadershipTransfer(new RaftActorLeadershipTransferCohort.OnComplete() { @Override public void onSuccess(ActorRef raftActorRef, ActorRef replyTo) { LOG.debug("{}: leader transfer succeeded after change to non-voting", raftActor.persistenceId()); @@ -664,21 +705,55 @@ class RaftActorServerConfigurationSupport { @Override String getLoggingContext() { - return getOperation().getServerVotingStatusMap().toString(); + return getOperation().toString(); } } private class ChangeServersVotingStatusState extends OperationState implements InitialOperationState { private final ChangeServersVotingStatusContext changeVotingStatusContext; + private final boolean tryToElectLeader; - ChangeServersVotingStatusState(ChangeServersVotingStatusContext changeVotingStatusContext) { + ChangeServersVotingStatusState(ChangeServersVotingStatusContext changeVotingStatusContext, + boolean tryToElectLeader) { this.changeVotingStatusContext = changeVotingStatusContext; + this.tryToElectLeader = tryToElectLeader; } @Override public void initiate() { LOG.debug("Initiating ChangeServersVotingStatusState"); + if(tryToElectLeader) { + initiateLocalLeaderElection(); + } else { + updateLocalPeerInfo(); + + persistNewServerConfiguration(changeVotingStatusContext); + } + } + + private void initiateLocalLeaderElection() { + LOG.debug("{}: Sending local ElectionTimeout to start leader election", raftContext.getId()); + + ServerConfigurationPayload previousServerConfig = raftContext.getPeerServerInfo(true); + updateLocalPeerInfo(); + + raftContext.getActor().tell(ElectionTimeout.INSTANCE, raftContext.getActor()); + + currentOperationState = new WaitingForLeaderElected(changeVotingStatusContext, previousServerConfig); + } + + private void updateLocalPeerInfo() { + List newServerInfoList = newServerInfoList(); + + raftContext.updatePeerIds(new ServerConfigurationPayload(newServerInfoList)); + if(raftActor.getCurrentBehavior() instanceof AbstractLeader) { + AbstractLeader leader = (AbstractLeader) raftActor.getCurrentBehavior(); + leader.updateMinReplicaCount(); + } + } + + private List newServerInfoList() { Map serverVotingStatusMap = changeVotingStatusContext.getOperation().getServerVotingStatusMap(); List newServerInfoList = new ArrayList<>(); for(String peerId: raftContext.getPeerIds()) { @@ -689,11 +764,90 @@ class RaftActorServerConfigurationSupport { newServerInfoList.add(new ServerInfo(raftContext.getId(), serverVotingStatusMap.containsKey( raftContext.getId()) ? serverVotingStatusMap.get(raftContext.getId()) : raftContext.isVotingMember())); - raftContext.updatePeerIds(new ServerConfigurationPayload(newServerInfoList)); - AbstractLeader leader = (AbstractLeader) raftActor.getCurrentBehavior(); - leader.updateMinReplicaCount(); + return newServerInfoList; + } + } - persistNewServerConfiguration(changeVotingStatusContext); + private class WaitingForLeaderElected extends OperationState { + private final ServerConfigurationPayload previousServerConfig; + private final ChangeServersVotingStatusContext operationContext; + private final Cancellable timer; + + WaitingForLeaderElected(ChangeServersVotingStatusContext operationContext, + ServerConfigurationPayload previousServerConfig) { + this.operationContext = operationContext; + this.previousServerConfig = previousServerConfig; + + timer = newTimer(raftContext.getConfigParams().getElectionTimeOutInterval(), + new ServerOperationTimeout(operationContext.getLoggingContext())); + } + + @Override + void onNewLeader(String newLeader) { + LOG.debug("{}: New leader {} elected", raftContext.getId(), newLeader); + + timer.cancel(); + + if(raftActor.isLeader()) { + persistNewServerConfiguration(operationContext); + } else { + // Edge case - some other node became leader so forward the operation. + LOG.debug("{}: Forwarding {} to new leader", raftContext.getId(), operationContext.getOperation()); + + // Revert the local server config change. + raftContext.updatePeerIds(previousServerConfig); + + changeToIdleState(); + RaftActorServerConfigurationSupport.this.onNewOperation(operationContext); + } + } + + @Override + void onServerOperationTimeout(ServerOperationTimeout timeout) { + LOG.warn("{}: Leader election timed out - cannot apply operation {}", + raftContext.getId(), timeout.getLoggingContext()); + + // Revert the local server config change. + raftContext.updatePeerIds(previousServerConfig); + raftActor.initializeBehavior(); + + tryToForwardOperationToAnotherServer(); + } + + private void tryToForwardOperationToAnotherServer() { + Collection serversVisited = new HashSet<>(operationContext.getOperation().getServersVisited()); + + LOG.debug("{}: tryToForwardOperationToAnotherServer - servers already visited {}", raftContext.getId(), + serversVisited); + + serversVisited.add(raftContext.getId()); + + // Try to find another whose state is being changed from non-voting to voting and that we haven't + // tried yet. + Map serverVotingStatusMap = operationContext.getOperation().getServerVotingStatusMap(); + ActorSelection forwardToPeerActor = null; + for(Map.Entry e: serverVotingStatusMap.entrySet()) { + Boolean isVoting = e.getValue(); + String serverId = e.getKey(); + PeerInfo peerInfo = raftContext.getPeerInfo(serverId); + if(isVoting && peerInfo != null && !peerInfo.isVoting() && !serversVisited.contains(serverId)) { + ActorSelection actor = raftContext.getPeerActorSelection(serverId); + if(actor != null) { + forwardToPeerActor = actor; + break; + } + } + } + + if(forwardToPeerActor != null) { + LOG.debug("{}: Found server {} to forward to", raftContext.getId(), forwardToPeerActor); + + forwardToPeerActor.tell(new ChangeServersVotingStatus(serverVotingStatusMap, serversVisited), + operationContext.getClientRequestor()); + changeToIdleState(); + } else { + operationComplete(operationContext, ServerChangeStatus.NO_LEADER); + } } }