From: Gary Wu Date: Wed, 2 Dec 2015 21:34:47 +0000 (-0800) Subject: Add RaftActorServerConfigurationSupport.raftActor X-Git-Tag: release/beryllium~85 X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=commitdiff_plain;h=388dd012c7b36177808ff5c5ad692b16dd58c944 Add RaftActorServerConfigurationSupport.raftActor Add raftActor as a field to RaftActorServerConfigurationSupport to avoid passing raftActor around through all the method calls. Change-Id: I19eb16877af98e9e05ec698c321081e211a7e572 Signed-off-by: Gary Wu --- diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java index df40cc5c55..2caba55a23 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java @@ -146,7 +146,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { super.preStart(); snapshotSupport = newRaftActorSnapshotMessageSupport(); - serverConfigurationSupport = new RaftActorServerConfigurationSupport(getRaftActorContext()); + serverConfigurationSupport = new RaftActorServerConfigurationSupport(this); } @Override @@ -201,7 +201,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { @Override public void handleCommand(final Object message) { - if(serverConfigurationSupport.handleMessage(message, this, getSender())) { + if(serverConfigurationSupport.handleMessage(message, getSender())) { return; } else if (message instanceof ApplyState){ ApplyState applyState = (ApplyState) message; diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorServerConfigurationSupport.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorServerConfigurationSupport.java index 2a2b50137c..f969e3d67d 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorServerConfigurationSupport.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorServerConfigurationSupport.java @@ -39,41 +39,43 @@ class RaftActorServerConfigurationSupport { private final OperationState IDLE = new Idle(); + private final RaftActor raftActor; + private final RaftActorContext raftContext; private final Queue> pendingOperationsQueue = new LinkedList<>(); private OperationState currentOperationState = IDLE; - RaftActorServerConfigurationSupport(RaftActorContext context) { - this.raftContext = context; + RaftActorServerConfigurationSupport(RaftActor raftActor) { + this.raftActor = raftActor; + this.raftContext = raftActor.getRaftActorContext(); } - boolean handleMessage(Object message, RaftActor raftActor, ActorRef sender) { + boolean handleMessage(Object message, ActorRef sender) { if(message instanceof AddServer) { - onAddServer((AddServer) message, raftActor, sender); + onAddServer((AddServer) message, sender); return true; } else if(message instanceof RemoveServer) { - onRemoveServer((RemoveServer) message, raftActor, sender); + onRemoveServer((RemoveServer) message, sender); return true; } else if (message instanceof ServerOperationTimeout) { - currentOperationState.onServerOperationTimeout(raftActor, (ServerOperationTimeout) message); + currentOperationState.onServerOperationTimeout((ServerOperationTimeout) message); return true; } else if (message instanceof UnInitializedFollowerSnapshotReply) { - currentOperationState.onUnInitializedFollowerSnapshotReply(raftActor, - (UnInitializedFollowerSnapshotReply) message); + currentOperationState.onUnInitializedFollowerSnapshotReply((UnInitializedFollowerSnapshotReply) message); return true; } else if(message instanceof ApplyState) { - return onApplyState((ApplyState) message, raftActor); + return onApplyState((ApplyState) message); } else if(message instanceof SnapshotComplete) { - currentOperationState.onSnapshotComplete(raftActor); + currentOperationState.onSnapshotComplete(); return false; } else { return false; } } - private void onRemoveServer(RemoveServer removeServer, RaftActor raftActor, ActorRef sender) { + private void onRemoveServer(RemoveServer removeServer, ActorRef sender) { LOG.debug("{}: onRemoveServer: {}, state: {}", raftContext.getId(), removeServer, currentOperationState); if(removeServer.getServerId().equals(raftActor.getLeaderId())){ // Removing current leader is not supported yet @@ -83,14 +85,14 @@ class RaftActorServerConfigurationSupport { } else if(!raftContext.getPeerIds().contains(removeServer.getServerId())) { sender.tell(new RemoveServerReply(ServerChangeStatus.DOES_NOT_EXIST, raftActor.getLeaderId()), raftActor.getSelf()); } else { - onNewOperation(raftActor, new RemoveServerContext(removeServer, raftContext.getPeerAddress(removeServer.getServerId()), sender)); + onNewOperation(new RemoveServerContext(removeServer, raftContext.getPeerAddress(removeServer.getServerId()), sender)); } } - private boolean onApplyState(ApplyState applyState, RaftActor raftActor) { + private boolean onApplyState(ApplyState applyState) { Payload data = applyState.getReplicatedLogEntry().getData(); if(data instanceof ServerConfigurationPayload) { - currentOperationState.onApplyState(raftActor, applyState); + currentOperationState.onApplyState(applyState); return true; } @@ -118,15 +120,15 @@ class RaftActorServerConfigurationSupport { *
  • Respond to caller with TIMEOUT.
  • * */ - private void onAddServer(AddServer addServer, RaftActor raftActor, ActorRef sender) { + private void onAddServer(AddServer addServer, ActorRef sender) { LOG.debug("{}: onAddServer: {}, state: {}", raftContext.getId(), addServer, currentOperationState); - onNewOperation(raftActor, new AddServerContext(addServer, sender)); + onNewOperation(new AddServerContext(addServer, sender)); } - private void onNewOperation(RaftActor raftActor, ServerOperationContext operationContext) { + private void onNewOperation(ServerOperationContext operationContext) { if (raftActor.isLeader()) { - currentOperationState.onNewOperation(raftActor, operationContext); + currentOperationState.onNewOperation(operationContext); } else { ActorSelection leader = raftActor.getLeader(); if (leader != null) { @@ -144,22 +146,22 @@ class RaftActorServerConfigurationSupport { * Interface for a server operation FSM state. */ private interface OperationState { - void onNewOperation(RaftActor raftActor, ServerOperationContext operationContext); + void onNewOperation(ServerOperationContext operationContext); - void onServerOperationTimeout(RaftActor raftActor, ServerOperationTimeout timeout); + void onServerOperationTimeout(ServerOperationTimeout timeout); - void onUnInitializedFollowerSnapshotReply(RaftActor raftActor, UnInitializedFollowerSnapshotReply reply); + void onUnInitializedFollowerSnapshotReply(UnInitializedFollowerSnapshotReply reply); - void onApplyState(RaftActor raftActor, ApplyState applyState); + void onApplyState(ApplyState applyState); - void onSnapshotComplete(RaftActor raftActor); + void onSnapshotComplete(); } /** * Interface for the initial state for a server operation. */ private interface InitialOperationState { - void initiate(RaftActor raftActor); + void initiate(); } /** @@ -167,7 +169,7 @@ class RaftActorServerConfigurationSupport { */ private abstract class AbstractOperationState implements OperationState { @Override - public void onNewOperation(RaftActor raftActor, ServerOperationContext operationContext) { + public void onNewOperation(ServerOperationContext operationContext) { // We're currently processing another operation so queue it to be processed later. LOG.debug("{}: Server operation already in progress - queueing {}", raftContext.getId(), @@ -177,41 +179,39 @@ class RaftActorServerConfigurationSupport { } @Override - public void onServerOperationTimeout(RaftActor raftActor, ServerOperationTimeout timeout) { + public void onServerOperationTimeout(ServerOperationTimeout timeout) { LOG.debug("onServerOperationTimeout should not be called in state {}", this); } @Override - public void onUnInitializedFollowerSnapshotReply(RaftActor raftActor, UnInitializedFollowerSnapshotReply reply) { + public void onUnInitializedFollowerSnapshotReply(UnInitializedFollowerSnapshotReply reply) { LOG.debug("onUnInitializedFollowerSnapshotReply was called in state {}", this); } @Override - public void onApplyState(RaftActor raftActor, ApplyState applyState) { + public void onApplyState(ApplyState applyState) { LOG.debug("onApplyState was called in state {}", this); } @Override - public void onSnapshotComplete(RaftActor raftActor) { + public void onSnapshotComplete() { } - protected void persistNewServerConfiguration(RaftActor raftActor, ServerOperationContext operationContext){ + protected void persistNewServerConfiguration(ServerOperationContext operationContext){ raftContext.setDynamicServerConfigurationInUse(); ServerConfigurationPayload payload = raftContext.getPeerServerInfo(); LOG.debug("{}: New server configuration : {}", raftContext.getId(), payload.getServerConfig()); raftActor.persistData(operationContext.getClientRequestor(), operationContext.getContextId(), payload); - currentOperationState = new Persisting(operationContext, newTimer( - new ServerOperationTimeout(operationContext.getServerId()))); + currentOperationState = new Persisting(operationContext, newTimer(new ServerOperationTimeout(operationContext.getServerId()))); - sendReply(raftActor, operationContext, ServerChangeStatus.OK); + sendReply(operationContext, ServerChangeStatus.OK); } - protected void operationComplete(RaftActor raftActor, ServerOperationContext operationContext, - @Nullable ServerChangeStatus replyStatus) { + protected void operationComplete(ServerOperationContext operationContext, @Nullable ServerChangeStatus replyStatus) { if(replyStatus != null) { - sendReply(raftActor, operationContext, replyStatus); + sendReply(operationContext, replyStatus); } operationContext.operationComplete(raftActor, replyStatus); @@ -220,12 +220,11 @@ class RaftActorServerConfigurationSupport { ServerOperationContext nextOperation = pendingOperationsQueue.poll(); if(nextOperation != null) { - RaftActorServerConfigurationSupport.this.onNewOperation(raftActor, nextOperation); + RaftActorServerConfigurationSupport.this.onNewOperation(nextOperation); } } - protected void sendReply(RaftActor raftActor, ServerOperationContext operationContext, - ServerChangeStatus status) { + protected void sendReply(ServerOperationContext operationContext, ServerChangeStatus status) { LOG.debug("{}: Returning {} for operation {}", raftContext.getId(), status, operationContext.getOperation()); operationContext.getClientRequestor().tell(operationContext.newReply(status, raftActor.getLeaderId()), @@ -234,8 +233,8 @@ class RaftActorServerConfigurationSupport { Cancellable newTimer(Object message) { return raftContext.getActorSystem().scheduler().scheduleOnce( - raftContext.getConfigParams().getElectionTimeOutInterval().$times(2), raftContext.getActor(), - message, raftContext.getActorSystem().dispatcher(), raftContext.getActor()); + raftContext.getConfigParams().getElectionTimeOutInterval().$times(2), raftContext.getActor(), message, + raftContext.getActorSystem().dispatcher(), raftContext.getActor()); } @Override @@ -249,12 +248,12 @@ class RaftActorServerConfigurationSupport { */ private class Idle extends AbstractOperationState { @Override - public void onNewOperation(RaftActor raftActor, ServerOperationContext operationContext) { - operationContext.newInitialOperationState(RaftActorServerConfigurationSupport.this).initiate(raftActor); + public void onNewOperation(ServerOperationContext operationContext) { + operationContext.newInitialOperationState(RaftActorServerConfigurationSupport.this).initiate(); } @Override - public void onApplyState(RaftActor raftActor, ApplyState applyState) { + public void onApplyState(ApplyState applyState) { // Noop - we override b/c ApplyState is called normally for followers in the idle state. } } @@ -273,7 +272,7 @@ class RaftActorServerConfigurationSupport { } @Override - public void onApplyState(RaftActor raftActor, ApplyState applyState) { + public void onApplyState(ApplyState applyState) { // Sanity check - we could get an ApplyState from a previous operation that timed out so make // sure it's meant for us. if(operationContext.getContextId().equals(applyState.getIdentifier())) { @@ -281,12 +280,12 @@ class RaftActorServerConfigurationSupport { applyState.getReplicatedLogEntry().getData()); timer.cancel(); - operationComplete(raftActor, operationContext, null); + operationComplete(operationContext, null); } } @Override - public void onServerOperationTimeout(RaftActor raftActor, ServerOperationTimeout timeout) { + public void onServerOperationTimeout(ServerOperationTimeout timeout) { LOG.warn("{}: Timeout occured while replicating the new server configuration for {}", raftContext.getId(), timeout.getServerId()); @@ -295,17 +294,17 @@ class RaftActorServerConfigurationSupport { // Fail any pending operations ServerOperationContext nextOperation = pendingOperationsQueue.poll(); while(nextOperation != null) { - sendReply(raftActor, nextOperation, ServerChangeStatus.PRIOR_REQUEST_CONSENSUS_TIMEOUT); + sendReply(nextOperation, ServerChangeStatus.PRIOR_REQUEST_CONSENSUS_TIMEOUT); nextOperation = pendingOperationsQueue.poll(); } } @Override - public void onNewOperation(RaftActor raftActor, ServerOperationContext operationContext) { + public void onNewOperation(ServerOperationContext operationContext) { if(timedOut) { - sendReply(raftActor, operationContext, ServerChangeStatus.PRIOR_REQUEST_CONSENSUS_TIMEOUT); + sendReply(operationContext, ServerChangeStatus.PRIOR_REQUEST_CONSENSUS_TIMEOUT); } else { - super.onNewOperation(raftActor, operationContext); + super.onNewOperation(operationContext); } } } @@ -324,11 +323,11 @@ class RaftActorServerConfigurationSupport { return addServerContext; } - Cancellable newInstallSnapshotTimer(RaftActor raftActor) { + Cancellable newInstallSnapshotTimer() { return newTimer(new ServerOperationTimeout(addServerContext.getOperation().getNewServerId())); } - void handleInstallSnapshotTimeout(RaftActor raftActor, ServerOperationTimeout timeout) { + void handleInstallSnapshotTimeout(ServerOperationTimeout timeout) { String serverId = timeout.getServerId(); LOG.debug("{}: handleInstallSnapshotTimeout for new server {}", raftContext.getId(), serverId); @@ -342,8 +341,7 @@ class RaftActorServerConfigurationSupport { leader.removeFollower(serverId); } - operationComplete(raftActor, getAddServerContext(), - isLeader ? ServerChangeStatus.TIMEOUT : ServerChangeStatus.NO_LEADER); + operationComplete(getAddServerContext(), isLeader ? ServerChangeStatus.TIMEOUT : ServerChangeStatus.NO_LEADER); } } @@ -358,14 +356,14 @@ class RaftActorServerConfigurationSupport { } @Override - public void initiate(RaftActor raftActor) { + public void initiate() { AbstractLeader leader = (AbstractLeader) raftActor.getCurrentBehavior(); AddServer addServer = getAddServerContext().getOperation(); LOG.debug("{}: Initiating {}", raftContext.getId(), addServer); if(raftContext.getPeerInfo(addServer.getNewServerId()) != null) { - operationComplete(raftActor, getAddServerContext(), ServerChangeStatus.ALREADY_EXISTS); + operationComplete(getAddServerContext(), ServerChangeStatus.ALREADY_EXISTS); return; } @@ -377,7 +375,7 @@ class RaftActorServerConfigurationSupport { if(votingState == VotingState.VOTING_NOT_INITIALIZED){ // schedule the install snapshot timeout timer - Cancellable installSnapshotTimer = newInstallSnapshotTimer(raftActor); + Cancellable installSnapshotTimer = newInstallSnapshotTimer(); if(leader.initiateCaptureSnapshot(addServer.getNewServerId())) { LOG.debug("{}: Initiating capture snapshot for new server {}", raftContext.getId(), addServer.getNewServerId()); @@ -393,7 +391,7 @@ class RaftActorServerConfigurationSupport { LOG.debug("{}: New follower is non-voting - directly persisting new server configuration", raftContext.getId()); - persistNewServerConfiguration(raftActor, getAddServerContext()); + persistNewServerConfiguration(getAddServerContext()); } } } @@ -411,15 +409,15 @@ class RaftActorServerConfigurationSupport { } @Override - public void onServerOperationTimeout(RaftActor raftActor, ServerOperationTimeout timeout) { - handleInstallSnapshotTimeout(raftActor, timeout); + public void onServerOperationTimeout(ServerOperationTimeout timeout) { + handleInstallSnapshotTimeout(timeout); LOG.warn("{}: Timeout occured for new server {} while installing snapshot", raftContext.getId(), timeout.getServerId()); } @Override - public void onUnInitializedFollowerSnapshotReply(RaftActor raftActor, UnInitializedFollowerSnapshotReply reply) { + public void onUnInitializedFollowerSnapshotReply(UnInitializedFollowerSnapshotReply reply) { LOG.debug("{}: onUnInitializedFollowerSnapshotReply: {}", raftContext.getId(), reply); String followerId = reply.getFollowerId(); @@ -431,7 +429,7 @@ class RaftActorServerConfigurationSupport { raftContext.getPeerInfo(followerId).setVotingState(VotingState.VOTING); leader.updateMinReplicaCount(); - persistNewServerConfiguration(raftActor, getAddServerContext()); + persistNewServerConfiguration(getAddServerContext()); installSnapshotTimer.cancel(); } else { @@ -455,7 +453,7 @@ class RaftActorServerConfigurationSupport { } @Override - public void onSnapshotComplete(RaftActor raftActor) { + public void onSnapshotComplete() { LOG.debug("{}: onSnapshotComplete", raftContext.getId()); if(!raftActor.isLeader()) { @@ -469,15 +467,15 @@ class RaftActorServerConfigurationSupport { getAddServerContext().getOperation().getNewServerId()); currentOperationState = new InstallingSnapshot(getAddServerContext(), - newInstallSnapshotTimer(raftActor)); + newInstallSnapshotTimer()); snapshotTimer.cancel(); } } @Override - public void onServerOperationTimeout(RaftActor raftActor, ServerOperationTimeout timeout) { - handleInstallSnapshotTimeout(raftActor, timeout); + public void onServerOperationTimeout(ServerOperationTimeout timeout) { + handleInstallSnapshotTimeout(timeout); LOG.warn("{}: Timeout occured for new server {} while waiting for prior snapshot to complete", raftContext.getId(), timeout.getServerId()); @@ -571,9 +569,9 @@ class RaftActorServerConfigurationSupport { } @Override - public void initiate(RaftActor raftActor) { + public void initiate() { raftContext.removePeer(getRemoveServerContext().getOperation().getServerId()); - persistNewServerConfiguration(raftActor, getRemoveServerContext()); + persistNewServerConfiguration(getRemoveServerContext()); } } diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorServerConfigurationSupportTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorServerConfigurationSupportTest.java index d6cd98218f..0e29a3a031 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorServerConfigurationSupportTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorServerConfigurationSupportTest.java @@ -672,16 +672,23 @@ public class RaftActorServerConfigurationSupportTest extends AbstractActorTest { @Test public void testOnApplyState() { - RaftActorServerConfigurationSupport support = new RaftActorServerConfigurationSupport(new MockRaftActorContext()); + DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl(); + configParams.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS)); + TestActorRef noLeaderActor = actorFactory.createTestActor( + MockRaftActor.props(LEADER_ID, ImmutableMap.of(FOLLOWER_ID, followerActor.path().toString()), + configParams, NO_PERSISTENCE).withDispatcher(Dispatchers.DefaultDispatcherId()), + actorFactory.generateActorId(LEADER_ID)); + + RaftActorServerConfigurationSupport support = new RaftActorServerConfigurationSupport(noLeaderActor.underlyingActor()); ReplicatedLogEntry serverConfigEntry = new MockRaftActorContext.MockReplicatedLogEntry(1, 1, new ServerConfigurationPayload(Collections.emptyList())); - boolean handled = support.handleMessage(new ApplyState(null, null, serverConfigEntry), null, ActorRef.noSender()); + boolean handled = support.handleMessage(new ApplyState(null, null, serverConfigEntry), ActorRef.noSender()); assertEquals("Message handled", true, handled); ReplicatedLogEntry nonServerConfigEntry = new MockRaftActorContext.MockReplicatedLogEntry(1, 1, new MockRaftActorContext.MockPayload("1")); - handled = support.handleMessage(new ApplyState(null, null, nonServerConfigEntry), null, ActorRef.noSender()); + handled = support.handleMessage(new ApplyState(null, null, nonServerConfigEntry), ActorRef.noSender()); assertEquals("Message handled", false, handled); }