X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-akka-raft%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fraft%2Fbehaviors%2FAbstractRaftActorBehavior.java;h=eed74bba82fbf8a4f8e4c1b5431887451c49d59d;hp=f7281bb8e3967c3ca2be583adb35210cbd07e87b;hb=59c4f0b68407ad243976fa7e64ba19d39147abbe;hpb=11e9ade9af527aba7faeb633d3c9c7552fd09d2d diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehavior.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehavior.java index f7281bb8e3..eed74bba82 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehavior.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehavior.java @@ -10,11 +10,12 @@ package org.opendaylight.controller.cluster.raft.behaviors; import akka.actor.ActorRef; import akka.actor.Cancellable; +import akka.event.LoggingAdapter; import org.opendaylight.controller.cluster.raft.ClientRequestTracker; import org.opendaylight.controller.cluster.raft.RaftActorContext; -import org.opendaylight.controller.cluster.raft.RaftState; import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry; import org.opendaylight.controller.cluster.raft.SerializationUtils; +import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries; import org.opendaylight.controller.cluster.raft.base.messages.ApplyState; import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout; import org.opendaylight.controller.cluster.raft.messages.AppendEntries; @@ -44,25 +45,9 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior { protected final RaftActorContext context; /** - * The maximum election time variance - */ - private static final int ELECTION_TIME_MAX_VARIANCE = 100; - - /** - * The interval at which a heart beat message will be sent to the remote - * RaftActor - *

- * Since this is set to 100 milliseconds the Election timeout should be - * at least 200 milliseconds - */ - protected static final FiniteDuration HEART_BEAT_INTERVAL = - new FiniteDuration(100, TimeUnit.MILLISECONDS); - - /** - * The interval in which a new election would get triggered if no leader is found + * */ - private static final long ELECTION_TIME_INTERVAL = - HEART_BEAT_INTERVAL.toMillis() * 2; + protected final LoggingAdapter LOG; /** * @@ -77,6 +62,7 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior { protected AbstractRaftActorBehavior(RaftActorContext context) { this.context = context; + this.LOG = context.getLogger(); } /** @@ -91,7 +77,7 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior { * @param appendEntries The AppendEntries message * @return */ - protected abstract RaftState handleAppendEntries(ActorRef sender, + protected abstract RaftActorBehavior handleAppendEntries(ActorRef sender, AppendEntries appendEntries); @@ -103,19 +89,21 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior { * @param appendEntries * @return */ - protected RaftState appendEntries(ActorRef sender, + protected RaftActorBehavior appendEntries(ActorRef sender, AppendEntries appendEntries) { // 1. Reply false if term < currentTerm (§5.1) if (appendEntries.getTerm() < currentTerm()) { - context.getLogger().debug( - "Cannot append entries because sender term " + appendEntries - .getTerm() + " is less than " + currentTerm()); + if(LOG.isDebugEnabled()) { + LOG.debug("Cannot append entries because sender term {} is less than {}", + appendEntries.getTerm(), currentTerm()); + } + sender.tell( new AppendEntriesReply(context.getId(), currentTerm(), false, lastIndex(), lastTerm()), actor() ); - return state(); + return this; } @@ -134,7 +122,7 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior { * @param appendEntriesReply The AppendEntriesReply message * @return */ - protected abstract RaftState handleAppendEntriesReply(ActorRef sender, + protected abstract RaftActorBehavior handleAppendEntriesReply(ActorRef sender, AppendEntriesReply appendEntriesReply); /** @@ -145,9 +133,13 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior { * @param requestVote * @return */ - protected RaftState requestVote(ActorRef sender, + protected RaftActorBehavior requestVote(ActorRef sender, RequestVote requestVote) { + if(LOG.isDebugEnabled()) { + LOG.debug(requestVote.toString()); + } + boolean grantVote = false; // Reply false if term < currentTerm (§5.1) @@ -184,7 +176,7 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior { sender.tell(new RequestVoteReply(currentTerm(), grantVote), actor()); - return state(); + return this; } /** @@ -199,7 +191,7 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior { * @param requestVoteReply The RequestVoteReply message * @return */ - protected abstract RaftState handleRequestVoteReply(ActorRef sender, + protected abstract RaftActorBehavior handleRequestVoteReply(ActorRef sender, RequestVoteReply requestVoteReply); /** @@ -208,9 +200,9 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior { * @return */ protected FiniteDuration electionDuration() { - long variance = new Random().nextInt(ELECTION_TIME_MAX_VARIANCE); - return new FiniteDuration(ELECTION_TIME_INTERVAL + variance, - TimeUnit.MILLISECONDS); + long variance = new Random().nextInt(context.getConfigParams().getElectionTimeVariance()); + return context.getConfigParams().getElectionTimeOutInterval().$plus( + new FiniteDuration(variance, TimeUnit.MILLISECONDS)); } /** @@ -290,6 +282,17 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior { return null; } + /** + * Find the client request tracker for a specific logIndex + * + * @param logIndex + * @return + */ + protected ClientRequestTracker removeClientRequestTracker(long logIndex) { + return null; + } + + /** * Find the log index from the previous to last entry in the log * @@ -322,13 +325,14 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior { * * @param index a log index that is known to be committed */ - protected void applyLogToStateMachine(long index) { + protected void applyLogToStateMachine(final long index) { + long newLastApplied = context.getLastApplied(); // Now maybe we apply to the state machine for (long i = context.getLastApplied() + 1; i < index + 1; i++) { ActorRef clientActor = null; String identifier = null; - ClientRequestTracker tracker = findClientRequestTracker(i); + ClientRequestTracker tracker = removeClientRequestTracker(i); if (tracker != null) { clientActor = tracker.getClientActor(); @@ -338,16 +342,29 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior { context.getReplicatedLog().get(i); if (replicatedLogEntry != null) { + // Send a local message to the local RaftActor (it's derived class to be + // specific to apply the log to it's index) actor().tell(new ApplyState(clientActor, identifier, replicatedLogEntry), actor()); + newLastApplied = i; } else { - context.getLogger().error( - "Missing index " + i + " from log. Cannot apply state."); + //if one index is not present in the log, no point in looping + // around as the rest wont be present either + LOG.warning( + "Missing index {} from log. Cannot apply state. Ignoring {} to {}", i, i, index); + break; } } - // Send a local message to the local RaftActor (it's derived class to be - // specific to apply the log to it's index) - context.setLastApplied(index); + if(LOG.isDebugEnabled()) { + LOG.debug("Setting last applied to {}", newLastApplied); + } + context.setLastApplied(newLastApplied); + + // send a message to persist a ApplyLogEntries marker message into akka's persistent journal + // will be used during recovery + //in case if the above code throws an error and this message is not sent, it would be fine + // as the append entries received later would initiate add this message to the journal + actor().tell(new ApplyLogEntries((int) context.getLastApplied()), actor()); } protected Object fromSerializableMessage(Object serializable){ @@ -355,7 +372,7 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior { } @Override - public RaftState handleMessage(ActorRef sender, Object message) { + public RaftActorBehavior handleMessage(ActorRef sender, Object message) { if (message instanceof AppendEntries) { return appendEntries(sender, (AppendEntries) message); } else if (message instanceof AppendEntriesReply) { @@ -365,10 +382,21 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior { } else if (message instanceof RequestVoteReply) { return handleRequestVoteReply(sender, (RequestVoteReply) message); } - return state(); + return this; } @Override public String getLeaderId() { return leaderId; } + + protected RaftActorBehavior switchBehavior(RaftActorBehavior behavior) { + LOG.info("Switching from behavior {} to {}", this.state(), behavior.state()); + try { + close(); + } catch (Exception e) { + LOG.error(e, "Failed to close behavior : {}", this.state()); + } + + return behavior; + } }