import org.opendaylight.controller.cluster.raft.RaftState;
import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
import org.opendaylight.controller.cluster.raft.SerializationUtils;
+import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries;
import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
protected RaftState requestVote(ActorRef sender,
RequestVote requestVote) {
+
+ context.getLogger().debug(requestVote.toString());
+
boolean grantVote = false;
// Reply false if term < currentTerm (ยง5.1)
return null;
}
+ /**
+ * Find the client request tracker for a specific logIndex
+ *
+ * @param logIndex
+ * @return
+ */
+ protected ClientRequestTracker removeClientRequestTracker(long logIndex) {
+ return null;
+ }
+
+
/**
* Find the log index from the previous to last entry in the log
*
* @param index a log index that is known to be committed
*/
protected void applyLogToStateMachine(final long index) {
+ long newLastApplied = context.getLastApplied();
// Now maybe we apply to the state machine
for (long i = context.getLastApplied() + 1;
i < index + 1; i++) {
ActorRef clientActor = null;
String identifier = null;
- ClientRequestTracker tracker = findClientRequestTracker(i);
+ ClientRequestTracker tracker = removeClientRequestTracker(i);
if (tracker != null) {
clientActor = tracker.getClientActor();
context.getReplicatedLog().get(i);
if (replicatedLogEntry != null) {
+ // Send a local message to the local RaftActor (it's derived class to be
+ // specific to apply the log to it's index)
actor().tell(new ApplyState(clientActor, identifier,
replicatedLogEntry), actor());
+ newLastApplied = i;
} else {
- context.getLogger().error(
- "Missing index " + i + " from log. Cannot apply state.");
+ //if one index is not present in the log, no point in looping
+ // around as the rest wont be present either
+ context.getLogger().warning(
+ "Missing index {} from log. Cannot apply state. Ignoring {} to {}", i, i, index );
+ break;
}
}
- // Send a local message to the local RaftActor (it's derived class to be
- // specific to apply the log to it's index)
- context.getLogger().info("Setting last applied to {}", index);
- context.setLastApplied(index);
+ context.getLogger().debug("Setting last applied to {}", newLastApplied);
+ context.setLastApplied(newLastApplied);
+
+ // send a message to persist a ApplyLogEntries marker message into akka's persistent journal
+ // will be used during recovery
+ //in case if the above code throws an error and this message is not sent, it would be fine
+ // as the append entries received later would initiate add this message to the journal
+ actor().tell(new ApplyLogEntries((int) context.getLastApplied()), actor());
}
protected Object fromSerializableMessage(Object serializable){