+ /**
+ * Find the client request tracker for a specific logIndex
+ *
+ * @param logIndex
+ * @return
+ */
+ protected ClientRequestTracker findClientRequestTracker(long logIndex) {
+ return null;
+ }
+
+ /**
+ * Find the client request tracker for a specific logIndex
+ *
+ * @param logIndex
+ * @return
+ */
+ protected ClientRequestTracker removeClientRequestTracker(long logIndex) {
+ return null;
+ }
+
+
+ /**
+ * Find the log index from the previous to last entry in the log
+ *
+ * @return
+ */
+ protected long prevLogIndex(long index){
+ ReplicatedLogEntry prevEntry =
+ context.getReplicatedLog().get(index - 1);
+ if (prevEntry != null) {
+ return prevEntry.getIndex();
+ }
+ return -1;
+ }
+
+ /**
+ * Find the log term from the previous to last entry in the log
+ * @return
+ */
+ protected long prevLogTerm(long index){
+ ReplicatedLogEntry prevEntry =
+ context.getReplicatedLog().get(index - 1);
+ if (prevEntry != null) {
+ return prevEntry.getTerm();
+ }
+ return -1;
+ }
+
+ /**
+ * Apply the provided index to the state machine
+ *
+ * @param index a log index that is known to be committed
+ */
+ protected void applyLogToStateMachine(final long index) {
+ long newLastApplied = context.getLastApplied();
+ // Now maybe we apply to the state machine
+ for (long i = context.getLastApplied() + 1;
+ i < index + 1; i++) {
+ ActorRef clientActor = null;
+ String identifier = null;
+ ClientRequestTracker tracker = removeClientRequestTracker(i);
+
+ if (tracker != null) {
+ clientActor = tracker.getClientActor();
+ identifier = tracker.getIdentifier();
+ }
+ ReplicatedLogEntry replicatedLogEntry =
+ context.getReplicatedLog().get(i);
+
+ if (replicatedLogEntry != null) {
+ // Send a local message to the local RaftActor (it's derived class to be
+ // specific to apply the log to it's index)
+ actor().tell(new ApplyState(clientActor, identifier,
+ replicatedLogEntry), actor());
+ newLastApplied = i;
+ } else {
+ //if one index is not present in the log, no point in looping
+ // around as the rest wont be present either
+ context.getLogger().warning(
+ "Missing index {} from log. Cannot apply state. Ignoring {} to {}", i, i, index );
+ break;
+ }
+ }
+ context.getLogger().debug("Setting last applied to {}", newLastApplied);
+ context.setLastApplied(newLastApplied);
+
+ // send a message to persist a ApplyLogEntries marker message into akka's persistent journal
+ // will be used during recovery
+ //in case if the above code throws an error and this message is not sent, it would be fine
+ // as the append entries received later would initiate add this message to the journal
+ actor().tell(new ApplyLogEntries((int) context.getLastApplied()), actor());
+ }
+
+ protected Object fromSerializableMessage(Object serializable){
+ return SerializationUtils.fromSerializable(serializable);
+ }