X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-akka-raft%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fraft%2Fbehaviors%2FAbstractRaftActorBehavior.java;h=eed74bba82fbf8a4f8e4c1b5431887451c49d59d;hb=95115ca49f3b16b936e0f6c88aedfc17cd0ee92c;hp=d12789205e4540482e98b5d2ca9144218abd7154;hpb=53e9fc9521e5fc0d49e9b1cb8763f8508101984b;p=controller.git diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehavior.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehavior.java index d12789205e..c276d32cce 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehavior.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehavior.java @@ -10,22 +10,23 @@ package org.opendaylight.controller.cluster.raft.behaviors; import akka.actor.ActorRef; import akka.actor.Cancellable; +import java.util.Random; +import java.util.concurrent.TimeUnit; import org.opendaylight.controller.cluster.raft.ClientRequestTracker; import org.opendaylight.controller.cluster.raft.RaftActorContext; import org.opendaylight.controller.cluster.raft.RaftState; import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry; -import org.opendaylight.controller.cluster.raft.internal.messages.ApplyState; -import org.opendaylight.controller.cluster.raft.internal.messages.ElectionTimeout; +import org.opendaylight.controller.cluster.raft.SerializationUtils; +import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries; +import org.opendaylight.controller.cluster.raft.base.messages.ApplyState; +import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout; import org.opendaylight.controller.cluster.raft.messages.AppendEntries; import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply; -import org.opendaylight.controller.cluster.raft.messages.RaftRPC; import org.opendaylight.controller.cluster.raft.messages.RequestVote; import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply; +import org.slf4j.Logger; import scala.concurrent.duration.FiniteDuration; -import java.util.Random; -import java.util.concurrent.TimeUnit; - /** * Abstract class that represents the behavior of a RaftActor *

@@ -38,31 +39,17 @@ import java.util.concurrent.TimeUnit; */ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior { + protected static final ElectionTimeout ELECTION_TIMEOUT = new ElectionTimeout(); + /** * Information about the RaftActor whose behavior this class represents */ protected final RaftActorContext context; /** - * The maximum election time variance - */ - private static final int ELECTION_TIME_MAX_VARIANCE = 100; - - /** - * The interval at which a heart beat message will be sent to the remote - * RaftActor - *

- * Since this is set to 100 milliseconds the Election timeout should be - * at least 200 milliseconds - */ - protected static final FiniteDuration HEART_BEAT_INTERVAL = - new FiniteDuration(100, TimeUnit.MILLISECONDS); - - /** - * The interval in which a new election would get triggered if no leader is found + * */ - private static final long ELECTION_TIME_INTERVAL = - HEART_BEAT_INTERVAL.toMillis() * 2; + protected final Logger LOG; /** * @@ -74,9 +61,37 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior { */ protected String leaderId = null; + private long replicatedToAllIndex = -1; + + private final String logName; - protected AbstractRaftActorBehavior(RaftActorContext context) { + private final RaftState state; + + protected AbstractRaftActorBehavior(RaftActorContext context, RaftState state) { this.context = context; + this.state = state; + this.LOG = context.getLogger(); + + logName = String.format("%s (%s)", context.getId(), state); + } + + @Override + public RaftState state() { + return state; + } + + public String logName() { + return logName; + } + + @Override + public void setReplicatedToAllIndex(long replicatedToAllIndex) { + this.replicatedToAllIndex = replicatedToAllIndex; + } + + @Override + public long getReplicatedToAllIndex() { + return replicatedToAllIndex; } /** @@ -89,13 +104,10 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior { * * @param sender The actor that sent this message * @param appendEntries The AppendEntries message - * @param suggestedState The state that the RaftActor should be in based - * on the base class's processing of the AppendEntries - * message * @return */ - protected abstract RaftState handleAppendEntries(ActorRef sender, - AppendEntries appendEntries, RaftState suggestedState); + protected abstract RaftActorBehavior handleAppendEntries(ActorRef sender, + AppendEntries appendEntries); /** @@ -104,31 +116,27 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior { * * @param sender * @param appendEntries - * @param raftState * @return */ - protected RaftState appendEntries(ActorRef sender, - AppendEntries appendEntries, RaftState raftState) { - - if (raftState != state()) { - context.getLogger().debug("Suggested state is " + raftState - + " current behavior state is " + state()); - } + protected RaftActorBehavior appendEntries(ActorRef sender, + AppendEntries appendEntries) { // 1. Reply false if term < currentTerm (§5.1) if (appendEntries.getTerm() < currentTerm()) { - context.getLogger().debug( - "Cannot append entries because sender term " + appendEntries - .getTerm() + " is less than " + currentTerm()); + if(LOG.isDebugEnabled()) { + LOG.debug("{}: Cannot append entries because sender term {} is less than {}", + logName(), appendEntries.getTerm(), currentTerm()); + } + sender.tell( new AppendEntriesReply(context.getId(), currentTerm(), false, lastIndex(), lastTerm()), actor() ); - return state(); + return this; } - return handleAppendEntries(sender, appendEntries, raftState); + return handleAppendEntries(sender, appendEntries); } /** @@ -141,13 +149,10 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior { * * @param sender The actor that sent this message * @param appendEntriesReply The AppendEntriesReply message - * @param suggestedState The state that the RaftActor should be in based - * on the base class's processing of the - * AppendEntriesReply message * @return */ - protected abstract RaftState handleAppendEntriesReply(ActorRef sender, - AppendEntriesReply appendEntriesReply, RaftState suggestedState); + protected abstract RaftActorBehavior handleAppendEntriesReply(ActorRef sender, + AppendEntriesReply appendEntriesReply); /** * requestVote handles the RequestVote message. This logic is common @@ -155,11 +160,11 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior { * * @param sender * @param requestVote - * @param suggestedState * @return */ - protected RaftState requestVote(ActorRef sender, - RequestVote requestVote, RaftState suggestedState) { + protected RaftActorBehavior requestVote(ActorRef sender, RequestVote requestVote) { + + LOG.debug("{}: In requestVote: {}", logName(), requestVote); boolean grantVote = false; @@ -190,14 +195,18 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior { if (candidateLatest) { grantVote = true; - context.getTermInformation().update(requestVote.getTerm(), + context.getTermInformation().updateAndPersist(requestVote.getTerm(), requestVote.getCandidateId()); } } - sender.tell(new RequestVoteReply(currentTerm(), grantVote), actor()); + RequestVoteReply reply = new RequestVoteReply(currentTerm(), grantVote); - return suggestedState; + LOG.debug("{}: requestVote returning: {}", logName(), reply); + + sender.tell(reply, actor()); + + return this; } /** @@ -210,13 +219,10 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior { * * @param sender The actor that sent this message * @param requestVoteReply The RequestVoteReply message - * @param suggestedState The state that the RaftActor should be in based - * on the base class's processing of the RequestVote - * message * @return */ - protected abstract RaftState handleRequestVoteReply(ActorRef sender, - RequestVoteReply requestVoteReply, RaftState suggestedState); + protected abstract RaftActorBehavior handleRequestVoteReply(ActorRef sender, + RequestVoteReply requestVoteReply); /** * Creates a random election duration @@ -224,9 +230,9 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior { * @return */ protected FiniteDuration electionDuration() { - long variance = new Random().nextInt(ELECTION_TIME_MAX_VARIANCE); - return new FiniteDuration(ELECTION_TIME_INTERVAL + variance, - TimeUnit.MILLISECONDS); + long variance = new Random().nextInt(context.getConfigParams().getElectionTimeVariance()); + return context.getConfigParams().getElectionTimeOutInterval().$plus( + new FiniteDuration(variance, TimeUnit.MILLISECONDS)); } /** @@ -250,7 +256,7 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior { // message is sent to itself electionCancel = context.getActorSystem().scheduler().scheduleOnce(interval, - context.getActor(), new ElectionTimeout(), + context.getActor(), ELECTION_TIMEOUT, context.getActorSystem().dispatcher(), context.getActor()); } @@ -306,6 +312,17 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior { return null; } + /** + * Find the client request tracker for a specific logIndex + * + * @param logIndex + * @return + */ + protected ClientRequestTracker removeClientRequestTracker(long logIndex) { + return null; + } + + /** * Find the log index from the previous to last entry in the log * @@ -338,13 +355,14 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior { * * @param index a log index that is known to be committed */ - protected void applyLogToStateMachine(long index) { + protected void applyLogToStateMachine(final long index) { + long newLastApplied = context.getLastApplied(); // Now maybe we apply to the state machine for (long i = context.getLastApplied() + 1; i < index + 1; i++) { ActorRef clientActor = null; String identifier = null; - ClientRequestTracker tracker = findClientRequestTracker(i); + ClientRequestTracker tracker = removeClientRequestTracker(i); if (tracker != null) { clientActor = tracker.getClientActor(); @@ -354,55 +372,105 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior { context.getReplicatedLog().get(i); if (replicatedLogEntry != null) { + // Send a local message to the local RaftActor (it's derived class to be + // specific to apply the log to it's index) actor().tell(new ApplyState(clientActor, identifier, replicatedLogEntry), actor()); + newLastApplied = i; } else { - context.getLogger().error( - "Missing index " + i + " from log. Cannot apply state."); + //if one index is not present in the log, no point in looping + // around as the rest wont be present either + LOG.warn( + "{}: Missing index {} from log. Cannot apply state. Ignoring {} to {}", + logName(), i, i, index); + break; } } - // Send a local message to the local RaftActor (it's derived class to be - // specific to apply the log to it's index) - context.setLastApplied(index); + if(LOG.isDebugEnabled()) { + LOG.debug("{}: Setting last applied to {}", logName(), newLastApplied); + } + context.setLastApplied(newLastApplied); + + // send a message to persist a ApplyLogEntries marker message into akka's persistent journal + // will be used during recovery + //in case if the above code throws an error and this message is not sent, it would be fine + // as the append entries received later would initiate add this message to the journal + actor().tell(new ApplyJournalEntries(context.getLastApplied()), actor()); + } + + protected Object fromSerializableMessage(Object serializable){ + return SerializationUtils.fromSerializable(serializable); } @Override - public RaftState handleMessage(ActorRef sender, Object message) { - RaftState raftState = state(); - if (message instanceof RaftRPC) { - raftState = applyTerm((RaftRPC) message); - } + public RaftActorBehavior handleMessage(ActorRef sender, Object message) { if (message instanceof AppendEntries) { - raftState = appendEntries(sender, (AppendEntries) message, - raftState); + return appendEntries(sender, (AppendEntries) message); } else if (message instanceof AppendEntriesReply) { - raftState = - handleAppendEntriesReply(sender, (AppendEntriesReply) message, - raftState); + return handleAppendEntriesReply(sender, (AppendEntriesReply) message); } else if (message instanceof RequestVote) { - raftState = - requestVote(sender, (RequestVote) message, raftState); + return requestVote(sender, (RequestVote) message); } else if (message instanceof RequestVoteReply) { - raftState = - handleRequestVoteReply(sender, (RequestVoteReply) message, - raftState); + return handleRequestVoteReply(sender, (RequestVoteReply) message); } - return raftState; + return this; } @Override public String getLeaderId() { return leaderId; } - private RaftState applyTerm(RaftRPC rpc) { - // If RPC request or response contains term T > currentTerm: - // set currentTerm = T, convert to follower (§5.1) - // This applies to all RPC messages and responses - if (rpc.getTerm() > context.getTermInformation().getCurrentTerm()) { - context.getTermInformation().update(rpc.getTerm(), null); - return RaftState.Follower; + protected RaftActorBehavior switchBehavior(RaftActorBehavior behavior) { + LOG.info("{} :- Switching from behavior {} to {}", logName(), this.state(), behavior.state()); + try { + close(); + } catch (Exception e) { + LOG.error("{}: Failed to close behavior : {}", logName(), this.state(), e); + } + + return behavior; + } + + protected int getMajorityVoteCount(int numPeers) { + // Votes are required from a majority of the peers including self. + // The numMajority field therefore stores a calculated value + // of the number of votes required for this candidate to win an + // election based on it's known peers. + // If a peer was added during normal operation and raft replicas + // came to know about them then the new peer would also need to be + // taken into consideration when calculating this value. + // Here are some examples for what the numMajority would be for n + // peers + // 0 peers = 1 numMajority -: (0 + 1) / 2 + 1 = 1 + // 2 peers = 2 numMajority -: (2 + 1) / 2 + 1 = 2 + // 4 peers = 3 numMajority -: (4 + 1) / 2 + 1 = 3 + + int numMajority = 0; + if (numPeers > 0) { + int self = 1; + numMajority = (numPeers + self) / 2 + 1; + } + return numMajority; + + } + + + /** + * Performs a snapshot with no capture on the replicated log. + * It clears the log from the supplied index or last-applied-1 which ever is minimum. + * + * @param snapshotCapturedIndex + */ + protected void performSnapshotWithoutCapture(final long snapshotCapturedIndex) { + long actualIndex = context.getSnapshotManager().trimLog(snapshotCapturedIndex, this); + + if(actualIndex != -1){ + setReplicatedToAllIndex(actualIndex); } - return state(); + } + + protected String getId(){ + return context.getId(); } }