import org.opendaylight.controller.cluster.raft.RaftActorContext;
import org.opendaylight.controller.cluster.raft.RaftState;
import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
-import org.opendaylight.controller.cluster.raft.SerializationUtils;
-import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
import org.opendaylight.controller.cluster.raft.messages.RequestVote;
import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
+import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries;
import org.slf4j.Logger;
import scala.concurrent.duration.FiniteDuration;
*/
private Cancellable electionCancel = null;
- /**
- *
- */
- protected String leaderId = null;
-
- private short leaderPayloadVersion = -1;
-
private long replicatedToAllIndex = -1;
private final String logName;
return new IsolatedLeader(context);
case Leader:
return new Leader(context);
+ case PreLeader:
+ return new PreLeader(context);
default:
throw new IllegalArgumentException("Unhandled state " + state);
}
return state;
}
- public String logName() {
+ protected final String logName() {
return logName;
}
protected void scheduleElection(FiniteDuration interval) {
stopElection();
- if(canStartElection()) {
- // Schedule an election. When the scheduler triggers an ElectionTimeout message is sent to itself
- electionCancel = context.getActorSystem().scheduler().scheduleOnce(interval, context.getActor(),
- ElectionTimeout.INSTANCE, context.getActorSystem().dispatcher(), context.getActor());
- }
+ // Schedule an election. When the scheduler triggers an ElectionTimeout message is sent to itself
+ electionCancel = context.getActorSystem().scheduler().scheduleOnce(interval, context.getActor(),
+ ElectionTimeout.INSTANCE, context.getActorSystem().dispatcher(), context.getActor());
}
/**
return context.getReplicatedLog().lastIndex();
}
- /**
- * @param logIndex
- * @return the client request tracker for the specified logIndex
- */
- protected ClientRequestTracker findClientRequestTracker(long logIndex) {
- return null;
- }
-
/**
* @param logIndex
* @return the client request tracker for the specified logIndex
return null;
}
-
/**
*
- * @return log index from the previous to last entry in the log
+ * @return the log entry index for the given index or -1 if not found
*/
- protected long prevLogIndex(long index){
- ReplicatedLogEntry prevEntry =
- context.getReplicatedLog().get(index - 1);
- if (prevEntry != null) {
- return prevEntry.getIndex();
+ protected long getLogEntryIndex(long index){
+ if(index == context.getReplicatedLog().getSnapshotIndex()){
+ return context.getReplicatedLog().getSnapshotIndex();
}
+
+ ReplicatedLogEntry entry = context.getReplicatedLog().get(index);
+ if(entry != null){
+ return entry.getIndex();
+ }
+
return -1;
}
/**
- * @return log term from the previous to last entry in the log
+ * @return the log entry term for the given index or -1 if not found
*/
- protected long prevLogTerm(long index){
- ReplicatedLogEntry prevEntry =
- context.getReplicatedLog().get(index - 1);
- if (prevEntry != null) {
- return prevEntry.getTerm();
+ protected long getLogEntryTerm(long index){
+ if(index == context.getReplicatedLog().getSnapshotIndex()){
+ return context.getReplicatedLog().getSnapshotTerm();
+ }
+
+ ReplicatedLogEntry entry = context.getReplicatedLog().get(index);
+ if(entry != null){
+ return entry.getTerm();
}
+
return -1;
}
protected void applyLogToStateMachine(final long index) {
long newLastApplied = context.getLastApplied();
// Now maybe we apply to the state machine
- for (long i = context.getLastApplied() + 1;
- i < index + 1; i++) {
- ActorRef clientActor = null;
- String identifier = null;
- ClientRequestTracker tracker = removeClientRequestTracker(i);
-
- if (tracker != null) {
- clientActor = tracker.getClientActor();
- identifier = tracker.getIdentifier();
- }
- ReplicatedLogEntry replicatedLogEntry =
- context.getReplicatedLog().get(i);
+ for (long i = context.getLastApplied() + 1; i < index + 1; i++) {
+ ReplicatedLogEntry replicatedLogEntry = context.getReplicatedLog().get(i);
if (replicatedLogEntry != null) {
// Send a local message to the local RaftActor (it's derived class to be
// specific to apply the log to it's index)
- actor().tell(new ApplyState(clientActor, identifier,
- replicatedLogEntry), actor());
+
+ final ApplyState msg;
+ final ClientRequestTracker tracker = removeClientRequestTracker(i);
+ if (tracker != null) {
+ msg = new ApplyState(tracker.getClientActor(), tracker.getIdentifier(), replicatedLogEntry);
+ } else {
+ msg = new ApplyState(null, null, replicatedLogEntry);
+ }
+
+ actor().tell(msg, actor());
newLastApplied = i;
} else {
//if one index is not present in the log, no point in looping
// around as the rest wont be present either
- LOG.warn(
- "{}: Missing index {} from log. Cannot apply state. Ignoring {} to {}",
+ LOG.warn("{}: Missing index {} from log. Cannot apply state. Ignoring {} to {}",
logName(), i, i, index);
break;
}
actor().tell(new ApplyJournalEntries(context.getLastApplied()), actor());
}
- protected Object fromSerializableMessage(Object serializable){
- return SerializationUtils.fromSerializable(serializable);
- }
-
@Override
public RaftActorBehavior handleMessage(ActorRef sender, Object message) {
if (message instanceof AppendEntries) {
return requestVote(sender, (RequestVote) message);
} else if (message instanceof RequestVoteReply) {
return handleRequestVoteReply(sender, (RequestVoteReply) message);
+ } else {
+ return null;
}
- return this;
- }
-
- @Override public String getLeaderId() {
- return leaderId;
- }
-
- @Override
- public short getLeaderPayloadVersion() {
- return leaderPayloadVersion;
- }
-
- public void setLeaderPayloadVersion(short leaderPayloadVersion) {
- this.leaderPayloadVersion = leaderPayloadVersion;
}
@Override
}
protected RaftActorBehavior internalSwitchBehavior(RaftState newState) {
- if(context.getRaftPolicy().automaticElectionsEnabled()){
- return internalSwitchBehavior(createBehavior(context, newState));
- }
- return this;
+ return internalSwitchBehavior(createBehavior(context, newState));
}
- private RaftActorBehavior internalSwitchBehavior(RaftActorBehavior newBehavior) {
+ protected RaftActorBehavior internalSwitchBehavior(RaftActorBehavior newBehavior) {
+ if(!context.getRaftPolicy().automaticElectionsEnabled()) {
+ return this;
+ }
+
LOG.info("{} :- Switching from behavior {} to {}", logName(), this.state(), newBehavior.state());
try {
close();