X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-akka-raft%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fraft%2Fbehaviors%2FAbstractRaftActorBehavior.java;h=45671ea31e4c804f9993df96e3d534ceaf6e4247;hp=b1560a5648b283e028ae0dc96e4267d92c6f438f;hb=2a31c2cacb9ad8f015a49708261ea93d256f0f60;hpb=b3e553ce5b3d3e972cbe19465ab7af2fcb39934c diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehavior.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehavior.java index b1560a5648..45671ea31e 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehavior.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehavior.java @@ -10,23 +10,23 @@ package org.opendaylight.controller.cluster.raft.behaviors; import akka.actor.ActorRef; import akka.actor.Cancellable; +import java.util.Random; +import java.util.concurrent.TimeUnit; import org.opendaylight.controller.cluster.raft.ClientRequestTracker; import org.opendaylight.controller.cluster.raft.RaftActorContext; import org.opendaylight.controller.cluster.raft.RaftState; import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry; import org.opendaylight.controller.cluster.raft.SerializationUtils; -import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries; +import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries; import org.opendaylight.controller.cluster.raft.base.messages.ApplyState; import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout; import org.opendaylight.controller.cluster.raft.messages.AppendEntries; import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply; import org.opendaylight.controller.cluster.raft.messages.RequestVote; import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply; +import org.slf4j.Logger; import scala.concurrent.duration.FiniteDuration; -import java.util.Random; -import java.util.concurrent.TimeUnit; - /** * Abstract class that represents the behavior of a RaftActor *

@@ -39,11 +39,18 @@ import java.util.concurrent.TimeUnit; */ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior { + protected static final ElectionTimeout ELECTION_TIMEOUT = new ElectionTimeout(); + /** * Information about the RaftActor whose behavior this class represents */ protected final RaftActorContext context; + /** + * + */ + protected final Logger LOG; + /** * */ @@ -54,9 +61,37 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior { */ protected String leaderId = null; + private long replicatedToAllIndex = -1; - protected AbstractRaftActorBehavior(RaftActorContext context) { + private final String logName; + + private final RaftState state; + + protected AbstractRaftActorBehavior(RaftActorContext context, RaftState state) { this.context = context; + this.state = state; + this.LOG = context.getLogger(); + + logName = String.format("%s (%s)", context.getId(), state); + } + + @Override + public RaftState state() { + return state; + } + + public String logName() { + return logName; + } + + @Override + public void setReplicatedToAllIndex(long replicatedToAllIndex) { + this.replicatedToAllIndex = replicatedToAllIndex; + } + + @Override + public long getReplicatedToAllIndex() { + return replicatedToAllIndex; } /** @@ -71,7 +106,7 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior { * @param appendEntries The AppendEntries message * @return */ - protected abstract RaftState handleAppendEntries(ActorRef sender, + protected abstract RaftActorBehavior handleAppendEntries(ActorRef sender, AppendEntries appendEntries); @@ -83,19 +118,21 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior { * @param appendEntries * @return */ - protected RaftState appendEntries(ActorRef sender, + protected RaftActorBehavior appendEntries(ActorRef sender, AppendEntries appendEntries) { // 1. Reply false if term < currentTerm (§5.1) if (appendEntries.getTerm() < currentTerm()) { - context.getLogger().debug( - "Cannot append entries because sender term " + appendEntries - .getTerm() + " is less than " + currentTerm()); + if(LOG.isDebugEnabled()) { + LOG.debug("{}: Cannot append entries because sender term {} is less than {}", + logName(), appendEntries.getTerm(), currentTerm()); + } + sender.tell( new AppendEntriesReply(context.getId(), currentTerm(), false, lastIndex(), lastTerm()), actor() ); - return state(); + return this; } @@ -114,7 +151,7 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior { * @param appendEntriesReply The AppendEntriesReply message * @return */ - protected abstract RaftState handleAppendEntriesReply(ActorRef sender, + protected abstract RaftActorBehavior handleAppendEntriesReply(ActorRef sender, AppendEntriesReply appendEntriesReply); /** @@ -125,11 +162,9 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior { * @param requestVote * @return */ - protected RaftState requestVote(ActorRef sender, - RequestVote requestVote) { - + protected RaftActorBehavior requestVote(ActorRef sender, RequestVote requestVote) { - context.getLogger().debug(requestVote.toString()); + LOG.debug("{}: In requestVote: {}", logName(), requestVote); boolean grantVote = false; @@ -165,9 +200,13 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior { } } - sender.tell(new RequestVoteReply(currentTerm(), grantVote), actor()); + RequestVoteReply reply = new RequestVoteReply(currentTerm(), grantVote); + + LOG.debug("{}: requestVote returning: {}", logName(), reply); + + sender.tell(reply, actor()); - return state(); + return this; } /** @@ -182,7 +221,7 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior { * @param requestVoteReply The RequestVoteReply message * @return */ - protected abstract RaftState handleRequestVoteReply(ActorRef sender, + protected abstract RaftActorBehavior handleRequestVoteReply(ActorRef sender, RequestVoteReply requestVoteReply); /** @@ -193,7 +232,7 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior { protected FiniteDuration electionDuration() { long variance = new Random().nextInt(context.getConfigParams().getElectionTimeVariance()); return context.getConfigParams().getElectionTimeOutInterval().$plus( - new FiniteDuration(variance, TimeUnit.MILLISECONDS)); + new FiniteDuration(variance, TimeUnit.MILLISECONDS)); } /** @@ -217,7 +256,7 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior { // message is sent to itself electionCancel = context.getActorSystem().scheduler().scheduleOnce(interval, - context.getActor(), new ElectionTimeout(), + context.getActor(), ELECTION_TIMEOUT, context.getActorSystem().dispatcher(), context.getActor()); } @@ -341,19 +380,22 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior { } else { //if one index is not present in the log, no point in looping // around as the rest wont be present either - context.getLogger().warning( - "Missing index {} from log. Cannot apply state. Ignoring {} to {}", i, i, index ); + LOG.warn( + "{}: Missing index {} from log. Cannot apply state. Ignoring {} to {}", + logName(), i, i, index); break; } } - context.getLogger().debug("Setting last applied to {}", newLastApplied); + if(LOG.isDebugEnabled()) { + LOG.debug("{}: Setting last applied to {}", logName(), newLastApplied); + } context.setLastApplied(newLastApplied); // send a message to persist a ApplyLogEntries marker message into akka's persistent journal // will be used during recovery //in case if the above code throws an error and this message is not sent, it would be fine // as the append entries received later would initiate add this message to the journal - actor().tell(new ApplyLogEntries((int) context.getLastApplied()), actor()); + actor().tell(new ApplyJournalEntries(context.getLastApplied()), actor()); } protected Object fromSerializableMessage(Object serializable){ @@ -361,7 +403,7 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior { } @Override - public RaftState handleMessage(ActorRef sender, Object message) { + public RaftActorBehavior handleMessage(ActorRef sender, Object message) { if (message instanceof AppendEntries) { return appendEntries(sender, (AppendEntries) message); } else if (message instanceof AppendEntriesReply) { @@ -371,10 +413,85 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior { } else if (message instanceof RequestVoteReply) { return handleRequestVoteReply(sender, (RequestVoteReply) message); } - return state(); + return this; } @Override public String getLeaderId() { return leaderId; } + + protected RaftActorBehavior switchBehavior(RaftActorBehavior behavior) { + LOG.info("{} :- Switching from behavior {} to {}", logName(), this.state(), behavior.state()); + try { + close(); + } catch (Exception e) { + LOG.error("{}: Failed to close behavior : {}", logName(), this.state(), e); + } + + return behavior; + } + + protected int getMajorityVoteCount(int numPeers) { + // Votes are required from a majority of the peers including self. + // The numMajority field therefore stores a calculated value + // of the number of votes required for this candidate to win an + // election based on it's known peers. + // If a peer was added during normal operation and raft replicas + // came to know about them then the new peer would also need to be + // taken into consideration when calculating this value. + // Here are some examples for what the numMajority would be for n + // peers + // 0 peers = 1 numMajority -: (0 + 1) / 2 + 1 = 1 + // 2 peers = 2 numMajority -: (2 + 1) / 2 + 1 = 2 + // 4 peers = 3 numMajority -: (4 + 1) / 2 + 1 = 3 + + int numMajority = 0; + if (numPeers > 0) { + int self = 1; + numMajority = (numPeers + self) / 2 + 1; + } + return numMajority; + + } + + + /** + * Performs a snapshot with no capture on the replicated log. + * It clears the log from the supplied index or last-applied-1 which ever is minimum. + * + * @param snapshotCapturedIndex + */ + protected void performSnapshotWithoutCapture(final long snapshotCapturedIndex) { + // we would want to keep the lastApplied as its used while capturing snapshots + long lastApplied = context.getLastApplied(); + long tempMin = Math.min(snapshotCapturedIndex, (lastApplied > -1 ? lastApplied - 1 : -1)); + + if(LOG.isTraceEnabled()) { + LOG.trace("{}: performSnapshotWithoutCapture: snapshotCapturedIndex: {}, lastApplied: {}, tempMin: {}", + logName, snapshotCapturedIndex, lastApplied, tempMin); + } + + if (tempMin > -1 && context.getReplicatedLog().isPresent(tempMin)) { + LOG.debug("{}: fakeSnapshot purging log to {} for term {}", logName(), tempMin, + context.getTermInformation().getCurrentTerm()); + + //use the term of the temp-min, since we check for isPresent, entry will not be null + ReplicatedLogEntry entry = context.getReplicatedLog().get(tempMin); + context.getReplicatedLog().snapshotPreCommit(tempMin, entry.getTerm()); + context.getReplicatedLog().snapshotCommit(); + setReplicatedToAllIndex(tempMin); + } else if(tempMin > getReplicatedToAllIndex()) { + // It's possible a follower was lagging and an install snapshot advanced its match index past + // the current replicatedToAllIndex. Since the follower is now caught up we should advance the + // replicatedToAllIndex (to tempMin). The fact that tempMin wasn't found in the log is likely + // due to a previous snapshot triggered by the memory threshold exceeded, in that case we + // trim the log to the last applied index even if previous entries weren't replicated to all followers. + setReplicatedToAllIndex(tempMin); + } + } + + protected String getId(){ + return context.getId(); + } + }