X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-akka-raft%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fraft%2Fbehaviors%2FAbstractRaftActorBehavior.java;h=d8b23f946e0f889b69a109d0d34463a7abb43f40;hp=d17fba03d4ba58e77168602ffe388431b0a64a1a;hb=bad1f8b8f3c1780cd37ec8a817ef4b0f23901654;hpb=f9c49de804eb7741df63d0a15889d485d65660e7 diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehavior.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehavior.java index d17fba03d4..d8b23f946e 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehavior.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehavior.java @@ -10,13 +10,13 @@ package org.opendaylight.controller.cluster.raft.behaviors; import akka.actor.ActorRef; import akka.actor.Cancellable; +import com.google.common.base.Preconditions; import java.util.Random; import java.util.concurrent.TimeUnit; import org.opendaylight.controller.cluster.raft.ClientRequestTracker; import org.opendaylight.controller.cluster.raft.RaftActorContext; import org.opendaylight.controller.cluster.raft.RaftState; import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry; -import org.opendaylight.controller.cluster.raft.SerializationUtils; import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries; import org.opendaylight.controller.cluster.raft.base.messages.ApplyState; import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout; @@ -38,9 +38,6 @@ import scala.concurrent.duration.FiniteDuration; * set currentTerm = T, convert to follower (§5.1) */ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior { - - protected static final ElectionTimeout ELECTION_TIMEOUT = new ElectionTimeout(); - /** * Information about the RaftActor whose behavior this class represents */ @@ -56,33 +53,43 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior { */ private Cancellable electionCancel = null; - /** - * - */ - protected String leaderId = null; - - private short leaderPayloadVersion = -1; - private long replicatedToAllIndex = -1; private final String logName; private final RaftState state; - protected AbstractRaftActorBehavior(RaftActorContext context, RaftState state) { - this.context = context; - this.state = state; + AbstractRaftActorBehavior(final RaftActorContext context, final RaftState state) { + this.context = Preconditions.checkNotNull(context); + this.state = Preconditions.checkNotNull(state); this.LOG = context.getLogger(); logName = String.format("%s (%s)", context.getId(), state); } + public static RaftActorBehavior createBehavior(final RaftActorContext context, final RaftState state) { + switch (state) { + case Candidate: + return new Candidate(context); + case Follower: + return new Follower(context); + case IsolatedLeader: + return new IsolatedLeader(context); + case Leader: + return new Leader(context); + case PreLeader: + return new PreLeader(context); + default: + throw new IllegalArgumentException("Unhandled state " + state); + } + } + @Override - public RaftState state() { + public final RaftState state() { return state; } - public String logName() { + protected final String logName() { return logName; } @@ -264,11 +271,9 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior { protected void scheduleElection(FiniteDuration interval) { stopElection(); - if(canStartElection()) { - // Schedule an election. When the scheduler triggers an ElectionTimeout message is sent to itself - electionCancel = context.getActorSystem().scheduler().scheduleOnce(interval, context.getActor(), - ELECTION_TIMEOUT,context.getActorSystem().dispatcher(), context.getActor()); - } + // Schedule an election. When the scheduler triggers an ElectionTimeout message is sent to itself + electionCancel = context.getActorSystem().scheduler().scheduleOnce(interval, context.getActor(), + ElectionTimeout.INSTANCE, context.getActorSystem().dispatcher(), context.getActor()); } /** @@ -307,14 +312,6 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior { return context.getReplicatedLog().lastIndex(); } - /** - * @param logIndex - * @return the client request tracker for the specified logIndex - */ - protected ClientRequestTracker findClientRequestTracker(long logIndex) { - return null; - } - /** * @param logIndex * @return the client request tracker for the specified logIndex @@ -323,29 +320,36 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior { return null; } - /** * - * @return log index from the previous to last entry in the log + * @return the log entry index for the given index or -1 if not found */ - protected long prevLogIndex(long index){ - ReplicatedLogEntry prevEntry = - context.getReplicatedLog().get(index - 1); - if (prevEntry != null) { - return prevEntry.getIndex(); + protected long getLogEntryIndex(long index){ + if(index == context.getReplicatedLog().getSnapshotIndex()){ + return context.getReplicatedLog().getSnapshotIndex(); } + + ReplicatedLogEntry entry = context.getReplicatedLog().get(index); + if(entry != null){ + return entry.getIndex(); + } + return -1; } /** - * @return log term from the previous to last entry in the log + * @return the log entry term for the given index or -1 if not found */ - protected long prevLogTerm(long index){ - ReplicatedLogEntry prevEntry = - context.getReplicatedLog().get(index - 1); - if (prevEntry != null) { - return prevEntry.getTerm(); + protected long getLogEntryTerm(long index){ + if(index == context.getReplicatedLog().getSnapshotIndex()){ + return context.getReplicatedLog().getSnapshotTerm(); + } + + ReplicatedLogEntry entry = context.getReplicatedLog().get(index); + if(entry != null){ + return entry.getTerm(); } + return -1; } @@ -357,30 +361,27 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior { protected void applyLogToStateMachine(final long index) { long newLastApplied = context.getLastApplied(); // Now maybe we apply to the state machine - for (long i = context.getLastApplied() + 1; - i < index + 1; i++) { - ActorRef clientActor = null; - String identifier = null; - ClientRequestTracker tracker = removeClientRequestTracker(i); - - if (tracker != null) { - clientActor = tracker.getClientActor(); - identifier = tracker.getIdentifier(); - } - ReplicatedLogEntry replicatedLogEntry = - context.getReplicatedLog().get(i); + for (long i = context.getLastApplied() + 1; i < index + 1; i++) { + ReplicatedLogEntry replicatedLogEntry = context.getReplicatedLog().get(i); if (replicatedLogEntry != null) { // Send a local message to the local RaftActor (it's derived class to be // specific to apply the log to it's index) - actor().tell(new ApplyState(clientActor, identifier, - replicatedLogEntry), actor()); + + final ApplyState msg; + final ClientRequestTracker tracker = removeClientRequestTracker(i); + if (tracker != null) { + msg = new ApplyState(tracker.getClientActor(), tracker.getIdentifier(), replicatedLogEntry); + } else { + msg = new ApplyState(null, null, replicatedLogEntry); + } + + actor().tell(msg, actor()); newLastApplied = i; } else { //if one index is not present in the log, no point in looping // around as the rest wont be present either - LOG.warn( - "{}: Missing index {} from log. Cannot apply state. Ignoring {} to {}", + LOG.warn("{}: Missing index {} from log. Cannot apply state. Ignoring {} to {}", logName(), i, i, index); break; } @@ -397,10 +398,6 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior { actor().tell(new ApplyJournalEntries(context.getLastApplied()), actor()); } - protected Object fromSerializableMessage(Object serializable){ - return SerializationUtils.fromSerializable(serializable); - } - @Override public RaftActorBehavior handleMessage(ActorRef sender, Object message) { if (message instanceof AppendEntries) { @@ -411,21 +408,9 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior { return requestVote(sender, (RequestVote) message); } else if (message instanceof RequestVoteReply) { return handleRequestVoteReply(sender, (RequestVoteReply) message); + } else { + return null; } - return this; - } - - @Override public String getLeaderId() { - return leaderId; - } - - @Override - public short getLeaderPayloadVersion() { - return leaderPayloadVersion; - } - - public void setLeaderPayloadVersion(short leaderPayloadVersion) { - this.leaderPayloadVersion = leaderPayloadVersion; } @Override @@ -435,12 +420,12 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior { protected RaftActorBehavior internalSwitchBehavior(RaftState newState) { if(context.getRaftPolicy().automaticElectionsEnabled()){ - return internalSwitchBehavior(newState.createBehavior(context)); + return internalSwitchBehavior(createBehavior(context, newState)); } return this; } - private RaftActorBehavior internalSwitchBehavior(RaftActorBehavior newBehavior) { + protected RaftActorBehavior internalSwitchBehavior(RaftActorBehavior newBehavior) { LOG.info("{} :- Switching from behavior {} to {}", logName(), this.state(), newBehavior.state()); try { close(); @@ -482,7 +467,7 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior { * @param snapshotCapturedIndex */ protected void performSnapshotWithoutCapture(final long snapshotCapturedIndex) { - long actualIndex = context.getSnapshotManager().trimLog(snapshotCapturedIndex, this); + long actualIndex = context.getSnapshotManager().trimLog(snapshotCapturedIndex); if(actualIndex != -1){ setReplicatedToAllIndex(actualIndex);