X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-akka-raft%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fraft%2FRaftActorServerConfigurationSupport.java;h=782ecc06a87ac11b19c6ef6b8335e6a9aa820b1a;hp=46fe6269e37a907db36fe3d0ca7a5b9b9b52bd58;hb=b0f8283587b5cc8573d29f66219cbe7f70e21e1b;hpb=d639c62a1e743ec846b15bf755b830b00f4cc7eb diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorServerConfigurationSupport.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorServerConfigurationSupport.java index 46fe6269e3..782ecc06a8 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorServerConfigurationSupport.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorServerConfigurationSupport.java @@ -20,10 +20,9 @@ import java.util.Map; import java.util.Queue; import java.util.UUID; import javax.annotation.Nullable; -import org.opendaylight.controller.cluster.raft.ServerConfigurationPayload.ServerInfo; import org.opendaylight.controller.cluster.raft.base.messages.ApplyState; -import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout; import org.opendaylight.controller.cluster.raft.base.messages.SnapshotComplete; +import org.opendaylight.controller.cluster.raft.base.messages.TimeoutNow; import org.opendaylight.controller.cluster.raft.behaviors.AbstractLeader; import org.opendaylight.controller.cluster.raft.messages.AddServer; import org.opendaylight.controller.cluster.raft.messages.AddServerReply; @@ -34,7 +33,11 @@ import org.opendaylight.controller.cluster.raft.messages.ServerChangeReply; import org.opendaylight.controller.cluster.raft.messages.ServerChangeStatus; import org.opendaylight.controller.cluster.raft.messages.ServerRemoved; import org.opendaylight.controller.cluster.raft.messages.UnInitializedFollowerSnapshotReply; +import org.opendaylight.controller.cluster.raft.persisted.ServerConfigurationPayload; +import org.opendaylight.controller.cluster.raft.persisted.ServerInfo; import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload; +import org.opendaylight.yangtools.concepts.Identifier; +import org.opendaylight.yangtools.util.AbstractUUIDIdentifier; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import scala.concurrent.duration.FiniteDuration; @@ -47,6 +50,7 @@ import scala.concurrent.duration.FiniteDuration; class RaftActorServerConfigurationSupport { private static final Logger LOG = LoggerFactory.getLogger(RaftActorServerConfigurationSupport.class); + @SuppressWarnings("checkstyle:MemberName") private final OperationState IDLE = new Idle(); private final RaftActor raftActor; @@ -63,13 +67,13 @@ class RaftActorServerConfigurationSupport { } boolean handleMessage(Object message, ActorRef sender) { - if(message instanceof AddServer) { + if (message instanceof AddServer) { onAddServer((AddServer) message, sender); return true; - } else if(message instanceof RemoveServer) { + } else if (message instanceof RemoveServer) { onRemoveServer((RemoveServer) message, sender); return true; - } else if(message instanceof ChangeServersVotingStatus) { + } else if (message instanceof ChangeServersVotingStatus) { onChangeServersVotingStatus((ChangeServersVotingStatus) message, sender); return true; } else if (message instanceof ServerOperationTimeout) { @@ -78,9 +82,9 @@ class RaftActorServerConfigurationSupport { } else if (message instanceof UnInitializedFollowerSnapshotReply) { currentOperationState.onUnInitializedFollowerSnapshotReply((UnInitializedFollowerSnapshotReply) message); return true; - } else if(message instanceof ApplyState) { + } else if (message instanceof ApplyState) { return onApplyState((ApplyState) message); - } else if(message instanceof SnapshotComplete) { + } else if (message instanceof SnapshotComplete) { currentOperationState.onSnapshotComplete(); return false; } else { @@ -108,10 +112,10 @@ class RaftActorServerConfigurationSupport { // Therefore, if the local server is currently non-voting and is to be changed to voting and there is // no current leader, we will try to elect a leader using the new server config in order to replicate // the change and progress. - boolean localServerChangingToVoting = Boolean.TRUE.equals(message. - getServerVotingStatusMap().get(raftActor.getRaftActorContext().getId())); + boolean localServerChangingToVoting = Boolean.TRUE.equals(message + .getServerVotingStatusMap().get(raftActor.getRaftActorContext().getId())); boolean hasNoLeader = raftActor.getLeaderId() == null; - if(localServerChangingToVoting && !raftContext.isVotingMember() && hasNoLeader) { + if (localServerChangingToVoting && !raftContext.isVotingMember() && hasNoLeader) { currentOperationState.onNewOperation(new ChangeServersVotingStatusContext(message, sender, true)); } else { onNewOperation(new ChangeServersVotingStatusContext(message, sender, false)); @@ -121,10 +125,10 @@ class RaftActorServerConfigurationSupport { private void onRemoveServer(RemoveServer removeServer, ActorRef sender) { LOG.debug("{}: onRemoveServer: {}, state: {}", raftContext.getId(), removeServer, currentOperationState); boolean isSelf = removeServer.getServerId().equals(raftContext.getId()); - if(isSelf && !raftContext.hasFollowers()) { + if (isSelf && !raftContext.hasFollowers()) { sender.tell(new RemoveServerReply(ServerChangeStatus.NOT_SUPPORTED, raftActor.getLeaderId()), raftActor.getSelf()); - } else if(!isSelf && !raftContext.getPeerIds().contains(removeServer.getServerId())) { + } else if (!isSelf && !raftContext.getPeerIds().contains(removeServer.getServerId())) { sender.tell(new RemoveServerReply(ServerChangeStatus.DOES_NOT_EXIST, raftActor.getLeaderId()), raftActor.getSelf()); } else { @@ -136,7 +140,7 @@ class RaftActorServerConfigurationSupport { private boolean onApplyState(ApplyState applyState) { Payload data = applyState.getReplicatedLogEntry().getData(); - if(data instanceof ServerConfigurationPayload) { + if (data instanceof ServerConfigurationPayload) { currentOperationState.onApplyState(applyState); return true; } @@ -226,7 +230,7 @@ class RaftActorServerConfigurationSupport { void onNewLeader(String newLeader) { } - protected void persistNewServerConfiguration(ServerOperationContext operationContext){ + protected void persistNewServerConfiguration(ServerOperationContext operationContext) { raftContext.setDynamicServerConfigurationInUse(); ServerConfigurationPayload payload = raftContext.getPeerServerInfo( @@ -241,8 +245,9 @@ class RaftActorServerConfigurationSupport { sendReply(operationContext, ServerChangeStatus.OK); } - protected void operationComplete(ServerOperationContext operationContext, @Nullable ServerChangeStatus replyStatus) { - if(replyStatus != null) { + protected void operationComplete(ServerOperationContext operationContext, + @Nullable ServerChangeStatus replyStatus) { + if (replyStatus != null) { sendReply(operationContext, replyStatus); } @@ -255,13 +260,14 @@ class RaftActorServerConfigurationSupport { currentOperationState = IDLE; ServerOperationContext nextOperation = pendingOperationsQueue.poll(); - if(nextOperation != null) { + if (nextOperation != null) { RaftActorServerConfigurationSupport.this.onNewOperation(nextOperation); } } protected void sendReply(ServerOperationContext operationContext, ServerChangeStatus status) { - LOG.debug("{}: Returning {} for operation {}", raftContext.getId(), status, operationContext.getOperation()); + LOG.debug("{}: Returning {} for operation {}", raftContext.getId(), status, + operationContext.getOperation()); operationContext.getClientRequestor().tell(operationContext.newReply(status, raftActor.getLeaderId()), raftActor.self()); @@ -315,7 +321,7 @@ class RaftActorServerConfigurationSupport { public void onApplyState(ApplyState applyState) { // Sanity check - we could get an ApplyState from a previous operation that timed out so make // sure it's meant for us. - if(operationContext.getContextId().equals(applyState.getIdentifier())) { + if (operationContext.getContextId().equals(applyState.getIdentifier())) { LOG.info("{}: {} has been successfully replicated to a majority of followers", raftContext.getId(), applyState.getReplicatedLogEntry().getData()); @@ -333,18 +339,18 @@ class RaftActorServerConfigurationSupport { // Fail any pending operations ServerOperationContext nextOperation = pendingOperationsQueue.poll(); - while(nextOperation != null) { + while (nextOperation != null) { sendReply(nextOperation, ServerChangeStatus.PRIOR_REQUEST_CONSENSUS_TIMEOUT); nextOperation = pendingOperationsQueue.poll(); } } @Override - public void onNewOperation(ServerOperationContext operationContext) { - if(timedOut) { - sendReply(operationContext, ServerChangeStatus.PRIOR_REQUEST_CONSENSUS_TIMEOUT); + public void onNewOperation(ServerOperationContext newOperationContext) { + if (timedOut) { + sendReply(newOperationContext, ServerChangeStatus.PRIOR_REQUEST_CONSENSUS_TIMEOUT); } else { - super.onNewOperation(operationContext); + super.onNewOperation(newOperationContext); } } } @@ -376,12 +382,13 @@ class RaftActorServerConfigurationSupport { raftContext.removePeer(serverId); boolean isLeader = raftActor.isLeader(); - if(isLeader) { + if (isLeader) { AbstractLeader leader = (AbstractLeader) raftActor.getCurrentBehavior(); leader.removeFollower(serverId); } - operationComplete(getAddServerContext(), isLeader ? ServerChangeStatus.TIMEOUT : ServerChangeStatus.NO_LEADER); + operationComplete(getAddServerContext(), isLeader ? ServerChangeStatus.TIMEOUT + : ServerChangeStatus.NO_LEADER); } } @@ -397,12 +404,12 @@ class RaftActorServerConfigurationSupport { @Override public void initiate() { - AbstractLeader leader = (AbstractLeader) raftActor.getCurrentBehavior(); + final AbstractLeader leader = (AbstractLeader) raftActor.getCurrentBehavior(); AddServer addServer = getAddServerContext().getOperation(); LOG.debug("{}: Initiating {}", raftContext.getId(), addServer); - if(raftContext.getPeerInfo(addServer.getNewServerId()) != null) { + if (raftContext.getPeerInfo(addServer.getNewServerId()) != null) { operationComplete(getAddServerContext(), ServerChangeStatus.ALREADY_EXISTS); return; } @@ -413,10 +420,10 @@ class RaftActorServerConfigurationSupport { leader.addFollower(addServer.getNewServerId()); - if(votingState == VotingState.VOTING_NOT_INITIALIZED){ + if (votingState == VotingState.VOTING_NOT_INITIALIZED) { // schedule the install snapshot timeout timer Cancellable installSnapshotTimer = newInstallSnapshotTimer(); - if(leader.initiateCaptureSnapshot(addServer.getNewServerId())) { + if (leader.initiateCaptureSnapshot(addServer.getNewServerId())) { LOG.debug("{}: Initiating capture snapshot for new server {}", raftContext.getId(), addServer.getNewServerId()); @@ -464,7 +471,7 @@ class RaftActorServerConfigurationSupport { // Sanity check to guard against receiving an UnInitializedFollowerSnapshotReply from a prior // add server operation that timed out. - if(getAddServerContext().getOperation().getNewServerId().equals(followerId) && raftActor.isLeader()) { + if (getAddServerContext().getOperation().getNewServerId().equals(followerId) && raftActor.isLeader()) { AbstractLeader leader = (AbstractLeader) raftActor.getCurrentBehavior(); raftContext.getPeerInfo(followerId).setVotingState(VotingState.VOTING); leader.updateMinReplicaCount(); @@ -496,13 +503,13 @@ class RaftActorServerConfigurationSupport { public void onSnapshotComplete() { LOG.debug("{}: onSnapshotComplete", raftContext.getId()); - if(!raftActor.isLeader()) { + if (!raftActor.isLeader()) { LOG.debug("{}: No longer the leader", raftContext.getId()); return; } AbstractLeader leader = (AbstractLeader) raftActor.getCurrentBehavior(); - if(leader.initiateCaptureSnapshot(getAddServerContext().getOperation().getNewServerId())) { + if (leader.initiateCaptureSnapshot(getAddServerContext().getOperation().getNewServerId())) { LOG.debug("{}: Initiating capture snapshot for new server {}", raftContext.getId(), getAddServerContext().getOperation().getNewServerId()); @@ -522,23 +529,32 @@ class RaftActorServerConfigurationSupport { } } + private static final class ServerOperationContextIdentifier + extends AbstractUUIDIdentifier { + private static final long serialVersionUID = 1L; + + ServerOperationContextIdentifier() { + super(UUID.randomUUID()); + } + } + /** * Stores context information for a server operation. * * @param the operation type */ - private static abstract class ServerOperationContext { + private abstract static class ServerOperationContext { private final T operation; private final ActorRef clientRequestor; - private final String contextId; + private final Identifier contextId; - ServerOperationContext(T operation, ActorRef clientRequestor){ + ServerOperationContext(T operation, ActorRef clientRequestor) { this.operation = operation; this.clientRequestor = clientRequestor; - contextId = UUID.randomUUID().toString(); + contextId = new ServerOperationContextIdentifier(); } - String getContextId() { + Identifier getContextId() { return contextId; } @@ -601,7 +617,7 @@ class RaftActorServerConfigurationSupport { } } - private final class InitialRemoveServerState extends RemoveServerState implements InitialOperationState{ + private final class InitialRemoveServerState extends RemoveServerState implements InitialOperationState { protected InitialRemoveServerState(RemoveServerContext removeServerContext) { super(removeServerContext); @@ -637,8 +653,9 @@ class RaftActorServerConfigurationSupport { @Override void operationComplete(RaftActor raftActor, boolean succeeded) { - if(peerAddress != null) { - raftActor.context().actorSelection(peerAddress).tell(new ServerRemoved(getOperation().getServerId()), raftActor.getSelf()); + if (peerAddress != null) { + raftActor.context().actorSelection(peerAddress).tell( + new ServerRemoved(getOperation().getServerId()), raftActor.getSelf()); } } @@ -676,10 +693,11 @@ class RaftActorServerConfigurationSupport { void operationComplete(final RaftActor raftActor, boolean succeeded) { // If this leader changed to non-voting we need to step down as leader so we'll try to transfer // leadership. - boolean localServerChangedToNonVoting = Boolean.FALSE.equals(getOperation(). - getServerVotingStatusMap().get(raftActor.getRaftActorContext().getId())); + boolean localServerChangedToNonVoting = Boolean.FALSE.equals(getOperation() + .getServerVotingStatusMap().get(raftActor.getRaftActorContext().getId())); if (succeeded && localServerChangedToNonVoting) { - raftActor.becomeNonVoting(); + LOG.debug("Leader changed to non-voting - trying leadership transfer"); + raftActor.becomeNonVoting(); } } @@ -703,11 +721,9 @@ class RaftActorServerConfigurationSupport { public void initiate() { LOG.debug("Initiating ChangeServersVotingStatusState"); - if(tryToElectLeader) { + if (tryToElectLeader) { initiateLocalLeaderElection(); - } else { - updateLocalPeerInfo(); - + } else if (updateLocalPeerInfo()) { persistNewServerConfiguration(changeVotingStatusContext); } } @@ -716,33 +732,53 @@ class RaftActorServerConfigurationSupport { LOG.debug("{}: Sending local ElectionTimeout to start leader election", raftContext.getId()); ServerConfigurationPayload previousServerConfig = raftContext.getPeerServerInfo(true); - updateLocalPeerInfo(); + if (!updateLocalPeerInfo()) { + return; + } - raftContext.getActor().tell(ElectionTimeout.INSTANCE, raftContext.getActor()); + raftContext.getActor().tell(TimeoutNow.INSTANCE, raftContext.getActor()); currentOperationState = new WaitingForLeaderElected(changeVotingStatusContext, previousServerConfig); } - private void updateLocalPeerInfo() { + private boolean updateLocalPeerInfo() { List newServerInfoList = newServerInfoList(); + // Check if new voting state would leave us with no voting members. + boolean atLeastOneVoting = false; + for (ServerInfo info: newServerInfoList) { + if (info.isVoting()) { + atLeastOneVoting = true; + break; + } + } + + if (!atLeastOneVoting) { + operationComplete(changeVotingStatusContext, ServerChangeStatus.INVALID_REQUEST); + return false; + } + raftContext.updatePeerIds(new ServerConfigurationPayload(newServerInfoList)); - if(raftActor.getCurrentBehavior() instanceof AbstractLeader) { + if (raftActor.getCurrentBehavior() instanceof AbstractLeader) { AbstractLeader leader = (AbstractLeader) raftActor.getCurrentBehavior(); leader.updateMinReplicaCount(); } + + return true; } private List newServerInfoList() { - Map serverVotingStatusMap = changeVotingStatusContext.getOperation().getServerVotingStatusMap(); + Map serverVotingStatusMap = changeVotingStatusContext.getOperation() + .getServerVotingStatusMap(); List newServerInfoList = new ArrayList<>(); - for(String peerId: raftContext.getPeerIds()) { - newServerInfoList.add(new ServerInfo(peerId, serverVotingStatusMap.containsKey(peerId) ? - serverVotingStatusMap.get(peerId) : raftContext.getPeerInfo(peerId).isVoting())); + for (String peerId: raftContext.getPeerIds()) { + newServerInfoList.add(new ServerInfo(peerId, serverVotingStatusMap.containsKey(peerId) + ? serverVotingStatusMap.get(peerId) : raftContext.getPeerInfo(peerId).isVoting())); } newServerInfoList.add(new ServerInfo(raftContext.getId(), serverVotingStatusMap.containsKey( - raftContext.getId()) ? serverVotingStatusMap.get(raftContext.getId()) : raftContext.isVotingMember())); + raftContext.getId()) ? serverVotingStatusMap.get(raftContext.getId()) + : raftContext.isVotingMember())); return newServerInfoList; } @@ -764,11 +800,15 @@ class RaftActorServerConfigurationSupport { @Override void onNewLeader(String newLeader) { + if (newLeader == null) { + return; + } + LOG.debug("{}: New leader {} elected", raftContext.getId(), newLeader); timer.cancel(); - if(raftActor.isLeader()) { + if (raftActor.isLeader()) { persistNewServerConfiguration(operationContext); } else { // Edge case - some other node became leader so forward the operation. @@ -806,20 +846,20 @@ class RaftActorServerConfigurationSupport { // tried yet. Map serverVotingStatusMap = operationContext.getOperation().getServerVotingStatusMap(); ActorSelection forwardToPeerActor = null; - for(Map.Entry e: serverVotingStatusMap.entrySet()) { + for (Map.Entry e: serverVotingStatusMap.entrySet()) { Boolean isVoting = e.getValue(); String serverId = e.getKey(); PeerInfo peerInfo = raftContext.getPeerInfo(serverId); - if(isVoting && peerInfo != null && !peerInfo.isVoting() && !serversVisited.contains(serverId)) { + if (isVoting && peerInfo != null && !peerInfo.isVoting() && !serversVisited.contains(serverId)) { ActorSelection actor = raftContext.getPeerActorSelection(serverId); - if(actor != null) { + if (actor != null) { forwardToPeerActor = actor; break; } } } - if(forwardToPeerActor != null) { + if (forwardToPeerActor != null) { LOG.debug("{}: Found server {} to forward to", raftContext.getId(), forwardToPeerActor); forwardToPeerActor.tell(new ChangeServersVotingStatus(serverVotingStatusMap, serversVisited), @@ -834,8 +874,8 @@ class RaftActorServerConfigurationSupport { static class ServerOperationTimeout { private final String loggingContext; - ServerOperationTimeout(String loggingContext){ - this.loggingContext = Preconditions.checkNotNull(loggingContext, "loggingContext should not be null"); + ServerOperationTimeout(String loggingContext) { + this.loggingContext = Preconditions.checkNotNull(loggingContext, "loggingContext should not be null"); } String getLoggingContext() {