X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-akka-raft%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fraft%2Fbehaviors%2FAbstractRaftActorBehavior.java;h=0b0b4c7cd642480f92dd600a4f8f10be07977dc4;hp=f235221da940d4cb21c491e78b9624c2afa18a80;hb=bfd413d87f82ee3ffed67a141a980805950a0f06;hpb=0424501920e4d1c7691116deaaa8d26b213ab579 diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehavior.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehavior.java index f235221da9..0b0b4c7cd6 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehavior.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehavior.java @@ -10,9 +10,11 @@ package org.opendaylight.controller.cluster.raft.behaviors; import akka.actor.ActorRef; import akka.actor.Cancellable; -import akka.event.LoggingAdapter; +import java.util.Random; +import java.util.concurrent.TimeUnit; import org.opendaylight.controller.cluster.raft.ClientRequestTracker; import org.opendaylight.controller.cluster.raft.RaftActorContext; +import org.opendaylight.controller.cluster.raft.RaftState; import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry; import org.opendaylight.controller.cluster.raft.SerializationUtils; import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries; @@ -22,11 +24,9 @@ import org.opendaylight.controller.cluster.raft.messages.AppendEntries; import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply; import org.opendaylight.controller.cluster.raft.messages.RequestVote; import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply; +import org.slf4j.Logger; import scala.concurrent.duration.FiniteDuration; -import java.util.Random; -import java.util.concurrent.TimeUnit; - /** * Abstract class that represents the behavior of a RaftActor *

@@ -47,7 +47,7 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior { /** * */ - protected final LoggingAdapter LOG; + protected final Logger LOG; /** * @@ -59,10 +59,37 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior { */ protected String leaderId = null; + private long replicatedToAllIndex = -1; + + private final String logName; - protected AbstractRaftActorBehavior(RaftActorContext context) { + private final RaftState state; + + protected AbstractRaftActorBehavior(RaftActorContext context, RaftState state) { this.context = context; + this.state = state; this.LOG = context.getLogger(); + + logName = String.format("%s (%s)", context.getId(), state); + } + + @Override + public RaftState state() { + return state; + } + + public String logName() { + return logName; + } + + @Override + public void setReplicatedToAllIndex(long replicatedToAllIndex) { + this.replicatedToAllIndex = replicatedToAllIndex; + } + + @Override + public long getReplicatedToAllIndex() { + return replicatedToAllIndex; } /** @@ -95,8 +122,8 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior { // 1. Reply false if term < currentTerm (§5.1) if (appendEntries.getTerm() < currentTerm()) { if(LOG.isDebugEnabled()) { - LOG.debug("Cannot append entries because sender term {} is less than {}", - appendEntries.getTerm(), currentTerm()); + LOG.debug("{}: Cannot append entries because sender term {} is less than {}", + logName(), appendEntries.getTerm(), currentTerm()); } sender.tell( @@ -133,12 +160,9 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior { * @param requestVote * @return */ - protected RaftActorBehavior requestVote(ActorRef sender, - RequestVote requestVote) { + protected RaftActorBehavior requestVote(ActorRef sender, RequestVote requestVote) { - if(LOG.isDebugEnabled()) { - LOG.debug(requestVote.toString()); - } + LOG.debug("{}: In requestVote: {}", logName(), requestVote); boolean grantVote = false; @@ -174,7 +198,11 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior { } } - sender.tell(new RequestVoteReply(currentTerm(), grantVote), actor()); + RequestVoteReply reply = new RequestVoteReply(currentTerm(), grantVote); + + LOG.debug("{}: requestVote returning: {}", logName(), reply); + + sender.tell(reply, actor()); return this; } @@ -202,7 +230,7 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior { protected FiniteDuration electionDuration() { long variance = new Random().nextInt(context.getConfigParams().getElectionTimeVariance()); return context.getConfigParams().getElectionTimeOutInterval().$plus( - new FiniteDuration(variance, TimeUnit.MILLISECONDS)); + new FiniteDuration(variance, TimeUnit.MILLISECONDS)); } /** @@ -350,13 +378,14 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior { } else { //if one index is not present in the log, no point in looping // around as the rest wont be present either - LOG.warning( - "Missing index {} from log. Cannot apply state. Ignoring {} to {}", i, i, index); + LOG.warn( + "{}: Missing index {} from log. Cannot apply state. Ignoring {} to {}", + logName(), i, i, index); break; } } if(LOG.isDebugEnabled()) { - LOG.debug("Setting last applied to {}", newLastApplied); + LOG.debug("{}: Setting last applied to {}", logName(), newLastApplied); } context.setLastApplied(newLastApplied); @@ -390,11 +419,11 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior { } protected RaftActorBehavior switchBehavior(RaftActorBehavior behavior) { - LOG.info("{} :- Switching from behavior {} to {}", context.getId(), this.state(), behavior.state()); + LOG.info("{} :- Switching from behavior {} to {}", logName(), this.state(), behavior.state()); try { close(); } catch (Exception e) { - LOG.error(e, "Failed to close behavior : {}", this.state()); + LOG.error("{}: Failed to close behavior : {}", logName(), this.state(), e); } return behavior; @@ -422,4 +451,29 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior { return numMajority; } + + + /** + * Performs a snapshot with no capture on the replicated log. + * It clears the log from the supplied index or last-applied-1 which ever is minimum. + * + * @param snapshotCapturedIndex + */ + protected void performSnapshotWithoutCapture(final long snapshotCapturedIndex) { + // we would want to keep the lastApplied as its used while capturing snapshots + long lastApplied = context.getLastApplied(); + long tempMin = Math.min(snapshotCapturedIndex, (lastApplied > -1 ? lastApplied - 1 : -1)); + + if (tempMin > -1 && context.getReplicatedLog().isPresent(tempMin)) { + LOG.debug("{}: fakeSnapshot purging log to {} for term {}", logName(), tempMin, + context.getTermInformation().getCurrentTerm()); + + //use the term of the temp-min, since we check for isPresent, entry will not be null + ReplicatedLogEntry entry = context.getReplicatedLog().get(tempMin); + context.getReplicatedLog().snapshotPreCommit(tempMin, entry.getTerm()); + context.getReplicatedLog().snapshotCommit(); + setReplicatedToAllIndex(tempMin); + } + } + }