+ return -1;
+ }
+
+ /**
+ * Find the log term from the previous to last entry in the log
+ * @return
+ */
+ protected long prevLogTerm(long index){
+ ReplicatedLogEntry prevEntry =
+ context.getReplicatedLog().get(index - 1);
+ if (prevEntry != null) {
+ return prevEntry.getTerm();
+ }
+ return -1;
+ }
+
+ /**
+ * Apply the provided index to the state machine
+ *
+ * @param index a log index that is known to be committed
+ */
+ protected void applyLogToStateMachine(final long index) {
+ long newLastApplied = context.getLastApplied();
+ // Now maybe we apply to the state machine
+ for (long i = context.getLastApplied() + 1;
+ i < index + 1; i++) {
+ ActorRef clientActor = null;
+ String identifier = null;
+ ClientRequestTracker tracker = removeClientRequestTracker(i);
+
+ if (tracker != null) {
+ clientActor = tracker.getClientActor();
+ identifier = tracker.getIdentifier();
+ }
+ ReplicatedLogEntry replicatedLogEntry =
+ context.getReplicatedLog().get(i);
+
+ if (replicatedLogEntry != null) {
+ // Send a local message to the local RaftActor (it's derived class to be
+ // specific to apply the log to it's index)
+ actor().tell(new ApplyState(clientActor, identifier,
+ replicatedLogEntry), actor());
+ newLastApplied = i;
+ } else {
+ //if one index is not present in the log, no point in looping
+ // around as the rest wont be present either
+ LOG.warn(
+ "{}: Missing index {} from log. Cannot apply state. Ignoring {} to {}",
+ logName(), i, i, index);
+ break;
+ }
+ }
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("{}: Setting last applied to {}", logName(), newLastApplied);
+ }
+ context.setLastApplied(newLastApplied);
+
+ // send a message to persist a ApplyLogEntries marker message into akka's persistent journal
+ // will be used during recovery
+ //in case if the above code throws an error and this message is not sent, it would be fine
+ // as the append entries received later would initiate add this message to the journal
+ actor().tell(new ApplyJournalEntries(context.getLastApplied()), actor());
+ }
+
+ protected Object fromSerializableMessage(Object serializable){
+ return SerializationUtils.fromSerializable(serializable);
+ }
+
+ @Override
+ public RaftActorBehavior handleMessage(ActorRef sender, Object message) {