* terms of the Eclipse Public License v1.0 which accompanies this distribution,
* and is available at http://www.eclipse.org/legal/epl-v10.html
*/
-
package org.opendaylight.controller.cluster.raft.behaviors;
import akka.actor.ActorRef;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
-import org.opendaylight.controller.cluster.raft.FollowerLogInformationImpl;
+import com.google.common.base.Stopwatch;
+import java.util.concurrent.TimeUnit;
import org.opendaylight.controller.cluster.raft.RaftActorContext;
-import org.opendaylight.controller.cluster.raft.RaftReplicator;
-import org.opendaylight.controller.cluster.raft.RaftState;
-import org.opendaylight.controller.cluster.raft.internal.messages.SendHeartBeat;
-import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
-
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.atomic.AtomicLong;
+import org.opendaylight.controller.cluster.raft.base.messages.IsolatedLeaderCheck;
/**
* The behavior of a RaftActor when it is in the Leader state
- * <p>
+ * <p/>
* Leaders:
* <ul>
* <li> Upon election: send initial empty AppendEntries RPCs
* of matchIndex[i] ≥ N, and log[N].term == currentTerm:
* set commitIndex = N (§5.3, §5.4).
*/
-public class Leader extends AbstractRaftActorBehavior {
-
-
- private final Map<String, ActorRef> followerToReplicator = new HashMap<>();
+public class Leader extends AbstractLeader {
+ private static final IsolatedLeaderCheck ISOLATED_LEADER_CHECK = new IsolatedLeaderCheck();
+ private final Stopwatch isolatedLeaderCheck;
- public Leader(RaftActorContext context, List<String> followers){
+ public Leader(RaftActorContext context) {
super(context);
+ isolatedLeaderCheck = Stopwatch.createStarted();
+ }
+
+ @Override public RaftActorBehavior handleMessage(ActorRef sender, Object originalMessage) {
+ Preconditions.checkNotNull(sender, "sender should not be null");
- for(String follower : followers) {
+ if (originalMessage instanceof IsolatedLeaderCheck) {
+ if (isLeaderIsolated()) {
+ LOG.warn("{}: At least {} followers need to be active, Switching {} from Leader to IsolatedLeader",
+ context.getId(), minIsolatedLeaderPeerCount, leaderId);
- ActorRef replicator = context.actorOf(
- RaftReplicator.props(
- new FollowerLogInformationImpl(follower,
- new AtomicLong(0),
- new AtomicLong(0)),
- context.getActor()
- )
- );
+ return switchBehavior(new IsolatedLeader(context));
+ }
+ }
- // Create a replicator for each follower
- followerToReplicator.put(follower, replicator);
+ return super.handleMessage(sender, originalMessage);
+ }
+ @Override
+ protected void beforeSendHeartbeat(){
+ if(isolatedLeaderCheck.elapsed(TimeUnit.MILLISECONDS) > context.getConfigParams().getIsolatedCheckIntervalInMillis()){
+ context.getActor().tell(ISOLATED_LEADER_CHECK, context.getActor());
+ isolatedLeaderCheck.reset().start();
}
}
- @Override public RaftState handleMessage(ActorRef sender, Object message) {
- Preconditions.checkNotNull(sender, "sender should not be null");
+ @Override
+ public void close() throws Exception {
+ super.close();
+ }
- if(message instanceof SendHeartBeat) {
- sender.tell(new AppendEntries(
- context.getTermInformation().getCurrentTerm().get() , context.getId(),
- context.getReplicatedLog().last().getIndex(),
- context.getReplicatedLog().last().getTerm(),
- Collections.EMPTY_LIST), context.getActor());
- }
- return RaftState.Leader;
+ @VisibleForTesting
+ void markFollowerActive(String followerId) {
+ getFollower(followerId).markFollowerActive();
+ }
+
+ @VisibleForTesting
+ void markFollowerInActive(String followerId) {
+ getFollower(followerId).markFollowerInActive();
}
}