From a3adcd6cd7659b30e5115efe86440f7a2123ec20 Mon Sep 17 00:00:00 2001 From: Tom Pantelis Date: Wed, 21 Oct 2015 18:47:31 -0400 Subject: [PATCH] Bug 2187: Code cleanup and refactoring I addressed remaining comments from a prior patch. I also refactored RaftActorServerConfigurationSupport to use an FSM similar to the SnapshotManager with some generic classes. This will make it easier to implement RemoveServer and reuse code. Change-Id: Id3cdcede3f9c393c878abd3e9a9d3a5e12c5fb8a Signed-off-by: Tom Pantelis --- .../RaftActorServerConfigurationSupport.java | 405 ++++++++++++------ .../raft/behaviors/AbstractLeader.java | 4 +- .../behaviors/AbstractRaftActorBehavior.java | 17 +- .../raft/messages/FollowerCatchUpTimeout.java | 2 +- .../UnInitializedFollowerSnapshotReply.java | 12 +- ...ftActorServerConfigurationSupportTest.java | 117 ++++- 6 files changed, 407 insertions(+), 150 deletions(-) diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorServerConfigurationSupport.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorServerConfigurationSupport.java index ae23140114..a00c0241d9 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorServerConfigurationSupport.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorServerConfigurationSupport.java @@ -10,6 +10,7 @@ package org.opendaylight.controller.cluster.raft; import akka.actor.ActorRef; import akka.actor.ActorSelection; import akka.actor.Cancellable; +import com.google.common.base.Preconditions; import java.util.ArrayList; import java.util.Collections; import java.util.LinkedList; @@ -37,28 +38,29 @@ import scala.concurrent.duration.FiniteDuration; */ class RaftActorServerConfigurationSupport { private static final Logger LOG = LoggerFactory.getLogger(RaftActorServerConfigurationSupport.class); - private final RaftActorContext context; - // client follower queue - private final Queue followerInfoQueue = new LinkedList(); - // timeout handle - private Cancellable followerTimeout = null; + + private final OperationState IDLE = new Idle(); + + private final RaftActorContext raftContext; + + private final Queue> pendingOperationsQueue = new LinkedList<>(); + + private OperationState currentOperationState = IDLE; RaftActorServerConfigurationSupport(RaftActorContext context) { - this.context = context; + this.raftContext = context; } boolean handleMessage(Object message, RaftActor raftActor, ActorRef sender) { if(message instanceof AddServer) { onAddServer((AddServer)message, raftActor, sender); return true; - } else if (message instanceof FollowerCatchUpTimeout){ - FollowerCatchUpTimeout followerTimeout = (FollowerCatchUpTimeout)message; - // abort follower catchup on timeout - onFollowerCatchupTimeout(raftActor, sender, followerTimeout.getNewServerId()); + } else if (message instanceof FollowerCatchUpTimeout) { + currentOperationState.onFollowerCatchupTimeout(raftActor, (FollowerCatchUpTimeout)message); return true; - } else if (message instanceof UnInitializedFollowerSnapshotReply){ - // snapshot installation is successful - onUnInitializedFollowerSnapshotReply((UnInitializedFollowerSnapshotReply)message, raftActor,sender); + } else if (message instanceof UnInitializedFollowerSnapshotReply) { + currentOperationState.onUnInitializedFollowerSnapshotReply(raftActor, + (UnInitializedFollowerSnapshotReply)message); return true; } else if(message instanceof ApplyState) { return onApplyState((ApplyState) message, raftActor); @@ -70,34 +72,13 @@ class RaftActorServerConfigurationSupport { private boolean onApplyState(ApplyState applyState, RaftActor raftActor) { Payload data = applyState.getReplicatedLogEntry().getData(); if(data instanceof ServerConfigurationPayload) { - CatchupFollowerInfo followerInfo = followerInfoQueue.peek(); - if(followerInfo != null && followerInfo.getContextId().equals(applyState.getIdentifier())) { - LOG.info("{} has been successfully replicated to a majority of followers", data); - - // respond ok to follower - respondToClient(raftActor, ServerChangeStatus.OK); - } - + currentOperationState.onApplyState(raftActor, applyState); return true; } return false; } - private void onAddServer(AddServer addServer, RaftActor raftActor, ActorRef sender) { - LOG.debug("{}: onAddServer: {}", context.getId(), addServer); - if(noLeaderOrForwardedToLeader(addServer, raftActor, sender)) { - return; - } - - CatchupFollowerInfo followerInfo = new CatchupFollowerInfo(addServer,sender); - boolean process = followerInfoQueue.isEmpty(); - followerInfoQueue.add(followerInfo); - if(process) { - processAddServer(raftActor); - } - } - /** * The algorithm for AddServer is as follows: *
    @@ -119,119 +100,276 @@ class RaftActorServerConfigurationSupport { *
  • Respond to caller with TIMEOUT.
  • *
*/ - private void processAddServer(RaftActor raftActor){ - LOG.debug("{}: In processAddServer", context.getId()); - - AbstractLeader leader = (AbstractLeader) raftActor.getCurrentBehavior(); - CatchupFollowerInfo followerInfo = followerInfoQueue.peek(); - AddServer addSrv = followerInfo.getAddServer(); - context.addToPeers(addSrv.getNewServerId(), addSrv.getNewServerAddress()); - - // if voting member - initialize to VOTING_NOT_INITIALIZED - FollowerState initialState = addSrv.isVotingMember() ? FollowerState.VOTING_NOT_INITIALIZED : - FollowerState.NON_VOTING; - leader.addFollower(addSrv.getNewServerId(), initialState); - - if(initialState == FollowerState.VOTING_NOT_INITIALIZED){ - LOG.debug("Leader sending initiate capture snapshot to follower : {}", addSrv.getNewServerId()); - leader.initiateCaptureSnapshot(addSrv.getNewServerId()); - // schedule the catchup timeout timer - followerTimeout = context.getActorSystem().scheduler() - .scheduleOnce(new FiniteDuration(((context.getConfigParams().getElectionTimeOutInterval().toMillis()) * 2), - TimeUnit.MILLISECONDS), - context.getActor(), new FollowerCatchUpTimeout(addSrv.getNewServerId()), - context.getActorSystem().dispatcher(), context.getActor()); + private void onAddServer(AddServer addServer, RaftActor raftActor, ActorRef sender) { + LOG.debug("{}: onAddServer: {}", raftContext.getId(), addServer); + + onNewOperation(raftActor, new AddServerContext(addServer, sender)); + } + + private void onNewOperation(RaftActor raftActor, ServerOperationContext operationContext) { + if (raftActor.isLeader()) { + currentOperationState.onNewOperation(raftActor, operationContext); } else { - LOG.debug("Directly persisting the new server configuration : {}", addSrv.getNewServerId()); - persistNewServerConfiguration(raftActor, followerInfo); + ActorSelection leader = raftActor.getLeader(); + if (leader != null) { + LOG.debug("{}: Not leader - forwarding to leader {}", raftContext.getId(), leader); + leader.forward(operationContext.getOperation(), raftActor.getContext()); + } else { + LOG.debug("{}: No leader - returning NO_LEADER reply", raftContext.getId()); + operationContext.getClientRequestor().tell(operationContext.newReply( + ServerChangeStatus.NO_LEADER, null), raftActor.self()); + } } } - private boolean noLeaderOrForwardedToLeader(Object message, RaftActor raftActor, ActorRef sender) { - if (raftActor.isLeader()) { - return false; + /** + * Interface for a server operation FSM state. + */ + private interface OperationState { + void onNewOperation(RaftActor raftActor, ServerOperationContext operationContext); + + void onFollowerCatchupTimeout(RaftActor raftActor, FollowerCatchUpTimeout followerTimeout); + + void onUnInitializedFollowerSnapshotReply(RaftActor raftActor, UnInitializedFollowerSnapshotReply reply); + + void onApplyState(RaftActor raftActor, ApplyState applyState); + } + + /** + * Interface for the initial state for a server operation. + */ + private interface InitialOperationState { + void initiate(RaftActor raftActor); + } + + /** + * Abstract base class for server operation FSM state. Handles common behavior for all states. + */ + private abstract class AbstractOperationState implements OperationState { + @Override + public void onNewOperation(RaftActor raftActor, ServerOperationContext operationContext) { + // We're currently processing another operation so queue it to be processed later. + + LOG.debug("{}: Server operation already in progress - queueing {}", raftContext.getId(), + operationContext.getOperation()); + + pendingOperationsQueue.add(operationContext); } - ActorSelection leader = raftActor.getLeader(); - if (leader != null) { - LOG.debug("Not leader - forwarding to leader {}", leader); - leader.forward(message, raftActor.getContext()); - } else { - LOG.debug("No leader - returning NO_LEADER AddServerReply"); - sender.tell(new AddServerReply(ServerChangeStatus.NO_LEADER, null), raftActor.self()); + @Override + public void onFollowerCatchupTimeout(RaftActor raftActor, FollowerCatchUpTimeout followerTimeout) { + LOG.debug("onFollowerCatchupTimeout should not be called in state {}", this); } - return true; + @Override + public void onUnInitializedFollowerSnapshotReply(RaftActor raftActor, UnInitializedFollowerSnapshotReply reply) { + LOG.debug("onUnInitializedFollowerSnapshotReply was called in state {}", this); + } + + @Override + public void onApplyState(RaftActor raftActor, ApplyState applyState) { + LOG.debug("onApplyState was called in state {}", this); + } + + protected void persistNewServerConfiguration(RaftActor raftActor, ServerOperationContext operationContext){ + List newConfig = new ArrayList(raftContext.getPeerAddresses().keySet()); + newConfig.add(raftContext.getId()); + + LOG.debug("{}: New server configuration : {}", raftContext.getId(), newConfig); + + ServerConfigurationPayload payload = new ServerConfigurationPayload(newConfig, Collections.emptyList()); + + raftActor.persistData(operationContext.getClientRequestor(), operationContext.getContextId(), payload); + + currentOperationState = new Persisting(operationContext); + } + + protected void operationComplete(RaftActor raftActor, ServerOperationContext operationContext, + ServerChangeStatus status) { + + LOG.debug("{}: Returning {} for operation {}", raftContext.getId(), status, operationContext.getOperation()); + + operationContext.getClientRequestor().tell(operationContext.newReply(status, raftActor.getLeaderId()), + raftActor.self()); + + currentOperationState = IDLE; + + ServerOperationContext nextOperation = pendingOperationsQueue.poll(); + if(nextOperation != null) { + RaftActorServerConfigurationSupport.this.onNewOperation(raftActor, nextOperation); + } + } + + @Override + public String toString() { + return getClass().getSimpleName(); + } } - private void onUnInitializedFollowerSnapshotReply(UnInitializedFollowerSnapshotReply reply, - RaftActor raftActor, ActorRef sender){ - CatchupFollowerInfo followerInfo = followerInfoQueue.peek(); - // Sanity check - it's possible we get a reply after it timed out. - if(followerInfo == null) { - return; + /** + * The state when no server operation is in progress. It immediately initiates new server operations. + */ + private class Idle extends AbstractOperationState { + @Override + public void onNewOperation(RaftActor raftActor, ServerOperationContext operationContext) { + operationContext.newInitialOperationState(RaftActorServerConfigurationSupport.this).initiate(raftActor); } - String followerId = reply.getFollowerId(); - AbstractLeader leader = (AbstractLeader) raftActor.getCurrentBehavior(); - FollowerLogInformation followerLogInformation = leader.getFollower(followerId); - stopFollowerTimer(); - followerLogInformation.setFollowerState(FollowerState.VOTING); - leader.updateMinReplicaCountAndMinIsolatedLeaderPeerCount(); + @Override + public void onApplyState(RaftActor raftActor, ApplyState applyState) { + // Noop - we override b/c ApplyState is called normally for followers in the idle state. + } + } + + /** + * The state when a new server configuration is being persisted and replicated. + */ + private class Persisting extends AbstractOperationState { + private final ServerOperationContext operationContext; - persistNewServerConfiguration(raftActor, followerInfo); + Persisting(ServerOperationContext operationContext) { + this.operationContext = operationContext; + } + + @Override + public void onApplyState(RaftActor raftActor, ApplyState applyState) { + // Sanity check - we could get an ApplyState from a previous operation that timed out so make + // sure it's meant for us. + if(operationContext.getContextId().equals(applyState.getIdentifier())) { + LOG.info("{}: {} has been successfully replicated to a majority of followers", + applyState.getReplicatedLogEntry().getData()); + + operationComplete(raftActor, operationContext, ServerChangeStatus.OK); + } + } } - private void persistNewServerConfiguration(RaftActor raftActor, CatchupFollowerInfo followerInfo){ - List cNew = new ArrayList(context.getPeerAddresses().keySet()); - cNew.add(context.getId()); + /** + * Abstract base class for an AddServer operation state. + */ + private abstract class AddServerState extends AbstractOperationState { + private final AddServerContext addServerContext; - LOG.debug("New server configuration : {}", cNew.toString()); + AddServerState(AddServerContext addServerContext) { + this.addServerContext = addServerContext; + } - ServerConfigurationPayload servPayload = new ServerConfigurationPayload(cNew, Collections.emptyList()); + AddServerContext getAddServerContext() { + return addServerContext; + } + } - raftActor.persistData(followerInfo.getClientRequestor(), followerInfo.getContextId(), servPayload); - } + /** + * The initial state for the AddServer operation. It adds the new follower as a peer and initiates + * snapshot capture, if necessary. + */ + private class InitialAddServerState extends AddServerState implements InitialOperationState { + InitialAddServerState(AddServerContext addServerContext) { + super(addServerContext); + } + + @Override + public void initiate(RaftActor raftActor) { + AbstractLeader leader = (AbstractLeader) raftActor.getCurrentBehavior(); + + AddServer addServer = getAddServerContext().getOperation(); + + LOG.debug("{}: Initiating {}", raftContext.getId(), addServer); + + raftContext.addToPeers(addServer.getNewServerId(), addServer.getNewServerAddress()); + + // if voting member - initialize to VOTING_NOT_INITIALIZED + FollowerState initialState = addServer.isVotingMember() ? FollowerState.VOTING_NOT_INITIALIZED : + FollowerState.NON_VOTING; + leader.addFollower(addServer.getNewServerId(), initialState); - private void stopFollowerTimer() { - if (followerTimeout != null && !followerTimeout.isCancelled()) { - followerTimeout.cancel(); + if(initialState == FollowerState.VOTING_NOT_INITIALIZED){ + LOG.debug("{}: Leader sending initiate capture snapshot to new follower {}", raftContext.getId(), + addServer.getNewServerId()); + + leader.initiateCaptureSnapshot(addServer.getNewServerId()); + + // schedule the install snapshot timeout timer + Cancellable installSnapshotTimer = raftContext.getActorSystem().scheduler().scheduleOnce( + new FiniteDuration(((raftContext.getConfigParams().getElectionTimeOutInterval().toMillis()) * 2), + TimeUnit.MILLISECONDS), raftContext.getActor(), + new FollowerCatchUpTimeout(addServer.getNewServerId()), + raftContext.getActorSystem().dispatcher(), raftContext.getActor()); + + currentOperationState = new InstallingSnapshot(getAddServerContext(), installSnapshotTimer); + } else { + LOG.debug("{}: New follower is non-voting - directly persisting new server configuration", + raftContext.getId()); + + persistNewServerConfiguration(raftActor, getAddServerContext()); + } } - } + } - private void onFollowerCatchupTimeout(RaftActor raftActor, ActorRef sender, String serverId){ - LOG.debug("onFollowerCatchupTimeout: {}", serverId); - AbstractLeader leader = (AbstractLeader) raftActor.getCurrentBehavior(); - // cleanup - context.removePeer(serverId); - leader.removeFollower(serverId); - LOG.warn("Timeout occured for new server {} while installing snapshot", serverId); - respondToClient(raftActor,ServerChangeStatus.TIMEOUT); - } + /** + * The AddServer operation state for when the catch-up snapshot is being installed. It handles successful + * reply or timeout. + */ + private class InstallingSnapshot extends AddServerState { + private final Cancellable installSnapshotTimer; + + InstallingSnapshot(AddServerContext addServerContext, Cancellable installSnapshotTimer) { + super(addServerContext); + this.installSnapshotTimer = Preconditions.checkNotNull(installSnapshotTimer); + } + + @Override + public void onFollowerCatchupTimeout(RaftActor raftActor, FollowerCatchUpTimeout followerTimeout) { + String serverId = followerTimeout.getNewServerId(); + + LOG.debug("{}: onFollowerCatchupTimeout: {}", raftContext.getId(), serverId); + + AbstractLeader leader = (AbstractLeader) raftActor.getCurrentBehavior(); + + // cleanup + raftContext.removePeer(serverId); + leader.removeFollower(serverId); - private void respondToClient(RaftActor raftActor, ServerChangeStatus result){ - // remove the entry from the queue - CatchupFollowerInfo fInfo = followerInfoQueue.remove(); + LOG.warn("{}: Timeout occured for new server {} while installing snapshot", raftContext.getId(), serverId); - // get the sender - ActorRef toClient = fInfo.getClientRequestor(); + operationComplete(raftActor, getAddServerContext(), ServerChangeStatus.TIMEOUT); + } + + @Override + public void onUnInitializedFollowerSnapshotReply(RaftActor raftActor, UnInitializedFollowerSnapshotReply reply) { + LOG.debug("{}: onUnInitializedFollowerSnapshotReply: {}", raftContext.getId(), reply); + + String followerId = reply.getFollowerId(); + + // Sanity check to guard against receiving an UnInitializedFollowerSnapshotReply from a prior + // add server operation that timed out. + if(getAddServerContext().getOperation().getNewServerId().equals(followerId)) { + AbstractLeader leader = (AbstractLeader) raftActor.getCurrentBehavior(); + FollowerLogInformation followerLogInformation = leader.getFollower(followerId); - toClient.tell(new AddServerReply(result, raftActor.getLeaderId()), raftActor.self()); - LOG.debug("Response returned is {} for server {} ", result, fInfo.getAddServer().getNewServerId()); - if(!followerInfoQueue.isEmpty()){ - processAddServer(raftActor); + installSnapshotTimer.cancel(); + + followerLogInformation.setFollowerState(FollowerState.VOTING); + leader.updateMinReplicaCountAndMinIsolatedLeaderPeerCount(); + + persistNewServerConfiguration(raftActor, getAddServerContext()); + } } - } + } - // maintain sender actorRef - private static class CatchupFollowerInfo { - private final AddServer addServer; + /** + * Stores context information for a server operation. + * + * @param the operation type + */ + private static abstract class ServerOperationContext { + private final T operation; private final ActorRef clientRequestor; private final String contextId; - CatchupFollowerInfo(AddServer addSrv, ActorRef cliReq){ - addServer = addSrv; - clientRequestor = cliReq; + ServerOperationContext(T operation, ActorRef clientRequestor){ + this.operation = operation; + this.clientRequestor = clientRequestor; contextId = UUID.randomUUID().toString(); } @@ -239,12 +377,35 @@ class RaftActorServerConfigurationSupport { return contextId; } - AddServer getAddServer(){ - return addServer; + T getOperation() { + return operation; } - ActorRef getClientRequestor(){ + ActorRef getClientRequestor() { return clientRequestor; } + + abstract Object newReply(ServerChangeStatus status, String leaderId); + + abstract InitialOperationState newInitialOperationState(RaftActorServerConfigurationSupport support); + } + + /** + * Stores context information for an AddServer operation. + */ + private static class AddServerContext extends ServerOperationContext { + AddServerContext(AddServer addServer, ActorRef clientRequestor) { + super(addServer, clientRequestor); + } + + @Override + Object newReply(ServerChangeStatus status, String leaderId) { + return new AddServerReply(status, leaderId); + } + + @Override + InitialOperationState newInitialOperationState(RaftActorServerConfigurationSupport support) { + return support.new InitialAddServerState(this); + } } } diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java index 3c5ad0428f..d4612dbeda 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java @@ -157,7 +157,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { minIsolatedLeaderPeerCount = minReplicationCount > 0 ? (minReplicationCount - 1) : 0; } - public int getMinIsolatedLeaderPeerCount(){ + protected int getMinIsolatedLeaderPeerCount(){ return minIsolatedLeaderPeerCount; } @@ -433,7 +433,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { } wasLastChunk = true; FollowerState followerState = followerLogInformation.getFollowerState(); - if(followerState == FollowerState.VOTING_NOT_INITIALIZED){ + if(followerState == FollowerState.VOTING_NOT_INITIALIZED){ UnInitializedFollowerSnapshotReply unInitFollowerSnapshotSuccess = new UnInitializedFollowerSnapshotReply(followerId); context.getActor().tell(unInitFollowerSnapshotSuccess, context.getActor()); diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehavior.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehavior.java index 787bd74629..b20671f9d8 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehavior.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehavior.java @@ -10,6 +10,8 @@ package org.opendaylight.controller.cluster.raft.behaviors; import akka.actor.ActorRef; import akka.actor.Cancellable; +import java.util.HashMap; +import java.util.Map; import java.util.Random; import java.util.concurrent.TimeUnit; import org.opendaylight.controller.cluster.raft.ClientRequestTracker; @@ -492,14 +494,19 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior { } public void applyServerConfiguration(ServerConfigurationPayload serverConfig) { - for(String peerId: context.getPeerAddresses().keySet()) { - context.removePeer(peerId); - } - + Map currentPeers = new HashMap<>(context.getPeerAddresses()); for(String peerId: serverConfig.getNewServerConfig()) { if(!getId().equals(peerId)) { - context.addToPeers(peerId, null); + if(!currentPeers.containsKey(peerId)) { + context.addToPeers(peerId, null); + } else { + currentPeers.remove(peerId); + } } } + + for(String peerIdToRemove: currentPeers.keySet()) { + context.removePeer(peerIdToRemove); + } } } diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/messages/FollowerCatchUpTimeout.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/messages/FollowerCatchUpTimeout.java index 6700702794..365e3a0b3c 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/messages/FollowerCatchUpTimeout.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/messages/FollowerCatchUpTimeout.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2014 Dell Systems, Inc. and others. All rights reserved. + * Copyright (c) 2015 Dell Systems, Inc. and others. All rights reserved. * * This program and the accompanying materials are made available under the * terms of the Eclipse Public License v1.0 which accompanies this distribution, diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/messages/UnInitializedFollowerSnapshotReply.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/messages/UnInitializedFollowerSnapshotReply.java index 50f20705ca..4d66fa6d4d 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/messages/UnInitializedFollowerSnapshotReply.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/messages/UnInitializedFollowerSnapshotReply.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2014 Dell Systems, Inc. and others. All rights reserved. + * Copyright (c) 2015 Dell Systems, Inc. and others. All rights reserved. * * This program and the accompanying materials are made available under the * terms of the Eclipse Public License v1.0 which accompanies this distribution, @@ -14,10 +14,16 @@ package org.opendaylight.controller.cluster.raft.messages; public class UnInitializedFollowerSnapshotReply { private final String followerId; - public UnInitializedFollowerSnapshotReply(String follId){ - this.followerId = follId; + public UnInitializedFollowerSnapshotReply(String followerId){ + this.followerId = followerId; } + public String getFollowerId() { return followerId; } + + @Override + public String toString() { + return "UnInitializedFollowerSnapshotReply [followerId=" + followerId + "]"; + } } diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorServerConfigurationSupportTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorServerConfigurationSupportTest.java index 5c9ba13b8b..2528c8aad4 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorServerConfigurationSupportTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorServerConfigurationSupportTest.java @@ -59,6 +59,7 @@ public class RaftActorServerConfigurationSupportTest extends AbstractActorTest { static final String LEADER_ID = "leader"; static final String FOLLOWER_ID = "follower"; static final String NEW_SERVER_ID = "new-server"; + static final String NEW_SERVER_ID2 = "new-server2"; private static final Logger LOG = LoggerFactory.getLogger(RaftActorServerConfigurationSupportTest.class); private static final DataPersistenceProvider NO_PERSISTENCE = new NonPersistentDataProvider(); @@ -70,8 +71,8 @@ public class RaftActorServerConfigurationSupportTest extends AbstractActorTest { private TestActorRef newFollowerRaftActor; private TestActorRef newFollowerCollectorActor; - private RaftActorContext newFollowerActorContext; + private final JavaTestKit testKit = new JavaTestKit(getSystem()); @Before @@ -79,10 +80,7 @@ public class RaftActorServerConfigurationSupportTest extends AbstractActorTest { InMemoryJournal.clear(); InMemorySnapshotStore.clear(); - DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl(); - configParams.setHeartBeatInterval(new FiniteDuration(100, TimeUnit.MILLISECONDS)); - configParams.setElectionTimeoutFactor(100000); - configParams.setCustomRaftPolicyImplementationClass(DisableElectionsRaftPolicy.class.getName()); + DefaultConfigParamsImpl configParams = newFollowerConfigParams(); newFollowerCollectorActor = actorFactory.createTestActor( MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()), @@ -90,7 +88,20 @@ public class RaftActorServerConfigurationSupportTest extends AbstractActorTest { newFollowerRaftActor = actorFactory.createTestActor(MockNewFollowerRaftActor.props( configParams, newFollowerCollectorActor).withDispatcher(Dispatchers.DefaultDispatcherId()), actorFactory.generateActorId(NEW_SERVER_ID)); - newFollowerActorContext = newFollowerRaftActor.underlyingActor().getRaftActorContext(); + + try { + newFollowerActorContext = newFollowerRaftActor.underlyingActor().getRaftActorContext(); + } catch (Exception e) { + newFollowerActorContext = newFollowerRaftActor.underlyingActor().getRaftActorContext(); + } + } + + private DefaultConfigParamsImpl newFollowerConfigParams() { + DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl(); + configParams.setHeartBeatInterval(new FiniteDuration(100, TimeUnit.MILLISECONDS)); + configParams.setElectionTimeoutFactor(100000); + configParams.setCustomRaftPolicyImplementationClass(DisableElectionsRaftPolicy.class.getName()); + return configParams; } @After @@ -156,9 +167,6 @@ public class RaftActorServerConfigurationSupportTest extends AbstractActorTest { assertEquals("New follower peers", Sets.newHashSet(LEADER_ID, FOLLOWER_ID), newFollowerActorContext.getPeerAddresses().keySet()); - clearMessages(followerActor); - clearMessages(newFollowerCollectorActor); - expectFirstMatching(newFollowerCollectorActor, ApplyState.class); expectFirstMatching(followerActor, ApplyState.class); @@ -205,8 +213,6 @@ public class RaftActorServerConfigurationSupportTest extends AbstractActorTest { // Verify ServerConfigurationPayload entry in the new follower - clearMessages(newFollowerCollectorActor); - expectFirstMatching(newFollowerCollectorActor, ApplyState.class); assertEquals("New follower journal last index", 2, newFollowerActorContext.getReplicatedLog().lastIndex()); verifyServerConfigurationPayloadEntry(newFollowerActorContext.getReplicatedLog(), LEADER_ID, NEW_SERVER_ID); @@ -218,7 +224,7 @@ public class RaftActorServerConfigurationSupportTest extends AbstractActorTest { } @Test - public void testAddServerAsNonVoting() throws Exception { + public void testAddServersAsNonVoting() throws Exception { RaftActorContext initialActorContext = new MockRaftActorContext(); initialActorContext.setCommitIndex(-1); initialActorContext.setLastApplied(-1); @@ -256,7 +262,81 @@ public class RaftActorServerConfigurationSupportTest extends AbstractActorTest { assertEquals("New follower peers", Sets.newHashSet(LEADER_ID), newFollowerActorContext.getPeerAddresses().keySet()); - MessageCollectorActor.assertNoneMatching(newFollowerCollectorActor, InstallSnapshot.SERIALIZABLE_CLASS, 500); + MessageCollectorActor.assertNoneMatching(newFollowerCollectorActor, InstallSnapshot.class, 500); + + // Add another non-voting server. + + RaftActorContext follower2ActorContext = newFollowerContext(NEW_SERVER_ID2, followerActor); + Follower newFollower2 = new Follower(follower2ActorContext); + followerActor.underlyingActor().setBehavior(newFollower2); + + leaderActor.tell(new AddServer(NEW_SERVER_ID2, followerActor.path().toString(), false), testKit.getRef()); + + addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class); + assertEquals("getStatus", ServerChangeStatus.OK, addServerReply.getStatus()); + assertEquals("getLeaderHint", LEADER_ID, addServerReply.getLeaderHint()); + + assertEquals("Leader journal last index", 1, leaderActorContext.getReplicatedLog().lastIndex()); + assertEquals("Leader commit index", 1, leaderActorContext.getCommitIndex()); + assertEquals("Leader last applied index", 1, leaderActorContext.getLastApplied()); + verifyServerConfigurationPayloadEntry(leaderActorContext.getReplicatedLog(), + LEADER_ID, NEW_SERVER_ID, NEW_SERVER_ID2); + } + + @Test + public void testAddServerWithOperationInProgress() throws Exception { + RaftActorContext initialActorContext = new MockRaftActorContext(); + initialActorContext.setCommitIndex(-1); + initialActorContext.setLastApplied(-1); + initialActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build()); + + TestActorRef leaderActor = actorFactory.createTestActor( + MockLeaderRaftActor.props(ImmutableMap.of(), + initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()), + actorFactory.generateActorId(LEADER_ID)); + + MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor(); + RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext(); + + RaftActorContext follower2ActorContext = newFollowerContext(NEW_SERVER_ID2, followerActor); + Follower newFollower2 = new Follower(follower2ActorContext); + followerActor.underlyingActor().setBehavior(newFollower2); + + MockNewFollowerRaftActor newFollowerRaftActorInstance = newFollowerRaftActor.underlyingActor(); + newFollowerRaftActorInstance.setDropMessageOfType(InstallSnapshot.SERIALIZABLE_CLASS); + + leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef()); + + // Wait for leader's install snapshot and capture it + + Object installSnapshot = expectFirstMatching(newFollowerCollectorActor, InstallSnapshot.class); + + JavaTestKit testKit2 = new JavaTestKit(getSystem()); + leaderActor.tell(new AddServer(NEW_SERVER_ID2, followerActor.path().toString(), false), testKit2.getRef()); + + newFollowerRaftActorInstance.setDropMessageOfType(null); + newFollowerRaftActor.tell(installSnapshot, leaderActor); + + AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class); + assertEquals("getStatus", ServerChangeStatus.OK, addServerReply.getStatus()); + + addServerReply = testKit2.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class); + assertEquals("getStatus", ServerChangeStatus.OK, addServerReply.getStatus()); + + // Verify ServerConfigurationPayload entries in leader's log + + assertEquals("Leader journal last index", 1, leaderActorContext.getReplicatedLog().lastIndex()); + assertEquals("Leader commit index", 1, leaderActorContext.getCommitIndex()); + assertEquals("Leader last applied index", 1, leaderActorContext.getLastApplied()); + verifyServerConfigurationPayloadEntry(leaderActorContext.getReplicatedLog(), + LEADER_ID, NEW_SERVER_ID, NEW_SERVER_ID2); + + // Verify ServerConfigurationPayload entry in the new follower + + MessageCollectorActor.expectMatching(newFollowerCollectorActor, ApplyState.class, 2); + + assertEquals("New follower peers", Sets.newHashSet(LEADER_ID, NEW_SERVER_ID2), + newFollowerActorContext.getPeerAddresses().keySet()); } @Test @@ -275,6 +355,7 @@ public class RaftActorServerConfigurationSupportTest extends AbstractActorTest { MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor(); RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext(); + ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setElectionTimeoutFactor(1); leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef()); @@ -340,6 +421,9 @@ public class RaftActorServerConfigurationSupportTest extends AbstractActorTest { RaftActorContext followerActorContext = new RaftActorContextImpl(actor, actor.underlyingActor().getContext(), id, termInfo, -1, -1, ImmutableMap.of(LEADER_ID, ""), configParams, NO_PERSISTENCE, LOG); + followerActorContext.setCommitIndex(-1); + followerActorContext.setLastApplied(-1); + followerActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build()); return followerActorContext; } @@ -380,7 +464,7 @@ public class RaftActorServerConfigurationSupportTest extends AbstractActorTest { static Props props(Map peerAddresses, RaftActorContext fromContext) { DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl(); configParams.setHeartBeatInterval(new FiniteDuration(100, TimeUnit.MILLISECONDS)); - configParams.setElectionTimeoutFactor(1); + configParams.setElectionTimeoutFactor(10); return Props.create(MockLeaderRaftActor.class, peerAddresses, configParams, fromContext); } } @@ -400,11 +484,10 @@ public class RaftActorServerConfigurationSupportTest extends AbstractActorTest { @Override public void handleCommand(Object message) { - if(dropMessageOfType != null && dropMessageOfType.equals(message.getClass())) { - return; + if(dropMessageOfType == null || !dropMessageOfType.equals(message.getClass())) { + super.handleCommand(message); } - super.handleCommand(message); collectorActor.tell(message, getSender()); } -- 2.36.6