X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-akka-raft%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fraft%2FRaftActorServerConfigurationSupport.java;h=f969e3d67d46b53920e2b41b5fa3078fe5efd5f1;hp=2a2b50137c663ce5379ba59f973c7f3208454da9;hb=388dd012c7b36177808ff5c5ad692b16dd58c944;hpb=01be99539d7b19743a237b6e72d2d870491daf7a diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorServerConfigurationSupport.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorServerConfigurationSupport.java index 2a2b50137c..f969e3d67d 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorServerConfigurationSupport.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorServerConfigurationSupport.java @@ -39,41 +39,43 @@ class RaftActorServerConfigurationSupport { private final OperationState IDLE = new Idle(); + private final RaftActor raftActor; + private final RaftActorContext raftContext; private final Queue> pendingOperationsQueue = new LinkedList<>(); private OperationState currentOperationState = IDLE; - RaftActorServerConfigurationSupport(RaftActorContext context) { - this.raftContext = context; + RaftActorServerConfigurationSupport(RaftActor raftActor) { + this.raftActor = raftActor; + this.raftContext = raftActor.getRaftActorContext(); } - boolean handleMessage(Object message, RaftActor raftActor, ActorRef sender) { + boolean handleMessage(Object message, ActorRef sender) { if(message instanceof AddServer) { - onAddServer((AddServer) message, raftActor, sender); + onAddServer((AddServer) message, sender); return true; } else if(message instanceof RemoveServer) { - onRemoveServer((RemoveServer) message, raftActor, sender); + onRemoveServer((RemoveServer) message, sender); return true; } else if (message instanceof ServerOperationTimeout) { - currentOperationState.onServerOperationTimeout(raftActor, (ServerOperationTimeout) message); + currentOperationState.onServerOperationTimeout((ServerOperationTimeout) message); return true; } else if (message instanceof UnInitializedFollowerSnapshotReply) { - currentOperationState.onUnInitializedFollowerSnapshotReply(raftActor, - (UnInitializedFollowerSnapshotReply) message); + currentOperationState.onUnInitializedFollowerSnapshotReply((UnInitializedFollowerSnapshotReply) message); return true; } else if(message instanceof ApplyState) { - return onApplyState((ApplyState) message, raftActor); + return onApplyState((ApplyState) message); } else if(message instanceof SnapshotComplete) { - currentOperationState.onSnapshotComplete(raftActor); + currentOperationState.onSnapshotComplete(); return false; } else { return false; } } - private void onRemoveServer(RemoveServer removeServer, RaftActor raftActor, ActorRef sender) { + private void onRemoveServer(RemoveServer removeServer, ActorRef sender) { LOG.debug("{}: onRemoveServer: {}, state: {}", raftContext.getId(), removeServer, currentOperationState); if(removeServer.getServerId().equals(raftActor.getLeaderId())){ // Removing current leader is not supported yet @@ -83,14 +85,14 @@ class RaftActorServerConfigurationSupport { } else if(!raftContext.getPeerIds().contains(removeServer.getServerId())) { sender.tell(new RemoveServerReply(ServerChangeStatus.DOES_NOT_EXIST, raftActor.getLeaderId()), raftActor.getSelf()); } else { - onNewOperation(raftActor, new RemoveServerContext(removeServer, raftContext.getPeerAddress(removeServer.getServerId()), sender)); + onNewOperation(new RemoveServerContext(removeServer, raftContext.getPeerAddress(removeServer.getServerId()), sender)); } } - private boolean onApplyState(ApplyState applyState, RaftActor raftActor) { + private boolean onApplyState(ApplyState applyState) { Payload data = applyState.getReplicatedLogEntry().getData(); if(data instanceof ServerConfigurationPayload) { - currentOperationState.onApplyState(raftActor, applyState); + currentOperationState.onApplyState(applyState); return true; } @@ -118,15 +120,15 @@ class RaftActorServerConfigurationSupport { *
  • Respond to caller with TIMEOUT.
  • * */ - private void onAddServer(AddServer addServer, RaftActor raftActor, ActorRef sender) { + private void onAddServer(AddServer addServer, ActorRef sender) { LOG.debug("{}: onAddServer: {}, state: {}", raftContext.getId(), addServer, currentOperationState); - onNewOperation(raftActor, new AddServerContext(addServer, sender)); + onNewOperation(new AddServerContext(addServer, sender)); } - private void onNewOperation(RaftActor raftActor, ServerOperationContext operationContext) { + private void onNewOperation(ServerOperationContext operationContext) { if (raftActor.isLeader()) { - currentOperationState.onNewOperation(raftActor, operationContext); + currentOperationState.onNewOperation(operationContext); } else { ActorSelection leader = raftActor.getLeader(); if (leader != null) { @@ -144,22 +146,22 @@ class RaftActorServerConfigurationSupport { * Interface for a server operation FSM state. */ private interface OperationState { - void onNewOperation(RaftActor raftActor, ServerOperationContext operationContext); + void onNewOperation(ServerOperationContext operationContext); - void onServerOperationTimeout(RaftActor raftActor, ServerOperationTimeout timeout); + void onServerOperationTimeout(ServerOperationTimeout timeout); - void onUnInitializedFollowerSnapshotReply(RaftActor raftActor, UnInitializedFollowerSnapshotReply reply); + void onUnInitializedFollowerSnapshotReply(UnInitializedFollowerSnapshotReply reply); - void onApplyState(RaftActor raftActor, ApplyState applyState); + void onApplyState(ApplyState applyState); - void onSnapshotComplete(RaftActor raftActor); + void onSnapshotComplete(); } /** * Interface for the initial state for a server operation. */ private interface InitialOperationState { - void initiate(RaftActor raftActor); + void initiate(); } /** @@ -167,7 +169,7 @@ class RaftActorServerConfigurationSupport { */ private abstract class AbstractOperationState implements OperationState { @Override - public void onNewOperation(RaftActor raftActor, ServerOperationContext operationContext) { + public void onNewOperation(ServerOperationContext operationContext) { // We're currently processing another operation so queue it to be processed later. LOG.debug("{}: Server operation already in progress - queueing {}", raftContext.getId(), @@ -177,41 +179,39 @@ class RaftActorServerConfigurationSupport { } @Override - public void onServerOperationTimeout(RaftActor raftActor, ServerOperationTimeout timeout) { + public void onServerOperationTimeout(ServerOperationTimeout timeout) { LOG.debug("onServerOperationTimeout should not be called in state {}", this); } @Override - public void onUnInitializedFollowerSnapshotReply(RaftActor raftActor, UnInitializedFollowerSnapshotReply reply) { + public void onUnInitializedFollowerSnapshotReply(UnInitializedFollowerSnapshotReply reply) { LOG.debug("onUnInitializedFollowerSnapshotReply was called in state {}", this); } @Override - public void onApplyState(RaftActor raftActor, ApplyState applyState) { + public void onApplyState(ApplyState applyState) { LOG.debug("onApplyState was called in state {}", this); } @Override - public void onSnapshotComplete(RaftActor raftActor) { + public void onSnapshotComplete() { } - protected void persistNewServerConfiguration(RaftActor raftActor, ServerOperationContext operationContext){ + protected void persistNewServerConfiguration(ServerOperationContext operationContext){ raftContext.setDynamicServerConfigurationInUse(); ServerConfigurationPayload payload = raftContext.getPeerServerInfo(); LOG.debug("{}: New server configuration : {}", raftContext.getId(), payload.getServerConfig()); raftActor.persistData(operationContext.getClientRequestor(), operationContext.getContextId(), payload); - currentOperationState = new Persisting(operationContext, newTimer( - new ServerOperationTimeout(operationContext.getServerId()))); + currentOperationState = new Persisting(operationContext, newTimer(new ServerOperationTimeout(operationContext.getServerId()))); - sendReply(raftActor, operationContext, ServerChangeStatus.OK); + sendReply(operationContext, ServerChangeStatus.OK); } - protected void operationComplete(RaftActor raftActor, ServerOperationContext operationContext, - @Nullable ServerChangeStatus replyStatus) { + protected void operationComplete(ServerOperationContext operationContext, @Nullable ServerChangeStatus replyStatus) { if(replyStatus != null) { - sendReply(raftActor, operationContext, replyStatus); + sendReply(operationContext, replyStatus); } operationContext.operationComplete(raftActor, replyStatus); @@ -220,12 +220,11 @@ class RaftActorServerConfigurationSupport { ServerOperationContext nextOperation = pendingOperationsQueue.poll(); if(nextOperation != null) { - RaftActorServerConfigurationSupport.this.onNewOperation(raftActor, nextOperation); + RaftActorServerConfigurationSupport.this.onNewOperation(nextOperation); } } - protected void sendReply(RaftActor raftActor, ServerOperationContext operationContext, - ServerChangeStatus status) { + protected void sendReply(ServerOperationContext operationContext, ServerChangeStatus status) { LOG.debug("{}: Returning {} for operation {}", raftContext.getId(), status, operationContext.getOperation()); operationContext.getClientRequestor().tell(operationContext.newReply(status, raftActor.getLeaderId()), @@ -234,8 +233,8 @@ class RaftActorServerConfigurationSupport { Cancellable newTimer(Object message) { return raftContext.getActorSystem().scheduler().scheduleOnce( - raftContext.getConfigParams().getElectionTimeOutInterval().$times(2), raftContext.getActor(), - message, raftContext.getActorSystem().dispatcher(), raftContext.getActor()); + raftContext.getConfigParams().getElectionTimeOutInterval().$times(2), raftContext.getActor(), message, + raftContext.getActorSystem().dispatcher(), raftContext.getActor()); } @Override @@ -249,12 +248,12 @@ class RaftActorServerConfigurationSupport { */ private class Idle extends AbstractOperationState { @Override - public void onNewOperation(RaftActor raftActor, ServerOperationContext operationContext) { - operationContext.newInitialOperationState(RaftActorServerConfigurationSupport.this).initiate(raftActor); + public void onNewOperation(ServerOperationContext operationContext) { + operationContext.newInitialOperationState(RaftActorServerConfigurationSupport.this).initiate(); } @Override - public void onApplyState(RaftActor raftActor, ApplyState applyState) { + public void onApplyState(ApplyState applyState) { // Noop - we override b/c ApplyState is called normally for followers in the idle state. } } @@ -273,7 +272,7 @@ class RaftActorServerConfigurationSupport { } @Override - public void onApplyState(RaftActor raftActor, ApplyState applyState) { + public void onApplyState(ApplyState applyState) { // Sanity check - we could get an ApplyState from a previous operation that timed out so make // sure it's meant for us. if(operationContext.getContextId().equals(applyState.getIdentifier())) { @@ -281,12 +280,12 @@ class RaftActorServerConfigurationSupport { applyState.getReplicatedLogEntry().getData()); timer.cancel(); - operationComplete(raftActor, operationContext, null); + operationComplete(operationContext, null); } } @Override - public void onServerOperationTimeout(RaftActor raftActor, ServerOperationTimeout timeout) { + public void onServerOperationTimeout(ServerOperationTimeout timeout) { LOG.warn("{}: Timeout occured while replicating the new server configuration for {}", raftContext.getId(), timeout.getServerId()); @@ -295,17 +294,17 @@ class RaftActorServerConfigurationSupport { // Fail any pending operations ServerOperationContext nextOperation = pendingOperationsQueue.poll(); while(nextOperation != null) { - sendReply(raftActor, nextOperation, ServerChangeStatus.PRIOR_REQUEST_CONSENSUS_TIMEOUT); + sendReply(nextOperation, ServerChangeStatus.PRIOR_REQUEST_CONSENSUS_TIMEOUT); nextOperation = pendingOperationsQueue.poll(); } } @Override - public void onNewOperation(RaftActor raftActor, ServerOperationContext operationContext) { + public void onNewOperation(ServerOperationContext operationContext) { if(timedOut) { - sendReply(raftActor, operationContext, ServerChangeStatus.PRIOR_REQUEST_CONSENSUS_TIMEOUT); + sendReply(operationContext, ServerChangeStatus.PRIOR_REQUEST_CONSENSUS_TIMEOUT); } else { - super.onNewOperation(raftActor, operationContext); + super.onNewOperation(operationContext); } } } @@ -324,11 +323,11 @@ class RaftActorServerConfigurationSupport { return addServerContext; } - Cancellable newInstallSnapshotTimer(RaftActor raftActor) { + Cancellable newInstallSnapshotTimer() { return newTimer(new ServerOperationTimeout(addServerContext.getOperation().getNewServerId())); } - void handleInstallSnapshotTimeout(RaftActor raftActor, ServerOperationTimeout timeout) { + void handleInstallSnapshotTimeout(ServerOperationTimeout timeout) { String serverId = timeout.getServerId(); LOG.debug("{}: handleInstallSnapshotTimeout for new server {}", raftContext.getId(), serverId); @@ -342,8 +341,7 @@ class RaftActorServerConfigurationSupport { leader.removeFollower(serverId); } - operationComplete(raftActor, getAddServerContext(), - isLeader ? ServerChangeStatus.TIMEOUT : ServerChangeStatus.NO_LEADER); + operationComplete(getAddServerContext(), isLeader ? ServerChangeStatus.TIMEOUT : ServerChangeStatus.NO_LEADER); } } @@ -358,14 +356,14 @@ class RaftActorServerConfigurationSupport { } @Override - public void initiate(RaftActor raftActor) { + public void initiate() { AbstractLeader leader = (AbstractLeader) raftActor.getCurrentBehavior(); AddServer addServer = getAddServerContext().getOperation(); LOG.debug("{}: Initiating {}", raftContext.getId(), addServer); if(raftContext.getPeerInfo(addServer.getNewServerId()) != null) { - operationComplete(raftActor, getAddServerContext(), ServerChangeStatus.ALREADY_EXISTS); + operationComplete(getAddServerContext(), ServerChangeStatus.ALREADY_EXISTS); return; } @@ -377,7 +375,7 @@ class RaftActorServerConfigurationSupport { if(votingState == VotingState.VOTING_NOT_INITIALIZED){ // schedule the install snapshot timeout timer - Cancellable installSnapshotTimer = newInstallSnapshotTimer(raftActor); + Cancellable installSnapshotTimer = newInstallSnapshotTimer(); if(leader.initiateCaptureSnapshot(addServer.getNewServerId())) { LOG.debug("{}: Initiating capture snapshot for new server {}", raftContext.getId(), addServer.getNewServerId()); @@ -393,7 +391,7 @@ class RaftActorServerConfigurationSupport { LOG.debug("{}: New follower is non-voting - directly persisting new server configuration", raftContext.getId()); - persistNewServerConfiguration(raftActor, getAddServerContext()); + persistNewServerConfiguration(getAddServerContext()); } } } @@ -411,15 +409,15 @@ class RaftActorServerConfigurationSupport { } @Override - public void onServerOperationTimeout(RaftActor raftActor, ServerOperationTimeout timeout) { - handleInstallSnapshotTimeout(raftActor, timeout); + public void onServerOperationTimeout(ServerOperationTimeout timeout) { + handleInstallSnapshotTimeout(timeout); LOG.warn("{}: Timeout occured for new server {} while installing snapshot", raftContext.getId(), timeout.getServerId()); } @Override - public void onUnInitializedFollowerSnapshotReply(RaftActor raftActor, UnInitializedFollowerSnapshotReply reply) { + public void onUnInitializedFollowerSnapshotReply(UnInitializedFollowerSnapshotReply reply) { LOG.debug("{}: onUnInitializedFollowerSnapshotReply: {}", raftContext.getId(), reply); String followerId = reply.getFollowerId(); @@ -431,7 +429,7 @@ class RaftActorServerConfigurationSupport { raftContext.getPeerInfo(followerId).setVotingState(VotingState.VOTING); leader.updateMinReplicaCount(); - persistNewServerConfiguration(raftActor, getAddServerContext()); + persistNewServerConfiguration(getAddServerContext()); installSnapshotTimer.cancel(); } else { @@ -455,7 +453,7 @@ class RaftActorServerConfigurationSupport { } @Override - public void onSnapshotComplete(RaftActor raftActor) { + public void onSnapshotComplete() { LOG.debug("{}: onSnapshotComplete", raftContext.getId()); if(!raftActor.isLeader()) { @@ -469,15 +467,15 @@ class RaftActorServerConfigurationSupport { getAddServerContext().getOperation().getNewServerId()); currentOperationState = new InstallingSnapshot(getAddServerContext(), - newInstallSnapshotTimer(raftActor)); + newInstallSnapshotTimer()); snapshotTimer.cancel(); } } @Override - public void onServerOperationTimeout(RaftActor raftActor, ServerOperationTimeout timeout) { - handleInstallSnapshotTimeout(raftActor, timeout); + public void onServerOperationTimeout(ServerOperationTimeout timeout) { + handleInstallSnapshotTimeout(timeout); LOG.warn("{}: Timeout occured for new server {} while waiting for prior snapshot to complete", raftContext.getId(), timeout.getServerId()); @@ -571,9 +569,9 @@ class RaftActorServerConfigurationSupport { } @Override - public void initiate(RaftActor raftActor) { + public void initiate() { raftContext.removePeer(getRemoveServerContext().getOperation().getServerId()); - persistNewServerConfiguration(raftActor, getRemoveServerContext()); + persistNewServerConfiguration(getRemoveServerContext()); } }