X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-akka-raft%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fraft%2Fbehaviors%2FAbstractRaftActorBehavior.java;h=b1560a5648b283e028ae0dc96e4267d92c6f438f;hb=34bc6ec632529a0dfe419aa7404bb42a456fbc96;hp=0a553b40fd59aab555f258f897a2154830afd1c8;hpb=eb887b1c2c8cd2768f8b4c2ed2b5054f97798466;p=controller.git diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehavior.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehavior.java index 0a553b40fd..b1560a5648 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehavior.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehavior.java @@ -15,6 +15,7 @@ import org.opendaylight.controller.cluster.raft.RaftActorContext; import org.opendaylight.controller.cluster.raft.RaftState; import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry; import org.opendaylight.controller.cluster.raft.SerializationUtils; +import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries; import org.opendaylight.controller.cluster.raft.base.messages.ApplyState; import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout; import org.opendaylight.controller.cluster.raft.messages.AppendEntries; @@ -127,6 +128,9 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior { protected RaftState requestVote(ActorRef sender, RequestVote requestVote) { + + context.getLogger().debug(requestVote.toString()); + boolean grantVote = false; // Reply false if term < currentTerm (§5.1) @@ -269,6 +273,17 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior { return null; } + /** + * Find the client request tracker for a specific logIndex + * + * @param logIndex + * @return + */ + protected ClientRequestTracker removeClientRequestTracker(long logIndex) { + return null; + } + + /** * Find the log index from the previous to last entry in the log * @@ -301,13 +316,14 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior { * * @param index a log index that is known to be committed */ - protected void applyLogToStateMachine(long index) { + protected void applyLogToStateMachine(final long index) { + long newLastApplied = context.getLastApplied(); // Now maybe we apply to the state machine for (long i = context.getLastApplied() + 1; i < index + 1; i++) { ActorRef clientActor = null; String identifier = null; - ClientRequestTracker tracker = findClientRequestTracker(i); + ClientRequestTracker tracker = removeClientRequestTracker(i); if (tracker != null) { clientActor = tracker.getClientActor(); @@ -317,16 +333,27 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior { context.getReplicatedLog().get(i); if (replicatedLogEntry != null) { + // Send a local message to the local RaftActor (it's derived class to be + // specific to apply the log to it's index) actor().tell(new ApplyState(clientActor, identifier, replicatedLogEntry), actor()); + newLastApplied = i; } else { - context.getLogger().error( - "Missing index " + i + " from log. Cannot apply state."); + //if one index is not present in the log, no point in looping + // around as the rest wont be present either + context.getLogger().warning( + "Missing index {} from log. Cannot apply state. Ignoring {} to {}", i, i, index ); + break; } } - // Send a local message to the local RaftActor (it's derived class to be - // specific to apply the log to it's index) - context.setLastApplied(index); + context.getLogger().debug("Setting last applied to {}", newLastApplied); + context.setLastApplied(newLastApplied); + + // send a message to persist a ApplyLogEntries marker message into akka's persistent journal + // will be used during recovery + //in case if the above code throws an error and this message is not sent, it would be fine + // as the append entries received later would initiate add this message to the journal + actor().tell(new ApplyLogEntries((int) context.getLastApplied()), actor()); } protected Object fromSerializableMessage(Object serializable){