X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-akka-raft%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fraft%2FRaftActor.java;h=65c265c1947244d2de4711a4fa1c36cd6e873439;hp=ff464b0c9ca6629a3c754c017e56eb7cfda6bd2f;hb=55d229e2b83b1a48a43cc1e01c90f5be7d02faf0;hpb=c1d8d46591c54d610d8467e21b6cd48f7091bcf8 diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java index ff464b0c9c..65c265c194 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java @@ -14,15 +14,14 @@ import akka.actor.ActorSelection; import akka.actor.PoisonPill; import akka.japi.Procedure; import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Objects; import com.google.common.base.Optional; -import com.google.common.base.Supplier; import com.google.common.collect.Lists; import java.io.Serializable; import java.util.Collection; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.concurrent.TimeUnit; import javax.annotation.Nonnull; import org.apache.commons.lang3.time.DurationFormatUtils; @@ -115,8 +114,6 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { private final BehaviorStateHolder reusableBehaviorStateHolder = new BehaviorStateHolder(); - private final SwitchBehaviorSupplier reusableSwitchBehaviorSupplier = new SwitchBehaviorSupplier(); - private RaftActorServerConfigurationSupport serverConfigurationSupport; private RaftActorLeadershipTransferCohort leadershipTransferInProgress; @@ -189,18 +186,19 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { } @VisibleForTesting - protected void changeCurrentBehavior(RaftActorBehavior newBehavior){ - if(getCurrentBehavior() != null) { + protected void changeCurrentBehavior(RaftActorBehavior newBehavior) { + final RaftActorBehavior currentBehavior = getCurrentBehavior(); + if (currentBehavior != null) { try { - getCurrentBehavior().close(); - } catch(Exception e) { - LOG.warn("{}: Error closing behavior {}", persistence(), getCurrentBehavior(), e); + currentBehavior.close(); + } catch (Exception e) { + LOG.warn("{}: Error closing behavior {}", persistence(), currentBehavior, e); } } - reusableBehaviorStateHolder.init(getCurrentBehavior()); + reusableBehaviorStateHolder.init(currentBehavior); setCurrentBehavior(newBehavior); - handleBehaviorChange(reusableBehaviorStateHolder, getCurrentBehavior()); + handleBehaviorChange(reusableBehaviorStateHolder, newBehavior); } @Override @@ -266,7 +264,12 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { } else if(message instanceof Runnable) { ((Runnable)message).run(); } else { - switchBehavior(reusableSwitchBehaviorSupplier.handleMessage(getSender(), message)); + final RaftActorBehavior currentBehavior = getCurrentBehavior(); + + // Processing the message may affect the state, hence we need to capture it + reusableBehaviorStateHolder.init(currentBehavior); + final RaftActorBehavior nextBehavior = currentBehavior.handleMessage(getSender(), message); + switchBehavior(nextBehavior); } } @@ -274,15 +277,15 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { LOG.debug("{}: Initiating leader transfer", persistenceId()); if(leadershipTransferInProgress == null) { - leadershipTransferInProgress = new RaftActorLeadershipTransferCohort(this, getSender()); + leadershipTransferInProgress = new RaftActorLeadershipTransferCohort(this); leadershipTransferInProgress.addOnComplete(new RaftActorLeadershipTransferCohort.OnComplete() { @Override - public void onSuccess(ActorRef raftActorRef, ActorRef replyTo) { + public void onSuccess(ActorRef raftActorRef) { leadershipTransferInProgress = null; } @Override - public void onFailure(ActorRef raftActorRef, ActorRef replyTo) { + public void onFailure(ActorRef raftActorRef) { leadershipTransferInProgress = null; } }); @@ -314,13 +317,13 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { if (context.hasFollowers()) { initiateLeadershipTransfer(new RaftActorLeadershipTransferCohort.OnComplete() { @Override - public void onSuccess(ActorRef raftActorRef, ActorRef replyTo) { + public void onSuccess(ActorRef raftActorRef) { LOG.debug("{}: leader transfer succeeded - sending PoisonPill", persistenceId()); raftActorRef.tell(PoisonPill.getInstance(), raftActorRef); } @Override - public void onFailure(ActorRef raftActorRef, ActorRef replyTo) { + public void onFailure(ActorRef raftActorRef) { LOG.debug("{}: leader transfer failed - sending PoisonPill", persistenceId()); raftActorRef.tell(PoisonPill.getInstance(), raftActorRef); } @@ -353,7 +356,10 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { if(!getRaftActorContext().getRaftPolicy().automaticElectionsEnabled()) { RaftState newState = message.getNewState(); if( newState == RaftState.Leader || newState == RaftState.Follower) { - switchBehavior(reusableSwitchBehaviorSupplier.handleMessage(getSender(), message)); + reusableBehaviorStateHolder.init(getCurrentBehavior()); + final RaftActorBehavior nextBehavior = AbstractRaftActorBehavior.createBehavior(context, + message.getNewState()); + switchBehavior(nextBehavior); getRaftActorContext().getTermInformation().updateAndPersist(message.getNewTerm(), ""); } else { LOG.warn("Switching to behavior : {} - not supported", newState); @@ -361,12 +367,9 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { } } - private void switchBehavior(Supplier supplier){ - reusableBehaviorStateHolder.init(getCurrentBehavior()); - - setCurrentBehavior(supplier.get()); - - handleBehaviorChange(reusableBehaviorStateHolder, getCurrentBehavior()); + private void switchBehavior(final RaftActorBehavior nextBehavior) { + setCurrentBehavior(nextBehavior); + handleBehaviorChange(reusableBehaviorStateHolder, nextBehavior); } @VisibleForTesting @@ -436,7 +439,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { // it can happen that the state has not changed but the leader has changed. Optional roleChangeNotifier = getRoleChangeNotifier(); - if(!Objects.equal(lastValidLeaderId, currentBehavior.getLeaderId()) || + if(!Objects.equals(lastValidLeaderId, currentBehavior.getLeaderId()) || oldBehaviorState.getLeaderPayloadVersion() != currentBehavior.getLeaderPayloadVersion()) { if(roleChangeNotifier.isPresent()) { roleChangeNotifier.get().tell(newLeaderStateChanged(getId(), currentBehavior.getLeaderId(), @@ -605,13 +608,13 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { LOG.debug("{}: RaftPolicy used with prev.config {}, RaftPolicy used with newConfig {}", persistenceId(), oldRaftPolicy, newRaftPolicy); context.setConfigParams(configParams); - if (!Objects.equal(oldRaftPolicy, newRaftPolicy)) { + if (!Objects.equals(oldRaftPolicy, newRaftPolicy)) { // The RaftPolicy was modified. If the current behavior is Follower then re-initialize to Follower // but transfer the previous leaderId so it doesn't immediately try to schedule an election. This // avoids potential disruption. Otherwise, switch to Follower normally. RaftActorBehavior behavior = getCurrentBehavior(); - if(behavior instanceof Follower) { - String previousLeaderId = ((Follower)behavior).getLeaderId(); + if (behavior != null && behavior.state() == RaftState.Follower) { + String previousLeaderId = behavior.getLeaderId(); short previousLeaderPayloadVersion = behavior.getLeaderPayloadVersion(); LOG.debug("{}: Re-initializing to Follower with previous leaderId {}", persistenceId(), previousLeaderId); @@ -786,13 +789,13 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { if (isLeader()) { initiateLeadershipTransfer(new RaftActorLeadershipTransferCohort.OnComplete() { @Override - public void onSuccess(ActorRef raftActorRef, ActorRef replyTo) { + public void onSuccess(ActorRef raftActorRef) { LOG.debug("{}: leader transfer succeeded after change to non-voting", persistenceId()); ensureFollowerState(); } @Override - public void onFailure(ActorRef raftActorRef, ActorRef replyTo) { + public void onFailure(ActorRef raftActorRef) { LOG.debug("{}: leader transfer failed after change to non-voting", persistenceId()); ensureFollowerState(); } @@ -880,23 +883,4 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { return leaderPayloadVersion; } } - - private class SwitchBehaviorSupplier implements Supplier { - private Object message; - private ActorRef sender; - - public SwitchBehaviorSupplier handleMessage(ActorRef sender, Object message){ - this.sender = sender; - this.message = message; - return this; - } - - @Override - public RaftActorBehavior get() { - if(this.message instanceof SwitchBehavior){ - return AbstractRaftActorBehavior.createBehavior(context, ((SwitchBehavior) message).getNewState()); - } - return getCurrentBehavior().handleMessage(sender, message); - } - } }