import akka.cluster.Member;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.util.Optional;
-import java.util.Random;
import java.util.Set;
+import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
-import org.opendaylight.controller.cluster.raft.ClientRequestTracker;
import org.opendaylight.controller.cluster.raft.RaftActorContext;
import org.opendaylight.controller.cluster.raft.RaftState;
import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
AbstractRaftActorBehavior(final RaftActorContext context, final RaftState state) {
this.context = requireNonNull(context);
this.state = requireNonNull(state);
- this.log = context.getLogger();
+ log = context.getLogger();
logName = String.format("%s (%s)", context.getId(), state);
}
// the log with the later term is more up-to-date. If the logs
// end with the same term, then whichever log is longer is
// more up-to-date.
- if (requestVote.getLastLogTerm() > lastTerm()) {
- candidateLatest = true;
- } else if (requestVote.getLastLogTerm() == lastTerm()
- && requestVote.getLastLogIndex() >= lastIndex()) {
+ if (requestVote.getLastLogTerm() > lastTerm()
+ || requestVote.getLastLogTerm() == lastTerm() && requestVote.getLastLogIndex() >= lastIndex()) {
candidateLatest = true;
}
* @return a random election duration
*/
protected FiniteDuration electionDuration() {
- long variance = new Random().nextInt(context.getConfigParams().getElectionTimeVariance());
+ long variance = ThreadLocalRandom.current().nextInt(context.getConfigParams().getElectionTimeVariance());
return context.getConfigParams().getElectionTimeOutInterval().$plus(
new FiniteDuration(variance, TimeUnit.MILLISECONDS));
}
*
* @param interval the duration after which we should trigger a new election
*/
+ // Non-final for testing
protected void scheduleElection(final FiniteDuration interval) {
stopElection();
*
* @return the actor
*/
- protected ActorRef actor() {
+ protected final ActorRef actor() {
return context.getActor();
}
return context.getReplicatedLog().lastIndex();
}
- /**
- * Removes and returns the ClientRequestTracker for the specified log index.
- * @param logIndex the log index
- * @return the ClientRequestTracker or null if none available
- */
- protected ClientRequestTracker removeClientRequestTracker(final long logIndex) {
- return null;
- }
-
/**
* Returns the actual index of the entry in replicated log for the given index or -1 if not found.
*
// Send a local message to the local RaftActor (it's derived class to be
// specific to apply the log to it's index)
- final ApplyState applyState;
- final ClientRequestTracker tracker = removeClientRequestTracker(i);
- if (tracker != null) {
- applyState = new ApplyState(tracker.getClientActor(), tracker.getIdentifier(), replicatedLogEntry);
- } else {
- applyState = new ApplyState(null, null, replicatedLogEntry);
- }
+ final ApplyState applyState = getApplyStateFor(replicatedLogEntry);
log.debug("{}: Setting last applied to {}", logName(), i);
actor().tell(new ApplyJournalEntries(context.getLastApplied()), actor());
}
+ /**
+ * Create an ApplyState message for a particular log entry so we can determine how to apply this entry.
+ *
+ * @param entry the log entry
+ * @return ApplyState for this entry
+ */
+ abstract ApplyState getApplyStateFor(ReplicatedLogEntry entry);
+
@Override
public RaftActorBehavior handleMessage(final ActorRef sender, final Object message) {
if (message instanceof AppendEntries) {
}
}
- protected String getId() {
+ protected final String getId() {
return context.getId();
}