Merge "Bug 1025: Fixed incorrect revision in sal-remote-augment, which caused log...
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / main / java / org / opendaylight / controller / cluster / raft / behaviors / AbstractRaftActorBehavior.java
index f7281bb8e3967c3ca2be583adb35210cbd07e87b..b1560a5648b283e028ae0dc96e4267d92c6f438f 100644 (file)
@@ -15,6 +15,7 @@ import org.opendaylight.controller.cluster.raft.RaftActorContext;
 import org.opendaylight.controller.cluster.raft.RaftState;
 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
 import org.opendaylight.controller.cluster.raft.SerializationUtils;
+import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
 import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
@@ -43,27 +44,6 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior {
      */
     protected final RaftActorContext context;
 
-    /**
-     * The maximum election time variance
-     */
-    private static final int ELECTION_TIME_MAX_VARIANCE = 100;
-
-    /**
-     * The interval at which a heart beat message will be sent to the remote
-     * RaftActor
-     * <p/>
-     * Since this is set to 100 milliseconds the Election timeout should be
-     * at least 200 milliseconds
-     */
-    protected static final FiniteDuration HEART_BEAT_INTERVAL =
-        new FiniteDuration(100, TimeUnit.MILLISECONDS);
-
-    /**
-     * The interval in which a new election would get triggered if no leader is found
-     */
-    private static final long ELECTION_TIME_INTERVAL =
-        HEART_BEAT_INTERVAL.toMillis() * 2;
-
     /**
      *
      */
@@ -148,6 +128,9 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior {
     protected RaftState requestVote(ActorRef sender,
         RequestVote requestVote) {
 
+
+        context.getLogger().debug(requestVote.toString());
+
         boolean grantVote = false;
 
         //  Reply false if term < currentTerm (ยง5.1)
@@ -208,9 +191,9 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior {
      * @return
      */
     protected FiniteDuration electionDuration() {
-        long variance = new Random().nextInt(ELECTION_TIME_MAX_VARIANCE);
-        return new FiniteDuration(ELECTION_TIME_INTERVAL + variance,
-            TimeUnit.MILLISECONDS);
+        long variance = new Random().nextInt(context.getConfigParams().getElectionTimeVariance());
+        return context.getConfigParams().getElectionTimeOutInterval().$plus(
+            new FiniteDuration(variance, TimeUnit.MILLISECONDS));
     }
 
     /**
@@ -290,6 +273,17 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior {
         return null;
     }
 
+    /**
+     * Find the client request tracker for a specific logIndex
+     *
+     * @param logIndex
+     * @return
+     */
+    protected ClientRequestTracker removeClientRequestTracker(long logIndex) {
+        return null;
+    }
+
+
     /**
      * Find the log index from the previous to last entry in the log
      *
@@ -322,13 +316,14 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior {
      *
      * @param index a log index that is known to be committed
      */
-    protected void applyLogToStateMachine(long index) {
+    protected void applyLogToStateMachine(final long index) {
+        long newLastApplied = context.getLastApplied();
         // Now maybe we apply to the state machine
         for (long i = context.getLastApplied() + 1;
              i < index + 1; i++) {
             ActorRef clientActor = null;
             String identifier = null;
-            ClientRequestTracker tracker = findClientRequestTracker(i);
+            ClientRequestTracker tracker = removeClientRequestTracker(i);
 
             if (tracker != null) {
                 clientActor = tracker.getClientActor();
@@ -338,16 +333,27 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior {
                 context.getReplicatedLog().get(i);
 
             if (replicatedLogEntry != null) {
+                // Send a local message to the local RaftActor (it's derived class to be
+                // specific to apply the log to it's index)
                 actor().tell(new ApplyState(clientActor, identifier,
                     replicatedLogEntry), actor());
+                newLastApplied = i;
             } else {
-                context.getLogger().error(
-                    "Missing index " + i + " from log. Cannot apply state.");
+                //if one index is not present in the log, no point in looping
+                // around as the rest wont be present either
+                context.getLogger().warning(
+                    "Missing index {} from log. Cannot apply state. Ignoring {} to {}", i, i, index );
+                break;
             }
         }
-        // Send a local message to the local RaftActor (it's derived class to be
-        // specific to apply the log to it's index)
-        context.setLastApplied(index);
+        context.getLogger().debug("Setting last applied to {}", newLastApplied);
+        context.setLastApplied(newLastApplied);
+
+        // send a message to persist a ApplyLogEntries marker message into akka's persistent journal
+        // will be used during recovery
+        //in case if the above code throws an error and this message is not sent, it would be fine
+        // as the  append entries received later would initiate add this message to the journal
+        actor().tell(new ApplyLogEntries((int) context.getLastApplied()), actor());
     }
 
     protected Object fromSerializableMessage(Object serializable){