import akka.actor.ActorRef;
import akka.actor.Cancellable;
+import com.google.common.base.Preconditions;
import java.util.Random;
import java.util.concurrent.TimeUnit;
import org.opendaylight.controller.cluster.raft.ClientRequestTracker;
* set currentTerm = T, convert to follower (ยง5.1)
*/
public abstract class AbstractRaftActorBehavior implements RaftActorBehavior {
-
- protected static final ElectionTimeout ELECTION_TIMEOUT = new ElectionTimeout();
-
/**
* Information about the RaftActor whose behavior this class represents
*/
*/
private Cancellable electionCancel = null;
- /**
- *
- */
- protected String leaderId = null;
-
- private short leaderPayloadVersion = -1;
-
private long replicatedToAllIndex = -1;
private final String logName;
private final RaftState state;
- protected AbstractRaftActorBehavior(RaftActorContext context, RaftState state) {
- this.context = context;
- this.state = state;
+ AbstractRaftActorBehavior(final RaftActorContext context, final RaftState state) {
+ this.context = Preconditions.checkNotNull(context);
+ this.state = Preconditions.checkNotNull(state);
this.LOG = context.getLogger();
logName = String.format("%s (%s)", context.getId(), state);
}
+ public static RaftActorBehavior createBehavior(final RaftActorContext context, final RaftState state) {
+ switch (state) {
+ case Candidate:
+ return new Candidate(context);
+ case Follower:
+ return new Follower(context);
+ case IsolatedLeader:
+ return new IsolatedLeader(context);
+ case Leader:
+ return new Leader(context);
+ default:
+ throw new IllegalArgumentException("Unhandled state " + state);
+ }
+ }
+
@Override
- public RaftState state() {
+ public final RaftState state() {
return state;
}
- public String logName() {
+ protected final String logName() {
return logName;
}
}
}
+ protected boolean canStartElection() {
+ return context.getRaftPolicy().automaticElectionsEnabled() && context.isVotingMember();
+ }
+
/**
* schedule a new election
*
protected void scheduleElection(FiniteDuration interval) {
stopElection();
- // Schedule an election. When the scheduler triggers an ElectionTimeout
- // message is sent to itself
- electionCancel =
- context.getActorSystem().scheduler().scheduleOnce(interval,
- context.getActor(), ELECTION_TIMEOUT,
- context.getActorSystem().dispatcher(), context.getActor());
+ if(canStartElection()) {
+ // Schedule an election. When the scheduler triggers an ElectionTimeout message is sent to itself
+ electionCancel = context.getActorSystem().scheduler().scheduleOnce(interval, context.getActor(),
+ ElectionTimeout.INSTANCE, context.getActorSystem().dispatcher(), context.getActor());
+ }
}
/**
return requestVote(sender, (RequestVote) message);
} else if (message instanceof RequestVoteReply) {
return handleRequestVoteReply(sender, (RequestVoteReply) message);
+ } else {
+ return null;
}
- return this;
- }
-
- @Override public String getLeaderId() {
- return leaderId;
}
@Override
- public short getLeaderPayloadVersion() {
- return leaderPayloadVersion;
+ public RaftActorBehavior switchBehavior(RaftActorBehavior behavior) {
+ return internalSwitchBehavior(behavior);
}
- public void setLeaderPayloadVersion(short leaderPayloadVersion) {
- this.leaderPayloadVersion = leaderPayloadVersion;
+ protected RaftActorBehavior internalSwitchBehavior(RaftState newState) {
+ if(context.getRaftPolicy().automaticElectionsEnabled()){
+ return internalSwitchBehavior(createBehavior(context, newState));
+ }
+ return this;
}
- protected RaftActorBehavior switchBehavior(RaftActorBehavior behavior) {
- LOG.info("{} :- Switching from behavior {} to {}", logName(), this.state(), behavior.state());
+ private RaftActorBehavior internalSwitchBehavior(RaftActorBehavior newBehavior) {
+ LOG.info("{} :- Switching from behavior {} to {}", logName(), this.state(), newBehavior.state());
try {
close();
} catch (Exception e) {
LOG.error("{}: Failed to close behavior : {}", logName(), this.state(), e);
}
-
- return behavior;
+ return newBehavior;
}
+
protected int getMajorityVoteCount(int numPeers) {
// Votes are required from a majority of the peers including self.
// The numMajority field therefore stores a calculated value
* @param snapshotCapturedIndex
*/
protected void performSnapshotWithoutCapture(final long snapshotCapturedIndex) {
- long actualIndex = context.getSnapshotManager().trimLog(snapshotCapturedIndex, this);
+ long actualIndex = context.getSnapshotManager().trimLog(snapshotCapturedIndex);
if(actualIndex != -1){
setReplicatedToAllIndex(actualIndex);
protected String getId(){
return context.getId();
}
-
}