* terms of the Eclipse Public License v1.0 which accompanies this distribution,
* and is available at http://www.eclipse.org/legal/epl-v10.html
*/
-
package org.opendaylight.controller.cluster.raft.behaviors;
import akka.actor.ActorRef;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
-import org.opendaylight.controller.cluster.raft.FollowerLogInformationImpl;
+import com.google.common.base.Stopwatch;
+import java.util.concurrent.TimeUnit;
import org.opendaylight.controller.cluster.raft.RaftActorContext;
-import org.opendaylight.controller.cluster.raft.RaftReplicator;
-import org.opendaylight.controller.cluster.raft.RaftState;
-import org.opendaylight.controller.cluster.raft.internal.messages.SendHeartBeat;
-import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
-import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
-import org.opendaylight.controller.cluster.raft.messages.RequestVote;
-import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
-
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.atomic.AtomicLong;
+import org.opendaylight.controller.cluster.raft.base.messages.IsolatedLeaderCheck;
/**
* The behavior of a RaftActor when it is in the Leader state
- * <p>
+ * <p/>
* Leaders:
* <ul>
* <li> Upon election: send initial empty AppendEntries RPCs
* of matchIndex[i] ≥ N, and log[N].term == currentTerm:
* set commitIndex = N (§5.3, §5.4).
*/
-public class Leader extends AbstractRaftActorBehavior {
-
-
- private final Map<String, ActorRef> followerToReplicator = new HashMap<>();
+public class Leader extends AbstractLeader {
+ private static final IsolatedLeaderCheck ISOLATED_LEADER_CHECK = new IsolatedLeaderCheck();
+ private final Stopwatch isolatedLeaderCheck;
- public Leader(RaftActorContext context, List<String> followers){
+ public Leader(RaftActorContext context) {
super(context);
+ isolatedLeaderCheck = Stopwatch.createStarted();
+ }
- for(String follower : followers) {
-
- ActorRef replicator = context.actorOf(
- RaftReplicator.props(
- new FollowerLogInformationImpl(follower,
- new AtomicLong(0),
- new AtomicLong(0)),
- context.getActor()
- )
- );
+ @Override public RaftActorBehavior handleMessage(ActorRef sender, Object originalMessage) {
+ Preconditions.checkNotNull(sender, "sender should not be null");
- // Create a replicator for each follower
- followerToReplicator.put(follower, replicator);
+ if (originalMessage instanceof IsolatedLeaderCheck) {
+ if (isLeaderIsolated()) {
+ LOG.warn("{}: At least {} followers need to be active, Switching {} from Leader to IsolatedLeader",
+ context.getId(), minIsolatedLeaderPeerCount, leaderId);
+ return switchBehavior(new IsolatedLeader(context));
+ }
}
+ return super.handleMessage(sender, originalMessage);
}
- @Override protected RaftState handleAppendEntries(ActorRef sender,
- AppendEntries appendEntries, RaftState suggestedState) {
- return suggestedState;
- }
-
- @Override protected RaftState handleAppendEntriesReply(ActorRef sender,
- AppendEntriesReply appendEntriesReply, RaftState suggestedState) {
- return suggestedState;
- }
+ @Override
+ protected void beforeSendHeartbeat(){
+ if(isolatedLeaderCheck.elapsed(TimeUnit.MILLISECONDS) > context.getConfigParams().getIsolatedCheckIntervalInMillis()){
+ context.getActor().tell(ISOLATED_LEADER_CHECK, context.getActor());
+ isolatedLeaderCheck.reset().start();
+ }
- @Override protected RaftState handleRequestVote(ActorRef sender,
- RequestVote requestVote, RaftState suggestedState) {
- return suggestedState;
}
- @Override protected RaftState handleRequestVoteReply(ActorRef sender,
- RequestVoteReply requestVoteReply, RaftState suggestedState) {
- return suggestedState;
+ @Override
+ public void close() throws Exception {
+ super.close();
}
- @Override protected RaftState state() {
- return RaftState.Leader;
+ @VisibleForTesting
+ void markFollowerActive(String followerId) {
+ getFollower(followerId).markFollowerActive();
}
- @Override public RaftState handleMessage(ActorRef sender, Object message) {
- Preconditions.checkNotNull(sender, "sender should not be null");
-
- if(message instanceof SendHeartBeat) {
- sender.tell(new AppendEntries(
- context.getTermInformation().getCurrentTerm().get() , context.getId(),
- context.getReplicatedLog().last().getIndex(),
- context.getReplicatedLog().last().getTerm(),
- Collections.EMPTY_LIST, context.getCommitIndex().get()), context.getActor());
- return state();
- }
- return super.handleMessage(sender, message);
+ @VisibleForTesting
+ void markFollowerInActive(String followerId) {
+ getFollower(followerId).markFollowerInActive();
}
}