X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-akka-raft%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fraft%2Fbehaviors%2FAbstractRaftActorBehavior.java;h=d17fba03d4ba58e77168602ffe388431b0a64a1a;hb=a615daaacf68376104e89e31b6e40f6636bb2a1e;hp=075b2873e45332364c09aee83c49e6b23e40780c;hpb=bd8beb1bfee9f421ad8f2d07b1424b21038234a2;p=controller.git diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehavior.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehavior.java index 075b2873e4..d17fba03d4 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehavior.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehavior.java @@ -14,9 +14,10 @@ import java.util.Random; import java.util.concurrent.TimeUnit; import org.opendaylight.controller.cluster.raft.ClientRequestTracker; import org.opendaylight.controller.cluster.raft.RaftActorContext; +import org.opendaylight.controller.cluster.raft.RaftState; import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry; import org.opendaylight.controller.cluster.raft.SerializationUtils; -import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries; +import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries; import org.opendaylight.controller.cluster.raft.base.messages.ApplyState; import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout; import org.opendaylight.controller.cluster.raft.messages.AppendEntries; @@ -38,6 +39,8 @@ import scala.concurrent.duration.FiniteDuration; */ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior { + protected static final ElectionTimeout ELECTION_TIMEOUT = new ElectionTimeout(); + /** * Information about the RaftActor whose behavior this class represents */ @@ -58,10 +61,39 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior { */ protected String leaderId = null; + private short leaderPayloadVersion = -1; + + private long replicatedToAllIndex = -1; + + private final String logName; + + private final RaftState state; - protected AbstractRaftActorBehavior(RaftActorContext context) { + protected AbstractRaftActorBehavior(RaftActorContext context, RaftState state) { this.context = context; + this.state = state; this.LOG = context.getLogger(); + + logName = String.format("%s (%s)", context.getId(), state); + } + + @Override + public RaftState state() { + return state; + } + + public String logName() { + return logName; + } + + @Override + public void setReplicatedToAllIndex(long replicatedToAllIndex) { + this.replicatedToAllIndex = replicatedToAllIndex; + } + + @Override + public long getReplicatedToAllIndex() { + return replicatedToAllIndex; } /** @@ -74,7 +106,7 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior { * * @param sender The actor that sent this message * @param appendEntries The AppendEntries message - * @return + * @return a new behavior if it was changed or the current behavior */ protected abstract RaftActorBehavior handleAppendEntries(ActorRef sender, AppendEntries appendEntries); @@ -86,7 +118,7 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior { * * @param sender * @param appendEntries - * @return + * @return a new behavior if it was changed or the current behavior */ protected RaftActorBehavior appendEntries(ActorRef sender, AppendEntries appendEntries) { @@ -95,12 +127,12 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior { if (appendEntries.getTerm() < currentTerm()) { if(LOG.isDebugEnabled()) { LOG.debug("{}: Cannot append entries because sender term {} is less than {}", - context.getId(), appendEntries.getTerm(), currentTerm()); + logName(), appendEntries.getTerm(), currentTerm()); } sender.tell( new AppendEntriesReply(context.getId(), currentTerm(), false, - lastIndex(), lastTerm()), actor() + lastIndex(), lastTerm(), context.getPayloadVersion()), actor() ); return this; } @@ -119,7 +151,7 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior { * * @param sender The actor that sent this message * @param appendEntriesReply The AppendEntriesReply message - * @return + * @return a new behavior if it was changed or the current behavior */ protected abstract RaftActorBehavior handleAppendEntriesReply(ActorRef sender, AppendEntriesReply appendEntriesReply); @@ -130,15 +162,28 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior { * * @param sender * @param requestVote - * @return + * @return a new behavior if it was changed or the current behavior */ - protected RaftActorBehavior requestVote(ActorRef sender, - RequestVote requestVote) { + protected RaftActorBehavior requestVote(ActorRef sender, RequestVote requestVote) { - if(LOG.isDebugEnabled()) { - LOG.debug("{}: Received {}", context.getId(), requestVote); + LOG.debug("{}: In requestVote: {}", logName(), requestVote); + + boolean grantVote = canGrantVote(requestVote); + + if(grantVote) { + context.getTermInformation().updateAndPersist(requestVote.getTerm(), requestVote.getCandidateId()); } + RequestVoteReply reply = new RequestVoteReply(currentTerm(), grantVote); + + LOG.debug("{}: requestVote returning: {}", logName(), reply); + + sender.tell(reply, actor()); + + return this; + } + + protected boolean canGrantVote(RequestVote requestVote){ boolean grantVote = false; // Reply false if term < currentTerm (§5.1) @@ -148,7 +193,7 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior { // If votedFor is null or candidateId, and candidate’s log is at // least as up-to-date as receiver’s log, grant vote (§5.2, §5.4) } else if (votedFor() == null || votedFor() - .equals(requestVote.getCandidateId())) { + .equals(requestVote.getCandidateId())) { boolean candidateLatest = false; @@ -162,20 +207,15 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior { if (requestVote.getLastLogTerm() > lastTerm()) { candidateLatest = true; } else if ((requestVote.getLastLogTerm() == lastTerm()) - && requestVote.getLastLogIndex() >= lastIndex()) { + && requestVote.getLastLogIndex() >= lastIndex()) { candidateLatest = true; } if (candidateLatest) { grantVote = true; - context.getTermInformation().updateAndPersist(requestVote.getTerm(), - requestVote.getCandidateId()); } } - - sender.tell(new RequestVoteReply(currentTerm(), grantVote), actor()); - - return this; + return grantVote; } /** @@ -188,15 +228,14 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior { * * @param sender The actor that sent this message * @param requestVoteReply The RequestVoteReply message - * @return + * @return a new behavior if it was changed or the current behavior */ protected abstract RaftActorBehavior handleRequestVoteReply(ActorRef sender, RequestVoteReply requestVoteReply); /** - * Creates a random election duration * - * @return + * @return a random election duration */ protected FiniteDuration electionDuration() { long variance = new Random().nextInt(context.getConfigParams().getElectionTimeVariance()); @@ -213,79 +252,72 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior { } } + protected boolean canStartElection() { + return context.getRaftPolicy().automaticElectionsEnabled() && context.isVotingMember(); + } + /** * schedule a new election * - * @param interval + * @param interval the duration after which we should trigger a new election */ protected void scheduleElection(FiniteDuration interval) { stopElection(); - // Schedule an election. When the scheduler triggers an ElectionTimeout - // message is sent to itself - electionCancel = - context.getActorSystem().scheduler().scheduleOnce(interval, - context.getActor(), new ElectionTimeout(), - context.getActorSystem().dispatcher(), context.getActor()); + if(canStartElection()) { + // Schedule an election. When the scheduler triggers an ElectionTimeout message is sent to itself + electionCancel = context.getActorSystem().scheduler().scheduleOnce(interval, context.getActor(), + ELECTION_TIMEOUT,context.getActorSystem().dispatcher(), context.getActor()); + } } /** - * Get the current term - * @return + * @return the current term */ protected long currentTerm() { return context.getTermInformation().getCurrentTerm(); } /** - * Get the candidate for whom we voted in the current term - * @return + * @return the candidate for whom we voted in the current term */ protected String votedFor() { return context.getTermInformation().getVotedFor(); } /** - * Get the actor associated with this behavior - * @return + * @return the actor associated with this behavior */ protected ActorRef actor() { return context.getActor(); } /** - * Get the term from the last entry in the log * - * @return + * @return the term from the last entry in the log */ protected long lastTerm() { return context.getReplicatedLog().lastTerm(); } /** - * Get the index from the last entry in the log - * - * @return + * @return the index from the last entry in the log */ protected long lastIndex() { return context.getReplicatedLog().lastIndex(); } /** - * Find the client request tracker for a specific logIndex - * * @param logIndex - * @return + * @return the client request tracker for the specified logIndex */ protected ClientRequestTracker findClientRequestTracker(long logIndex) { return null; } /** - * Find the client request tracker for a specific logIndex - * * @param logIndex - * @return + * @return the client request tracker for the specified logIndex */ protected ClientRequestTracker removeClientRequestTracker(long logIndex) { return null; @@ -293,9 +325,8 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior { /** - * Find the log index from the previous to last entry in the log * - * @return + * @return log index from the previous to last entry in the log */ protected long prevLogIndex(long index){ ReplicatedLogEntry prevEntry = @@ -307,8 +338,7 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior { } /** - * Find the log term from the previous to last entry in the log - * @return + * @return log term from the previous to last entry in the log */ protected long prevLogTerm(long index){ ReplicatedLogEntry prevEntry = @@ -351,12 +381,12 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior { // around as the rest wont be present either LOG.warn( "{}: Missing index {} from log. Cannot apply state. Ignoring {} to {}", - context.getId(), i, i, index); + logName(), i, i, index); break; } } if(LOG.isDebugEnabled()) { - LOG.debug("{}: Setting last applied to {}", context.getId(), newLastApplied); + LOG.debug("{}: Setting last applied to {}", logName(), newLastApplied); } context.setLastApplied(newLastApplied); @@ -364,7 +394,7 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior { // will be used during recovery //in case if the above code throws an error and this message is not sent, it would be fine // as the append entries received later would initiate add this message to the journal - actor().tell(new ApplyLogEntries((int) context.getLastApplied()), actor()); + actor().tell(new ApplyJournalEntries(context.getLastApplied()), actor()); } protected Object fromSerializableMessage(Object serializable){ @@ -389,17 +419,38 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior { return leaderId; } - protected RaftActorBehavior switchBehavior(RaftActorBehavior behavior) { - LOG.info("{} :- Switching from behavior {} to {}", context.getId(), this.state(), behavior.state()); + @Override + public short getLeaderPayloadVersion() { + return leaderPayloadVersion; + } + + public void setLeaderPayloadVersion(short leaderPayloadVersion) { + this.leaderPayloadVersion = leaderPayloadVersion; + } + + @Override + public RaftActorBehavior switchBehavior(RaftActorBehavior behavior) { + return internalSwitchBehavior(behavior); + } + + protected RaftActorBehavior internalSwitchBehavior(RaftState newState) { + if(context.getRaftPolicy().automaticElectionsEnabled()){ + return internalSwitchBehavior(newState.createBehavior(context)); + } + return this; + } + + private RaftActorBehavior internalSwitchBehavior(RaftActorBehavior newBehavior) { + LOG.info("{} :- Switching from behavior {} to {}", logName(), this.state(), newBehavior.state()); try { close(); } catch (Exception e) { - LOG.error("{}: Failed to close behavior : {}", context.getId(), this.state(), e); + LOG.error("{}: Failed to close behavior : {}", logName(), this.state(), e); } - - return behavior; + return newBehavior; } + protected int getMajorityVoteCount(int numPeers) { // Votes are required from a majority of the peers including self. // The numMajority field therefore stores a calculated value @@ -423,17 +474,22 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior { } - protected long fakeSnapshot(final long minReplicatedToAllIndex, final long currentReplicatedIndex) { - // we would want to keep the lastApplied as its used while capturing snapshots - long tempMin = Math.min(minReplicatedToAllIndex, - (context.getLastApplied() > -1 ? context.getLastApplied() - 1 : -1)); + /** + * Performs a snapshot with no capture on the replicated log. + * It clears the log from the supplied index or last-applied-1 which ever is minimum. + * + * @param snapshotCapturedIndex + */ + protected void performSnapshotWithoutCapture(final long snapshotCapturedIndex) { + long actualIndex = context.getSnapshotManager().trimLog(snapshotCapturedIndex, this); - if (tempMin > -1 && context.getReplicatedLog().isPresent(tempMin)) { - context.getReplicatedLog().snapshotPreCommit(tempMin, context.getTermInformation().getCurrentTerm()); - context.getReplicatedLog().snapshotCommit(); - return tempMin; + if(actualIndex != -1){ + setReplicatedToAllIndex(actualIndex); } - return currentReplicatedIndex; + } + + protected String getId(){ + return context.getId(); } }