import akka.actor.PoisonPill;
import akka.japi.Procedure;
import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Objects;
import com.google.common.base.Optional;
-import com.google.common.base.Supplier;
import com.google.common.collect.Lists;
import java.io.Serializable;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nonnull;
import org.apache.commons.lang3.time.DurationFormatUtils;
private final BehaviorStateHolder reusableBehaviorStateHolder = new BehaviorStateHolder();
- private final SwitchBehaviorSupplier reusableSwitchBehaviorSupplier = new SwitchBehaviorSupplier();
-
private RaftActorServerConfigurationSupport serverConfigurationSupport;
private RaftActorLeadershipTransferCohort leadershipTransferInProgress;
}
@VisibleForTesting
- protected void changeCurrentBehavior(RaftActorBehavior newBehavior){
- if(getCurrentBehavior() != null) {
+ protected void changeCurrentBehavior(RaftActorBehavior newBehavior) {
+ final RaftActorBehavior currentBehavior = getCurrentBehavior();
+ if (currentBehavior != null) {
try {
- getCurrentBehavior().close();
- } catch(Exception e) {
- LOG.warn("{}: Error closing behavior {}", persistence(), getCurrentBehavior(), e);
+ currentBehavior.close();
+ } catch (Exception e) {
+ LOG.warn("{}: Error closing behavior {}", persistence(), currentBehavior, e);
}
}
- reusableBehaviorStateHolder.init(getCurrentBehavior());
+ reusableBehaviorStateHolder.init(currentBehavior);
setCurrentBehavior(newBehavior);
- handleBehaviorChange(reusableBehaviorStateHolder, getCurrentBehavior());
+ handleBehaviorChange(reusableBehaviorStateHolder, newBehavior);
}
@Override
} else if(message instanceof Runnable) {
((Runnable)message).run();
} else {
- switchBehavior(reusableSwitchBehaviorSupplier.handleMessage(getSender(), message));
+ final RaftActorBehavior currentBehavior = getCurrentBehavior();
+
+ // Processing the message may affect the state, hence we need to capture it
+ reusableBehaviorStateHolder.init(currentBehavior);
+ final RaftActorBehavior nextBehavior = currentBehavior.handleMessage(getSender(), message);
+ switchBehavior(nextBehavior);
}
}
LOG.debug("{}: Initiating leader transfer", persistenceId());
if(leadershipTransferInProgress == null) {
- leadershipTransferInProgress = new RaftActorLeadershipTransferCohort(this, getSender());
+ leadershipTransferInProgress = new RaftActorLeadershipTransferCohort(this);
leadershipTransferInProgress.addOnComplete(new RaftActorLeadershipTransferCohort.OnComplete() {
@Override
- public void onSuccess(ActorRef raftActorRef, ActorRef replyTo) {
+ public void onSuccess(ActorRef raftActorRef) {
leadershipTransferInProgress = null;
}
@Override
- public void onFailure(ActorRef raftActorRef, ActorRef replyTo) {
+ public void onFailure(ActorRef raftActorRef) {
leadershipTransferInProgress = null;
}
});
if (context.hasFollowers()) {
initiateLeadershipTransfer(new RaftActorLeadershipTransferCohort.OnComplete() {
@Override
- public void onSuccess(ActorRef raftActorRef, ActorRef replyTo) {
+ public void onSuccess(ActorRef raftActorRef) {
LOG.debug("{}: leader transfer succeeded - sending PoisonPill", persistenceId());
raftActorRef.tell(PoisonPill.getInstance(), raftActorRef);
}
@Override
- public void onFailure(ActorRef raftActorRef, ActorRef replyTo) {
+ public void onFailure(ActorRef raftActorRef) {
LOG.debug("{}: leader transfer failed - sending PoisonPill", persistenceId());
raftActorRef.tell(PoisonPill.getInstance(), raftActorRef);
}
if(!getRaftActorContext().getRaftPolicy().automaticElectionsEnabled()) {
RaftState newState = message.getNewState();
if( newState == RaftState.Leader || newState == RaftState.Follower) {
- switchBehavior(reusableSwitchBehaviorSupplier.handleMessage(getSender(), message));
+ reusableBehaviorStateHolder.init(getCurrentBehavior());
+ final RaftActorBehavior nextBehavior = AbstractRaftActorBehavior.createBehavior(context,
+ message.getNewState());
+ switchBehavior(nextBehavior);
getRaftActorContext().getTermInformation().updateAndPersist(message.getNewTerm(), "");
} else {
LOG.warn("Switching to behavior : {} - not supported", newState);
}
}
- private void switchBehavior(Supplier<RaftActorBehavior> supplier){
- reusableBehaviorStateHolder.init(getCurrentBehavior());
-
- setCurrentBehavior(supplier.get());
-
- handleBehaviorChange(reusableBehaviorStateHolder, getCurrentBehavior());
+ private void switchBehavior(final RaftActorBehavior nextBehavior) {
+ setCurrentBehavior(nextBehavior);
+ handleBehaviorChange(reusableBehaviorStateHolder, nextBehavior);
}
@VisibleForTesting
// it can happen that the state has not changed but the leader has changed.
Optional<ActorRef> roleChangeNotifier = getRoleChangeNotifier();
- if(!Objects.equal(lastValidLeaderId, currentBehavior.getLeaderId()) ||
+ if(!Objects.equals(lastValidLeaderId, currentBehavior.getLeaderId()) ||
oldBehaviorState.getLeaderPayloadVersion() != currentBehavior.getLeaderPayloadVersion()) {
if(roleChangeNotifier.isPresent()) {
roleChangeNotifier.get().tell(newLeaderStateChanged(getId(), currentBehavior.getLeaderId(),
LOG.debug("{}: RaftPolicy used with prev.config {}, RaftPolicy used with newConfig {}", persistenceId(),
oldRaftPolicy, newRaftPolicy);
context.setConfigParams(configParams);
- if (!Objects.equal(oldRaftPolicy, newRaftPolicy)) {
+ if (!Objects.equals(oldRaftPolicy, newRaftPolicy)) {
// The RaftPolicy was modified. If the current behavior is Follower then re-initialize to Follower
// but transfer the previous leaderId so it doesn't immediately try to schedule an election. This
// avoids potential disruption. Otherwise, switch to Follower normally.
RaftActorBehavior behavior = getCurrentBehavior();
- if(behavior instanceof Follower) {
- String previousLeaderId = ((Follower)behavior).getLeaderId();
+ if (behavior != null && behavior.state() == RaftState.Follower) {
+ String previousLeaderId = behavior.getLeaderId();
short previousLeaderPayloadVersion = behavior.getLeaderPayloadVersion();
LOG.debug("{}: Re-initializing to Follower with previous leaderId {}", persistenceId(), previousLeaderId);
if (isLeader()) {
initiateLeadershipTransfer(new RaftActorLeadershipTransferCohort.OnComplete() {
@Override
- public void onSuccess(ActorRef raftActorRef, ActorRef replyTo) {
+ public void onSuccess(ActorRef raftActorRef) {
LOG.debug("{}: leader transfer succeeded after change to non-voting", persistenceId());
ensureFollowerState();
}
@Override
- public void onFailure(ActorRef raftActorRef, ActorRef replyTo) {
+ public void onFailure(ActorRef raftActorRef) {
LOG.debug("{}: leader transfer failed after change to non-voting", persistenceId());
ensureFollowerState();
}
return leaderPayloadVersion;
}
}
-
- private class SwitchBehaviorSupplier implements Supplier<RaftActorBehavior> {
- private Object message;
- private ActorRef sender;
-
- public SwitchBehaviorSupplier handleMessage(ActorRef sender, Object message){
- this.sender = sender;
- this.message = message;
- return this;
- }
-
- @Override
- public RaftActorBehavior get() {
- if(this.message instanceof SwitchBehavior){
- return AbstractRaftActorBehavior.createBehavior(context, ((SwitchBehavior) message).getNewState());
- }
- return getCurrentBehavior().handleMessage(sender, message);
- }
- }
}