Bug-2277 : Isolated Leader Implementation 99/12799/10
authorKamal Rameshan <kramesha@cisco.com>
Thu, 13 Nov 2014 00:00:34 +0000 (16:00 -0800)
committerKamal Rameshan <kramesha@cisco.com>
Wed, 19 Nov 2014 06:44:42 +0000 (22:44 -0800)
A new RaftState has been added and on a scheduler, we check if the leader is isolated, looking at the majority peer statuses.
If Yes, then it switches the behavior to IsolatedLeader.

On the receipt of each AppendEntriesReply, the IsolatedLeader checks for isolation.
And if no, then switches back to either Leader or Follower.

These changes have been tested with TestDriver  and 3 node cluster and the switching of Leader to IsolatedLeader and back,
on the stopping and reinstating of followers, was successful.

The Isolated Leader check interval can be configured via the config subsystem

Change-Id: I42b9165cc477d812c7e0e02339537c0f1fe74934
Signed-off-by: Kamal Rameshan <kramesha@cisco.com>
17 files changed:
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/ExampleActor.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/TestDriver.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ConfigParams.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/DefaultConfigParamsImpl.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftState.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/base/messages/IsolatedLeaderCheck.java [new file with mode: 0644]
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java [new file with mode: 0644]
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehavior.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Candidate.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/IsolatedLeader.java [new file with mode: 0644]
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Leader.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/IsolatedLeaderTest.java [new file with mode: 0644]
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DatastoreContext.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/config/yang/config/distributed_datastore_provider/DistributedConfigDataStoreProviderModule.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/config/yang/config/distributed_datastore_provider/DistributedOperationalDataStoreProviderModule.java
opendaylight/md-sal/sal-distributed-datastore/src/main/yang/distributed-datastore-provider.yang

index 8e4a44c..6dfa4af 100644 (file)
@@ -80,7 +80,7 @@ public class ExampleActor extends RaftActor {
         } else if (message instanceof PrintRole) {
             if(LOG.isDebugEnabled()) {
                 String followers = "";
-                if (getRaftState() == RaftState.Leader) {
+                if (getRaftState() == RaftState.Leader || getRaftState() == RaftState.IsolatedLeader) {
                     followers = ((Leader)this.getCurrentBehavior()).printFollowerStates();
                     LOG.debug("{} = {}, Peers={}, followers={}", getId(), getRaftState(), getPeers(), followers);
                 } else {
index f202a8b..de61697 100644 (file)
@@ -44,6 +44,11 @@ public class TestDriver {
      *  stopLoggingForClient:{nodeName}
      *  printNodes
      *  printState
+     *
+     *  Note: when run on IDE and on debug log level, the debug logs in
+     *  AbstractUptypedActor and AbstractUptypedPersistentActor would need to be commented out.
+     *  Also RaftActor handleCommand(), debug log which prints for every command other than AE/AER
+     *
      * @param args
      * @throws Exception
      */
index bff2a27..433c3f7 100644 (file)
@@ -62,4 +62,10 @@ public interface ConfigParams {
      * The number of journal log entries to batch on recovery before applying.
      */
     int getJournalRecoveryLogBatchSize();
+
+    /**
+     * The interval in which the leader needs to check itself if its isolated
+     * @return FiniteDuration
+     */
+    FiniteDuration getIsolatedCheckInterval();
 }
index dc41453..a209223 100644 (file)
@@ -44,6 +44,8 @@ public class DefaultConfigParamsImpl implements ConfigParams {
     private FiniteDuration heartBeatInterval = HEART_BEAT_INTERVAL;
     private long snapshotBatchCount = SNAPSHOT_BATCH_COUNT;
     private int journalRecoveryLogBatchSize = JOURNAL_RECOVERY_LOG_BATCH_SIZE;
+    private FiniteDuration isolatedLeaderCheckInterval =
+        new FiniteDuration(HEART_BEAT_INTERVAL.length() * 1000, HEART_BEAT_INTERVAL.unit());
 
     public void setHeartBeatInterval(FiniteDuration heartBeatInterval) {
         this.heartBeatInterval = heartBeatInterval;
@@ -57,6 +59,10 @@ public class DefaultConfigParamsImpl implements ConfigParams {
         this.journalRecoveryLogBatchSize = journalRecoveryLogBatchSize;
     }
 
+    public void setIsolatedLeaderCheckInterval(FiniteDuration isolatedLeaderCheckInterval) {
+        this.isolatedLeaderCheckInterval = isolatedLeaderCheckInterval;
+    }
+
     @Override
     public long getSnapshotBatchCount() {
         return snapshotBatchCount;
@@ -87,4 +93,9 @@ public class DefaultConfigParamsImpl implements ConfigParams {
     public int getJournalRecoveryLogBatchSize() {
         return journalRecoveryLogBatchSize;
     }
+
+    @Override
+    public FiniteDuration getIsolatedCheckInterval() {
+        return isolatedLeaderCheckInterval;
+    }
 }
diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/base/messages/IsolatedLeaderCheck.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/base/messages/IsolatedLeaderCheck.java
new file mode 100644 (file)
index 0000000..36fd813
--- /dev/null
@@ -0,0 +1,15 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.raft.base.messages;
+
+/**
+ * Message sent by the IsolatedLeaderCheck scheduler in the Leader to itself
+ * in order to check if its isolated.
+ */
+public class IsolatedLeaderCheck {
+}
diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java
new file mode 100644 (file)
index 0000000..d85ac8e
--- /dev/null
@@ -0,0 +1,738 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.raft.behaviors;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSelection;
+import akka.actor.Cancellable;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import com.google.protobuf.ByteString;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import org.opendaylight.controller.cluster.raft.ClientRequestTracker;
+import org.opendaylight.controller.cluster.raft.ClientRequestTrackerImpl;
+import org.opendaylight.controller.cluster.raft.FollowerLogInformation;
+import org.opendaylight.controller.cluster.raft.FollowerLogInformationImpl;
+import org.opendaylight.controller.cluster.raft.RaftActorContext;
+import org.opendaylight.controller.cluster.raft.RaftState;
+import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
+import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
+import org.opendaylight.controller.cluster.raft.base.messages.InitiateInstallSnapshot;
+import org.opendaylight.controller.cluster.raft.base.messages.Replicate;
+import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat;
+import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot;
+import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
+import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
+import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
+import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply;
+import org.opendaylight.controller.cluster.raft.messages.RaftRPC;
+import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
+import scala.concurrent.duration.FiniteDuration;
+
+/**
+ * The behavior of a RaftActor when it is in the Leader state
+ * <p/>
+ * Leaders:
+ * <ul>
+ * <li> Upon election: send initial empty AppendEntries RPCs
+ * (heartbeat) to each server; repeat during idle periods to
+ * prevent election timeouts (§5.2)
+ * <li> If command received from client: append entry to local log,
+ * respond after entry applied to state machine (§5.3)
+ * <li> If last log index ≥ nextIndex for a follower: send
+ * AppendEntries RPC with log entries starting at nextIndex
+ * <ul>
+ * <li> If successful: update nextIndex and matchIndex for
+ * follower (§5.3)
+ * <li> If AppendEntries fails because of log inconsistency:
+ * decrement nextIndex and retry (§5.3)
+ * </ul>
+ * <li> If there exists an N such that N > commitIndex, a majority
+ * of matchIndex[i] ≥ N, and log[N].term == currentTerm:
+ * set commitIndex = N (§5.3, §5.4).
+ */
+public abstract class AbstractLeader extends AbstractRaftActorBehavior {
+    protected final Map<String, FollowerLogInformation> followerToLog = new HashMap<>();
+    protected final Map<String, FollowerToSnapshot> mapFollowerToSnapshot = new HashMap<>();
+
+    protected final Set<String> followers;
+
+    private Cancellable heartbeatSchedule = null;
+
+    private List<ClientRequestTracker> trackerList = new ArrayList<>();
+
+    protected final int minReplicationCount;
+
+    protected final int minIsolatedLeaderPeerCount;
+
+    private Optional<ByteString> snapshot;
+
+    public AbstractLeader(RaftActorContext context) {
+        super(context);
+
+        followers = context.getPeerAddresses().keySet();
+
+        for (String followerId : followers) {
+            FollowerLogInformation followerLogInformation =
+                new FollowerLogInformationImpl(followerId,
+                    new AtomicLong(context.getCommitIndex()),
+                    new AtomicLong(-1),
+                    context.getConfigParams().getElectionTimeOutInterval());
+
+            followerToLog.put(followerId, followerLogInformation);
+        }
+
+        leaderId = context.getId();
+
+        if(LOG.isDebugEnabled()) {
+            LOG.debug("Election:Leader has following peers: {}", followers);
+        }
+
+        minReplicationCount = getMajorityVoteCount(followers.size());
+
+        // the isolated Leader peer count will be 1 less than the majority vote count.
+        // this is because the vote count has the self vote counted in it
+        // for e.g
+        // 0 peers = 1 votesRequired , minIsolatedLeaderPeerCount = 0
+        // 2 peers = 2 votesRequired , minIsolatedLeaderPeerCount = 1
+        // 4 peers = 3 votesRequired, minIsolatedLeaderPeerCount = 2
+        minIsolatedLeaderPeerCount = minReplicationCount > 0 ? (minReplicationCount - 1) : 0;
+
+        snapshot = Optional.absent();
+
+        // Immediately schedule a heartbeat
+        // Upon election: send initial empty AppendEntries RPCs
+        // (heartbeat) to each server; repeat during idle periods to
+        // prevent election timeouts (§5.2)
+        scheduleHeartBeat(new FiniteDuration(0, TimeUnit.SECONDS));
+    }
+
+    private Optional<ByteString> getSnapshot() {
+        return snapshot;
+    }
+
+    @VisibleForTesting
+    void setSnapshot(Optional<ByteString> snapshot) {
+        this.snapshot = snapshot;
+    }
+
+    @Override
+    protected RaftActorBehavior handleAppendEntries(ActorRef sender,
+        AppendEntries appendEntries) {
+
+        if(LOG.isDebugEnabled()) {
+            LOG.debug(appendEntries.toString());
+        }
+
+        return this;
+    }
+
+    @Override
+    protected RaftActorBehavior handleAppendEntriesReply(ActorRef sender,
+        AppendEntriesReply appendEntriesReply) {
+
+        if(! appendEntriesReply.isSuccess()) {
+            if(LOG.isDebugEnabled()) {
+                LOG.debug(appendEntriesReply.toString());
+            }
+        }
+
+        // Update the FollowerLogInformation
+        String followerId = appendEntriesReply.getFollowerId();
+        FollowerLogInformation followerLogInformation =
+            followerToLog.get(followerId);
+
+        if(followerLogInformation == null){
+            LOG.error("Unknown follower {}", followerId);
+            return this;
+        }
+
+        followerLogInformation.markFollowerActive();
+
+        if (appendEntriesReply.isSuccess()) {
+            followerLogInformation
+                .setMatchIndex(appendEntriesReply.getLogLastIndex());
+            followerLogInformation
+                .setNextIndex(appendEntriesReply.getLogLastIndex() + 1);
+        } else {
+
+            // TODO: When we find that the follower is out of sync with the
+            // Leader we simply decrement that followers next index by 1.
+            // Would it be possible to do better than this? The RAFT spec
+            // does not explicitly deal with it but may be something for us to
+            // think about
+
+            followerLogInformation.decrNextIndex();
+        }
+
+        // Now figure out if this reply warrants a change in the commitIndex
+        // If there exists an N such that N > commitIndex, a majority
+        // of matchIndex[i] ≥ N, and log[N].term == currentTerm:
+        // set commitIndex = N (§5.3, §5.4).
+        for (long N = context.getCommitIndex() + 1; ; N++) {
+            int replicatedCount = 1;
+
+            for (FollowerLogInformation info : followerToLog.values()) {
+                if (info.getMatchIndex().get() >= N) {
+                    replicatedCount++;
+                }
+            }
+
+            if (replicatedCount >= minReplicationCount) {
+                ReplicatedLogEntry replicatedLogEntry = context.getReplicatedLog().get(N);
+                if (replicatedLogEntry != null &&
+                    replicatedLogEntry.getTerm() == currentTerm()) {
+                    context.setCommitIndex(N);
+                }
+            } else {
+                break;
+            }
+        }
+
+        // Apply the change to the state machine
+        if (context.getCommitIndex() > context.getLastApplied()) {
+            applyLogToStateMachine(context.getCommitIndex());
+        }
+
+        return this;
+    }
+
+    protected ClientRequestTracker removeClientRequestTracker(long logIndex) {
+
+        ClientRequestTracker toRemove = findClientRequestTracker(logIndex);
+        if(toRemove != null) {
+            trackerList.remove(toRemove);
+        }
+
+        return toRemove;
+    }
+
+    protected ClientRequestTracker findClientRequestTracker(long logIndex) {
+        for (ClientRequestTracker tracker : trackerList) {
+            if (tracker.getIndex() == logIndex) {
+                return tracker;
+            }
+        }
+        return null;
+    }
+
+    @Override
+    protected RaftActorBehavior handleRequestVoteReply(ActorRef sender,
+        RequestVoteReply requestVoteReply) {
+        return this;
+    }
+
+    @Override
+    public RaftState state() {
+        return RaftState.Leader;
+    }
+
+    @Override
+    public RaftActorBehavior handleMessage(ActorRef sender, Object originalMessage) {
+        Preconditions.checkNotNull(sender, "sender should not be null");
+
+        Object message = fromSerializableMessage(originalMessage);
+
+        if (message instanceof RaftRPC) {
+            RaftRPC rpc = (RaftRPC) message;
+            // If RPC request or response contains term T > currentTerm:
+            // set currentTerm = T, convert to follower (§5.1)
+            // This applies to all RPC messages and responses
+            if (rpc.getTerm() > context.getTermInformation().getCurrentTerm()) {
+                context.getTermInformation().updateAndPersist(rpc.getTerm(), null);
+
+                return switchBehavior(new Follower(context));
+            }
+        }
+
+        try {
+            if (message instanceof SendHeartBeat) {
+                sendHeartBeat();
+                return this;
+
+            } else if(message instanceof InitiateInstallSnapshot) {
+                installSnapshotIfNeeded();
+
+            } else if(message instanceof SendInstallSnapshot) {
+                // received from RaftActor
+                setSnapshot(Optional.of(((SendInstallSnapshot) message).getSnapshot()));
+                sendInstallSnapshot();
+
+            } else if (message instanceof Replicate) {
+                replicate((Replicate) message);
+
+            } else if (message instanceof InstallSnapshotReply){
+                handleInstallSnapshotReply((InstallSnapshotReply) message);
+
+            }
+        } finally {
+            scheduleHeartBeat(context.getConfigParams().getHeartBeatInterval());
+        }
+
+        return super.handleMessage(sender, message);
+    }
+
+    private void handleInstallSnapshotReply(InstallSnapshotReply reply) {
+        String followerId = reply.getFollowerId();
+        FollowerToSnapshot followerToSnapshot = mapFollowerToSnapshot.get(followerId);
+        FollowerLogInformation followerLogInformation = followerToLog.get(followerId);
+        followerLogInformation.markFollowerActive();
+
+        if (followerToSnapshot != null &&
+            followerToSnapshot.getChunkIndex() == reply.getChunkIndex()) {
+
+            if (reply.isSuccess()) {
+                if(followerToSnapshot.isLastChunk(reply.getChunkIndex())) {
+                    //this was the last chunk reply
+                    if(LOG.isDebugEnabled()) {
+                        LOG.debug("InstallSnapshotReply received, " +
+                                "last chunk received, Chunk:{}. Follower:{} Setting nextIndex:{}",
+                            reply.getChunkIndex(), followerId,
+                            context.getReplicatedLog().getSnapshotIndex() + 1
+                        );
+                    }
+
+                    followerLogInformation.setMatchIndex(
+                        context.getReplicatedLog().getSnapshotIndex());
+                    followerLogInformation.setNextIndex(
+                        context.getReplicatedLog().getSnapshotIndex() + 1);
+                    mapFollowerToSnapshot.remove(followerId);
+
+                    if(LOG.isDebugEnabled()) {
+                        LOG.debug("followerToLog.get(followerId).getNextIndex().get()=" +
+                            followerToLog.get(followerId).getNextIndex().get());
+                    }
+
+                    if (mapFollowerToSnapshot.isEmpty()) {
+                        // once there are no pending followers receiving snapshots
+                        // we can remove snapshot from the memory
+                        setSnapshot(Optional.<ByteString>absent());
+                    }
+
+                } else {
+                    followerToSnapshot.markSendStatus(true);
+                }
+            } else {
+                LOG.info("InstallSnapshotReply received, " +
+                        "sending snapshot chunk failed, Will retry, Chunk:{}",
+                    reply.getChunkIndex()
+                );
+                followerToSnapshot.markSendStatus(false);
+            }
+
+        } else {
+            LOG.error("ERROR!!" +
+                    "FollowerId in InstallSnapshotReply not known to Leader" +
+                    " or Chunk Index in InstallSnapshotReply not matching {} != {}",
+                followerToSnapshot.getChunkIndex(), reply.getChunkIndex()
+            );
+        }
+    }
+
+    private void replicate(Replicate replicate) {
+        long logIndex = replicate.getReplicatedLogEntry().getIndex();
+
+        if(LOG.isDebugEnabled()) {
+            LOG.debug("Replicate message {}", logIndex);
+        }
+
+        // Create a tracker entry we will use this later to notify the
+        // client actor
+        trackerList.add(
+            new ClientRequestTrackerImpl(replicate.getClientActor(),
+                replicate.getIdentifier(),
+                logIndex)
+        );
+
+        if (followers.size() == 0) {
+            context.setCommitIndex(logIndex);
+            applyLogToStateMachine(logIndex);
+        } else {
+            sendAppendEntries();
+        }
+    }
+
+    private void sendAppendEntries() {
+        // Send an AppendEntries to all followers
+        for (String followerId : followers) {
+            ActorSelection followerActor = context.getPeerActorSelection(followerId);
+
+            if (followerActor != null) {
+                FollowerLogInformation followerLogInformation = followerToLog.get(followerId);
+                long followerNextIndex = followerLogInformation.getNextIndex().get();
+                boolean isFollowerActive = followerLogInformation.isFollowerActive();
+                List<ReplicatedLogEntry> entries = null;
+
+                if (mapFollowerToSnapshot.get(followerId) != null) {
+                    // if install snapshot is in process , then sent next chunk if possible
+                    if (isFollowerActive && mapFollowerToSnapshot.get(followerId).canSendNextChunk()) {
+                        sendSnapshotChunk(followerActor, followerId);
+                    } else {
+                        // we send a heartbeat even if we have not received a reply for the last chunk
+                        sendAppendEntriesToFollower(followerActor, followerNextIndex,
+                            Collections.<ReplicatedLogEntry>emptyList());
+                    }
+
+                } else {
+                    long leaderLastIndex = context.getReplicatedLog().lastIndex();
+                    long leaderSnapShotIndex = context.getReplicatedLog().getSnapshotIndex();
+
+                    if (isFollowerActive &&
+                        context.getReplicatedLog().isPresent(followerNextIndex)) {
+                        // FIXME : Sending one entry at a time
+                        entries = context.getReplicatedLog().getFrom(followerNextIndex, 1);
+
+                    } else if (isFollowerActive && followerNextIndex >= 0 &&
+                        leaderLastIndex >= followerNextIndex ) {
+                        // if the followers next index is not present in the leaders log, and
+                        // if the follower is just not starting and if leader's index is more than followers index
+                        // then snapshot should be sent
+
+                        if(LOG.isDebugEnabled()) {
+                            LOG.debug("InitiateInstallSnapshot to follower:{}," +
+                                    "follower-nextIndex:{}, leader-snapshot-index:{},  " +
+                                    "leader-last-index:{}", followerId,
+                                followerNextIndex, leaderSnapShotIndex, leaderLastIndex
+                            );
+                        }
+                        actor().tell(new InitiateInstallSnapshot(), actor());
+
+                        // we would want to sent AE as the capture snapshot might take time
+                        entries =  Collections.<ReplicatedLogEntry>emptyList();
+
+                    } else {
+                        //we send an AppendEntries, even if the follower is inactive
+                        // in-order to update the followers timestamp, in case it becomes active again
+                        entries =  Collections.<ReplicatedLogEntry>emptyList();
+                    }
+
+                    sendAppendEntriesToFollower(followerActor, followerNextIndex, entries);
+
+                }
+            }
+        }
+    }
+
+    private void sendAppendEntriesToFollower(ActorSelection followerActor, long followerNextIndex,
+        List<ReplicatedLogEntry> entries) {
+        followerActor.tell(
+            new AppendEntries(currentTerm(), context.getId(),
+                prevLogIndex(followerNextIndex),
+                prevLogTerm(followerNextIndex), entries,
+                context.getCommitIndex()).toSerializable(),
+            actor()
+        );
+    }
+
+    /**
+     * An installSnapshot is scheduled at a interval that is a multiple of
+     * a HEARTBEAT_INTERVAL. This is to avoid the need to check for installing
+     * snapshots at every heartbeat.
+     *
+     * Install Snapshot works as follows
+     * 1. Leader sends a InitiateInstallSnapshot message to self
+     * 2. Leader then initiates the capture snapshot by sending a CaptureSnapshot message to actor
+     * 3. RaftActor on receipt of the CaptureSnapshotReply (from Shard), stores the received snapshot in the replicated log
+     * and makes a call to Leader's handleMessage , with SendInstallSnapshot message.
+     * 4. Leader , picks the snapshot from im-mem ReplicatedLog and sends it in chunks to the Follower
+     * 5. On complete, Follower sends back a InstallSnapshotReply.
+     * 6. On receipt of the InstallSnapshotReply for the last chunk, Leader marks the install complete for that follower
+     * and replenishes the memory by deleting the snapshot in Replicated log.
+     *
+     */
+    private void installSnapshotIfNeeded() {
+        for (String followerId : followers) {
+            ActorSelection followerActor =
+                context.getPeerActorSelection(followerId);
+
+            if(followerActor != null) {
+                FollowerLogInformation followerLogInformation =
+                    followerToLog.get(followerId);
+
+                long nextIndex = followerLogInformation.getNextIndex().get();
+
+                if (!context.getReplicatedLog().isPresent(nextIndex) &&
+                    context.getReplicatedLog().isInSnapshot(nextIndex)) {
+                    LOG.info("{} follower needs a snapshot install", followerId);
+                    if (snapshot.isPresent()) {
+                        // if a snapshot is present in the memory, most likely another install is in progress
+                        // no need to capture snapshot
+                        sendSnapshotChunk(followerActor, followerId);
+
+                    } else {
+                        initiateCaptureSnapshot();
+                        //we just need 1 follower who would need snapshot to be installed.
+                        // when we have the snapshot captured, we would again check (in SendInstallSnapshot)
+                        // who needs an install and send to all who need
+                        break;
+                    }
+
+                }
+            }
+        }
+    }
+
+    // on every install snapshot, we try to capture the snapshot.
+    // Once a capture is going on, another one issued will get ignored by RaftActor.
+    private void initiateCaptureSnapshot() {
+        LOG.info("Initiating Snapshot Capture to Install Snapshot, Leader:{}", getLeaderId());
+        ReplicatedLogEntry lastAppliedEntry = context.getReplicatedLog().get(context.getLastApplied());
+        long lastAppliedIndex = -1;
+        long lastAppliedTerm = -1;
+
+        if (lastAppliedEntry != null) {
+            lastAppliedIndex = lastAppliedEntry.getIndex();
+            lastAppliedTerm = lastAppliedEntry.getTerm();
+        } else if (context.getReplicatedLog().getSnapshotIndex() > -1)  {
+            lastAppliedIndex = context.getReplicatedLog().getSnapshotIndex();
+            lastAppliedTerm = context.getReplicatedLog().getSnapshotTerm();
+        }
+
+        boolean isInstallSnapshotInitiated = true;
+        actor().tell(new CaptureSnapshot(lastIndex(), lastTerm(),
+                lastAppliedIndex, lastAppliedTerm, isInstallSnapshotInitiated),
+            actor());
+    }
+
+
+    private void sendInstallSnapshot() {
+        for (String followerId : followers) {
+            ActorSelection followerActor = context.getPeerActorSelection(followerId);
+
+            if(followerActor != null) {
+                FollowerLogInformation followerLogInformation = followerToLog.get(followerId);
+                long nextIndex = followerLogInformation.getNextIndex().get();
+
+                if (!context.getReplicatedLog().isPresent(nextIndex) &&
+                    context.getReplicatedLog().isInSnapshot(nextIndex)) {
+                    sendSnapshotChunk(followerActor, followerId);
+                }
+            }
+        }
+    }
+
+    /**
+     *  Sends a snapshot chunk to a given follower
+     *  InstallSnapshot should qualify as a heartbeat too.
+     */
+    private void sendSnapshotChunk(ActorSelection followerActor, String followerId) {
+        try {
+            if (snapshot.isPresent()) {
+                followerActor.tell(
+                    new InstallSnapshot(currentTerm(), context.getId(),
+                        context.getReplicatedLog().getSnapshotIndex(),
+                        context.getReplicatedLog().getSnapshotTerm(),
+                        getNextSnapshotChunk(followerId,snapshot.get()),
+                        mapFollowerToSnapshot.get(followerId).incrementChunkIndex(),
+                        mapFollowerToSnapshot.get(followerId).getTotalChunks()
+                    ).toSerializable(),
+                    actor()
+                );
+                LOG.info("InstallSnapshot sent to follower {}, Chunk: {}/{}",
+                    followerActor.path(), mapFollowerToSnapshot.get(followerId).getChunkIndex(),
+                    mapFollowerToSnapshot.get(followerId).getTotalChunks());
+            }
+        } catch (IOException e) {
+            LOG.error(e, "InstallSnapshot failed for Leader.");
+        }
+    }
+
+    /**
+     * Acccepts snaphot as ByteString, enters into map for future chunks
+     * creates and return a ByteString chunk
+     */
+    private ByteString getNextSnapshotChunk(String followerId, ByteString snapshotBytes) throws IOException {
+        FollowerToSnapshot followerToSnapshot = mapFollowerToSnapshot.get(followerId);
+        if (followerToSnapshot == null) {
+            followerToSnapshot = new FollowerToSnapshot(snapshotBytes);
+            mapFollowerToSnapshot.put(followerId, followerToSnapshot);
+        }
+        ByteString nextChunk = followerToSnapshot.getNextChunk();
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Leader's snapshot nextChunk size:{}", nextChunk.size());
+        }
+        return nextChunk;
+    }
+
+    private void sendHeartBeat() {
+        if (followers.size() > 0) {
+            sendAppendEntries();
+        }
+    }
+
+    private void stopHeartBeat() {
+        if (heartbeatSchedule != null && !heartbeatSchedule.isCancelled()) {
+            heartbeatSchedule.cancel();
+        }
+    }
+
+    private void scheduleHeartBeat(FiniteDuration interval) {
+        if(followers.size() == 0){
+            // Optimization - do not bother scheduling a heartbeat as there are
+            // no followers
+            return;
+        }
+
+        stopHeartBeat();
+
+        // Schedule a heartbeat. When the scheduler triggers a SendHeartbeat
+        // message is sent to itself.
+        // Scheduling the heartbeat only once here because heartbeats do not
+        // need to be sent if there are other messages being sent to the remote
+        // actor.
+        heartbeatSchedule = context.getActorSystem().scheduler().scheduleOnce(
+            interval, context.getActor(), new SendHeartBeat(),
+            context.getActorSystem().dispatcher(), context.getActor());
+    }
+
+    @Override
+    public void close() throws Exception {
+        stopHeartBeat();
+    }
+
+    @Override
+    public String getLeaderId() {
+        return context.getId();
+    }
+
+    protected boolean isLeaderIsolated() {
+        int minPresent = minIsolatedLeaderPeerCount;
+        for (FollowerLogInformation followerLogInformation : followerToLog.values()) {
+            if (followerLogInformation.isFollowerActive()) {
+                --minPresent;
+                if (minPresent == 0) {
+                    break;
+                }
+            }
+        }
+        return (minPresent != 0);
+    }
+
+    /**
+     * Encapsulates the snapshot bytestring and handles the logic of sending
+     * snapshot chunks
+     */
+    protected class FollowerToSnapshot {
+        private ByteString snapshotBytes;
+        private int offset = 0;
+        // the next snapshot chunk is sent only if the replyReceivedForOffset matches offset
+        private int replyReceivedForOffset;
+        // if replyStatus is false, the previous chunk is attempted
+        private boolean replyStatus = false;
+        private int chunkIndex;
+        private int totalChunks;
+
+        public FollowerToSnapshot(ByteString snapshotBytes) {
+            this.snapshotBytes = snapshotBytes;
+            replyReceivedForOffset = -1;
+            chunkIndex = 1;
+            int size = snapshotBytes.size();
+            totalChunks = ( size / context.getConfigParams().getSnapshotChunkSize()) +
+                ((size % context.getConfigParams().getSnapshotChunkSize()) > 0 ? 1 : 0);
+            if(LOG.isDebugEnabled()) {
+                LOG.debug("Snapshot {} bytes, total chunks to send:{}",
+                    size, totalChunks);
+            }
+        }
+
+        public ByteString getSnapshotBytes() {
+            return snapshotBytes;
+        }
+
+        public int incrementOffset() {
+            if(replyStatus) {
+                // if prev chunk failed, we would want to sent the same chunk again
+                offset = offset + context.getConfigParams().getSnapshotChunkSize();
+            }
+            return offset;
+        }
+
+        public int incrementChunkIndex() {
+            if (replyStatus) {
+                // if prev chunk failed, we would want to sent the same chunk again
+                chunkIndex =  chunkIndex + 1;
+            }
+            return chunkIndex;
+        }
+
+        public int getChunkIndex() {
+            return chunkIndex;
+        }
+
+        public int getTotalChunks() {
+            return totalChunks;
+        }
+
+        public boolean canSendNextChunk() {
+            // we only send a false if a chunk is sent but we have not received a reply yet
+            return replyReceivedForOffset == offset;
+        }
+
+        public boolean isLastChunk(int chunkIndex) {
+            return totalChunks == chunkIndex;
+        }
+
+        public void markSendStatus(boolean success) {
+            if (success) {
+                // if the chunk sent was successful
+                replyReceivedForOffset = offset;
+                replyStatus = true;
+            } else {
+                // if the chunk sent was failure
+                replyReceivedForOffset = offset;
+                replyStatus = false;
+            }
+        }
+
+        public ByteString getNextChunk() {
+            int snapshotLength = getSnapshotBytes().size();
+            int start = incrementOffset();
+            int size = context.getConfigParams().getSnapshotChunkSize();
+            if (context.getConfigParams().getSnapshotChunkSize() > snapshotLength) {
+                size = snapshotLength;
+            } else {
+                if ((start + context.getConfigParams().getSnapshotChunkSize()) > snapshotLength) {
+                    size = snapshotLength - start;
+                }
+            }
+
+            if(LOG.isDebugEnabled()) {
+                LOG.debug("length={}, offset={},size={}",
+                    snapshotLength, start, size);
+            }
+            return getSnapshotBytes().substring(start, start + size);
+
+        }
+    }
+
+    // called from example-actor for printing the follower-states
+    public String printFollowerStates() {
+        StringBuilder sb = new StringBuilder();
+        for(FollowerLogInformation followerLogInformation : followerToLog.values()) {
+            boolean isFollowerActive = followerLogInformation.isFollowerActive();
+            sb.append("{"+followerLogInformation.getId() + " state:" + isFollowerActive + "},");
+
+        }
+        return "[" + sb.toString() + "]";
+    }
+
+    @VisibleForTesting
+    void markFollowerActive(String followerId) {
+        followerToLog.get(followerId).markFollowerActive();
+    }
+}
index eed74bb..f235221 100644 (file)
@@ -390,7 +390,7 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior {
     }
 
     protected RaftActorBehavior switchBehavior(RaftActorBehavior behavior) {
-        LOG.info("Switching from behavior {} to {}", this.state(), behavior.state());
+        LOG.info("{} :- Switching from behavior {} to {}", context.getId(), this.state(), behavior.state());
         try {
             close();
         } catch (Exception e) {
@@ -399,4 +399,27 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior {
 
         return behavior;
     }
+
+    protected int getMajorityVoteCount(int numPeers) {
+        // Votes are required from a majority of the peers including self.
+        // The numMajority field therefore stores a calculated value
+        // of the number of votes required for this candidate to win an
+        // election based on it's known peers.
+        // If a peer was added during normal operation and raft replicas
+        // came to know about them then the new peer would also need to be
+        // taken into consideration when calculating this value.
+        // Here are some examples for what the numMajority would be for n
+        // peers
+        // 0 peers = 1 numMajority -: (0 + 1) / 2 + 1 = 1
+        // 2 peers = 2 numMajority -: (2 + 1) / 2 + 1 = 2
+        // 4 peers = 3 numMajority -: (4 + 1) / 2 + 1 = 3
+
+        int numMajority = 0;
+        if (numPeers > 0) {
+            int self = 1;
+            numMajority = (numPeers + self) / 2 + 1;
+        }
+        return numMajority;
+
+    }
 }
index 4a3e2c5..7024172 100644 (file)
@@ -56,25 +56,7 @@ public class Candidate extends AbstractRaftActorBehavior {
             LOG.debug("Election:Candidate has following peers: {}", peers);
         }
 
-        if(peers.size() > 0) {
-            // Votes are required from a majority of the peers including self.
-            // The votesRequired field therefore stores a calculated value
-            // of the number of votes required for this candidate to win an
-            // election based on it's known peers.
-            // If a peer was added during normal operation and raft replicas
-            // came to know about them then the new peer would also need to be
-            // taken into consideration when calculating this value.
-            // Here are some examples for what the votesRequired would be for n
-            // peers
-            // 0 peers = 1 votesRequired (0 + 1) / 2 + 1 = 1
-            // 2 peers = 2 votesRequired (2 + 1) / 2 + 1 = 2
-            // 4 peers = 3 votesRequired (4 + 1) / 2 + 1 = 3
-            int noOfPeers = peers.size();
-            int self = 1;
-            votesRequired = (noOfPeers + self) / 2 + 1;
-        } else {
-            votesRequired = 0;
-        }
+        votesRequired = getMajorityVoteCount(peers.size());
 
         startNewTerm();
         scheduleElection(electionDuration());
diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/IsolatedLeader.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/IsolatedLeader.java
new file mode 100644 (file)
index 0000000..4f77711
--- /dev/null
@@ -0,0 +1,52 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.raft.behaviors;
+
+import akka.actor.ActorRef;
+import org.opendaylight.controller.cluster.raft.RaftActorContext;
+import org.opendaylight.controller.cluster.raft.RaftState;
+import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
+
+/**
+ * Leader which is termed as isolated.
+ * <p/>
+ * If the reply from the majority of the followers  is not received then the leader changes its behavior
+ * to IsolatedLeader. An isolated leader may have followers and they would continue to receive replicated messages.
+ * <p/>
+ * A schedule is run, at an interval of (10 * Heartbeat-time-interval),  in the Leader
+ * to check if its isolated or not.
+ * <p/>
+ * In the Isolated Leader , on every AppendEntriesReply, we aggressively check if the leader is isolated.
+ * If no, then the state is switched back to Leader.
+ *
+ */
+public class IsolatedLeader extends AbstractLeader {
+    public IsolatedLeader(RaftActorContext context) {
+        super(context);
+    }
+
+    // we received an Append Entries reply, we should switch the Behavior to Leader
+    @Override
+    protected RaftActorBehavior handleAppendEntriesReply(ActorRef sender,
+        AppendEntriesReply appendEntriesReply) {
+        RaftActorBehavior ret = super.handleAppendEntriesReply(sender, appendEntriesReply);
+
+        // it can happen that this isolated leader interacts with a new leader in the cluster and
+        // changes its state to Follower, hence we only need to switch to Leader if the state is still Isolated
+        if (ret.state() == RaftState.IsolatedLeader && !isLeaderIsolated()) {
+            LOG.info("IsolatedLeader {} switching from IsolatedLeader to Leader", leaderId);
+            return switchBehavior(new Leader(context));
+        }
+        return ret;
+    }
+
+    @Override
+    public RaftState state() {
+        return RaftState.IsolatedLeader;
+    }
+}
index d83362b..0dd3900 100644 (file)
@@ -9,42 +9,14 @@
 package org.opendaylight.controller.cluster.raft.behaviors;
 
 import akka.actor.ActorRef;
-import akka.actor.ActorSelection;
 import akka.actor.Cancellable;
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
-import com.google.protobuf.ByteString;
-import org.opendaylight.controller.cluster.raft.ClientRequestTracker;
-import org.opendaylight.controller.cluster.raft.ClientRequestTrackerImpl;
-import org.opendaylight.controller.cluster.raft.FollowerLogInformation;
-import org.opendaylight.controller.cluster.raft.FollowerLogInformationImpl;
 import org.opendaylight.controller.cluster.raft.RaftActorContext;
-import org.opendaylight.controller.cluster.raft.RaftState;
-import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
-import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
 import org.opendaylight.controller.cluster.raft.base.messages.InitiateInstallSnapshot;
-import org.opendaylight.controller.cluster.raft.base.messages.Replicate;
-import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat;
-import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot;
-import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
-import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
-import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
-import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply;
-import org.opendaylight.controller.cluster.raft.messages.RaftRPC;
-import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
+import org.opendaylight.controller.cluster.raft.base.messages.IsolatedLeaderCheck;
 import scala.concurrent.duration.FiniteDuration;
 
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
-
 /**
  * The behavior of a RaftActor when it is in the Leader state
  * <p/>
@@ -67,546 +39,41 @@ import java.util.concurrent.atomic.AtomicLong;
  * of matchIndex[i] ≥ N, and log[N].term == currentTerm:
  * set commitIndex = N (§5.3, §5.4).
  */
-public class Leader extends AbstractRaftActorBehavior {
-
-
-    protected final Map<String, FollowerLogInformation> followerToLog = new HashMap<>();
-    protected final Map<String, FollowerToSnapshot> mapFollowerToSnapshot = new HashMap<>();
-
-    private final Set<String> followers;
-
-    private Cancellable heartbeatSchedule = null;
+public class Leader extends AbstractLeader {
     private Cancellable installSnapshotSchedule = null;
-
-    private List<ClientRequestTracker> trackerList = new ArrayList<>();
-
-    private final int minReplicationCount;
-
-    private Optional<ByteString> snapshot;
+    private Cancellable isolatedLeaderCheckSchedule = null;
 
     public Leader(RaftActorContext context) {
         super(context);
 
-        followers = context.getPeerAddresses().keySet();
-
-        for (String followerId : followers) {
-            FollowerLogInformation followerLogInformation =
-                new FollowerLogInformationImpl(followerId,
-                    new AtomicLong(context.getCommitIndex()),
-                    new AtomicLong(-1),
-                    context.getConfigParams().getElectionTimeOutInterval());
-
-            followerToLog.put(followerId, followerLogInformation);
-        }
-
-        if(LOG.isDebugEnabled()) {
-            LOG.debug("Election:Leader has following peers: {}", followers);
-        }
-
-        if (followers.size() > 0) {
-            minReplicationCount = (followers.size() + 1) / 2 + 1;
-        } else {
-            minReplicationCount = 0;
-        }
-
-        snapshot = Optional.absent();
-
-        // Immediately schedule a heartbeat
-        // Upon election: send initial empty AppendEntries RPCs
-        // (heartbeat) to each server; repeat during idle periods to
-        // prevent election timeouts (§5.2)
-        scheduleHeartBeat(new FiniteDuration(0, TimeUnit.SECONDS));
-
-        scheduleInstallSnapshotCheck(
-            new FiniteDuration(context.getConfigParams().getHeartBeatInterval().length() * 1000,
-                context.getConfigParams().getHeartBeatInterval().unit())
-        );
-
-    }
-
-    private Optional<ByteString> getSnapshot() {
-        return snapshot;
-    }
-
-    @VisibleForTesting
-    void setSnapshot(Optional<ByteString> snapshot) {
-        this.snapshot = snapshot;
-    }
-
-    @Override protected RaftActorBehavior handleAppendEntries(ActorRef sender,
-        AppendEntries appendEntries) {
-
-        if(LOG.isDebugEnabled()) {
-            LOG.debug(appendEntries.toString());
-        }
-
-        return this;
-    }
-
-    @Override protected RaftActorBehavior handleAppendEntriesReply(ActorRef sender,
-        AppendEntriesReply appendEntriesReply) {
-
-        if(! appendEntriesReply.isSuccess()) {
-            if(LOG.isDebugEnabled()) {
-                LOG.debug(appendEntriesReply.toString());
-            }
-        }
-
-        // Update the FollowerLogInformation
-        String followerId = appendEntriesReply.getFollowerId();
-        FollowerLogInformation followerLogInformation =
-            followerToLog.get(followerId);
-
-        if(followerLogInformation == null){
-            LOG.error("Unknown follower {}", followerId);
-            return this;
-        }
-
-        followerLogInformation.markFollowerActive();
-
-        if (appendEntriesReply.isSuccess()) {
-            followerLogInformation
-                .setMatchIndex(appendEntriesReply.getLogLastIndex());
-            followerLogInformation
-                .setNextIndex(appendEntriesReply.getLogLastIndex() + 1);
-        } else {
-
-            // TODO: When we find that the follower is out of sync with the
-            // Leader we simply decrement that followers next index by 1.
-            // Would it be possible to do better than this? The RAFT spec
-            // does not explicitly deal with it but may be something for us to
-            // think about
-
-            followerLogInformation.decrNextIndex();
-        }
-
-        // Now figure out if this reply warrants a change in the commitIndex
-        // If there exists an N such that N > commitIndex, a majority
-        // of matchIndex[i] ≥ N, and log[N].term == currentTerm:
-        // set commitIndex = N (§5.3, §5.4).
-        for (long N = context.getCommitIndex() + 1; ; N++) {
-            int replicatedCount = 1;
-
-            for (FollowerLogInformation info : followerToLog.values()) {
-                if (info.getMatchIndex().get() >= N) {
-                    replicatedCount++;
-                }
-            }
-
-            if (replicatedCount >= minReplicationCount) {
-                ReplicatedLogEntry replicatedLogEntry =
-                    context.getReplicatedLog().get(N);
-                if (replicatedLogEntry != null
-                    && replicatedLogEntry.getTerm()
-                    == currentTerm()) {
-                    context.setCommitIndex(N);
-                }
-            } else {
-                break;
-            }
-        }
-
-        // Apply the change to the state machine
-        if (context.getCommitIndex() > context.getLastApplied()) {
-            applyLogToStateMachine(context.getCommitIndex());
-        }
-
-        return this;
-    }
-
-    protected ClientRequestTracker removeClientRequestTracker(long logIndex) {
-
-        ClientRequestTracker toRemove = findClientRequestTracker(logIndex);
-        if(toRemove != null) {
-            trackerList.remove(toRemove);
-        }
-
-        return toRemove;
-    }
-
-    protected ClientRequestTracker findClientRequestTracker(long logIndex) {
-        for (ClientRequestTracker tracker : trackerList) {
-            if (tracker.getIndex() == logIndex) {
-                return tracker;
-            }
-        }
-
-        return null;
-    }
-
-    @Override protected RaftActorBehavior handleRequestVoteReply(ActorRef sender,
-        RequestVoteReply requestVoteReply) {
-        return this;
-    }
+        scheduleInstallSnapshotCheck(context.getConfigParams().getIsolatedCheckInterval());
 
-    @Override public RaftState state() {
-        return RaftState.Leader;
+        scheduleIsolatedLeaderCheck(
+            new FiniteDuration(context.getConfigParams().getHeartBeatInterval().length() * 10,
+                context.getConfigParams().getHeartBeatInterval().unit()));
     }
 
     @Override public RaftActorBehavior handleMessage(ActorRef sender, Object originalMessage) {
         Preconditions.checkNotNull(sender, "sender should not be null");
 
-        Object message = fromSerializableMessage(originalMessage);
-
-        if (message instanceof RaftRPC) {
-            RaftRPC rpc = (RaftRPC) message;
-            // If RPC request or response contains term T > currentTerm:
-            // set currentTerm = T, convert to follower (§5.1)
-            // This applies to all RPC messages and responses
-            if (rpc.getTerm() > context.getTermInformation().getCurrentTerm()) {
-                context.getTermInformation().updateAndPersist(rpc.getTerm(), null);
-
-                return switchBehavior(new Follower(context));
-            }
-        }
-
-        try {
-            if (message instanceof SendHeartBeat) {
-                sendHeartBeat();
-                return this;
-
-            } else if(message instanceof InitiateInstallSnapshot) {
-                installSnapshotIfNeeded();
-
-            } else if(message instanceof SendInstallSnapshot) {
-                // received from RaftActor
-                setSnapshot(Optional.of(((SendInstallSnapshot) message).getSnapshot()));
-                sendInstallSnapshot();
-
-            } else if (message instanceof Replicate) {
-                replicate((Replicate) message);
-
-            } else if (message instanceof InstallSnapshotReply){
-                handleInstallSnapshotReply(
-                    (InstallSnapshotReply) message);
-            }
-        } finally {
-            scheduleHeartBeat(context.getConfigParams().getHeartBeatInterval());
-        }
-
-        return super.handleMessage(sender, message);
-    }
-
-    private void handleInstallSnapshotReply(InstallSnapshotReply reply) {
-        String followerId = reply.getFollowerId();
-        FollowerToSnapshot followerToSnapshot = mapFollowerToSnapshot.get(followerId);
-        FollowerLogInformation followerLogInformation = followerToLog.get(followerId);
-        followerLogInformation.markFollowerActive();
-
-        if (followerToSnapshot != null &&
-            followerToSnapshot.getChunkIndex() == reply.getChunkIndex()) {
-
-            if (reply.isSuccess()) {
-                if(followerToSnapshot.isLastChunk(reply.getChunkIndex())) {
-                    //this was the last chunk reply
-                    if(LOG.isDebugEnabled()) {
-                        LOG.debug("InstallSnapshotReply received, " +
-                                "last chunk received, Chunk:{}. Follower:{} Setting nextIndex:{}",
-                            reply.getChunkIndex(), followerId,
-                            context.getReplicatedLog().getSnapshotIndex() + 1
-                        );
-                    }
-
-                    followerLogInformation.setMatchIndex(
-                        context.getReplicatedLog().getSnapshotIndex());
-                    followerLogInformation.setNextIndex(
-                        context.getReplicatedLog().getSnapshotIndex() + 1);
-                    mapFollowerToSnapshot.remove(followerId);
-
-                    if(LOG.isDebugEnabled()) {
-                        LOG.debug("followerToLog.get(followerId).getNextIndex().get()=" +
-                            followerToLog.get(followerId).getNextIndex().get());
-                    }
-
-                    if (mapFollowerToSnapshot.isEmpty()) {
-                        // once there are no pending followers receiving snapshots
-                        // we can remove snapshot from the memory
-                        setSnapshot(Optional.<ByteString>absent());
-                    }
-
-                } else {
-                    followerToSnapshot.markSendStatus(true);
-                }
-            } else {
-                LOG.info("InstallSnapshotReply received, " +
-                        "sending snapshot chunk failed, Will retry, Chunk:{}",
-                    reply.getChunkIndex()
-                );
-                followerToSnapshot.markSendStatus(false);
+        if (originalMessage instanceof IsolatedLeaderCheck) {
+            if (isLeaderIsolated()) {
+                LOG.info("At least {} followers need to be active, Switching {} from Leader to IsolatedLeader",
+                    minIsolatedLeaderPeerCount, leaderId);
+                return switchBehavior(new IsolatedLeader(context));
             }
-
-        } else {
-            LOG.error("ERROR!!" +
-                    "FollowerId in InstallSnapshotReply not known to Leader" +
-                    " or Chunk Index in InstallSnapshotReply not matching {} != {}",
-                followerToSnapshot.getChunkIndex(), reply.getChunkIndex()
-            );
-        }
-    }
-
-    private void replicate(Replicate replicate) {
-        long logIndex = replicate.getReplicatedLogEntry().getIndex();
-
-        if(LOG.isDebugEnabled()) {
-            LOG.debug("Replicate message {}", logIndex);
         }
 
-        // Create a tracker entry we will use this later to notify the
-        // client actor
-        trackerList.add(
-            new ClientRequestTrackerImpl(replicate.getClientActor(),
-                replicate.getIdentifier(),
-                logIndex)
-        );
-
-        if (followers.size() == 0) {
-            context.setCommitIndex(logIndex);
-            applyLogToStateMachine(logIndex);
-        } else {
-            sendAppendEntries();
-        }
+        return super.handleMessage(sender, originalMessage);
     }
 
-    private void sendAppendEntries() {
-        // Send an AppendEntries to all followers
-        for (String followerId : followers) {
-            ActorSelection followerActor = context.getPeerActorSelection(followerId);
-
-            if (followerActor != null) {
-                FollowerLogInformation followerLogInformation = followerToLog.get(followerId);
-                long followerNextIndex = followerLogInformation.getNextIndex().get();
-                boolean isFollowerActive = followerLogInformation.isFollowerActive();
-                List<ReplicatedLogEntry> entries = null;
-
-                if (mapFollowerToSnapshot.get(followerId) != null) {
-                    // if install snapshot is in process , then sent next chunk if possible
-                    if (isFollowerActive && mapFollowerToSnapshot.get(followerId).canSendNextChunk()) {
-                        sendSnapshotChunk(followerActor, followerId);
-                    } else {
-                        // we send a heartbeat even if we have not received a reply for the last chunk
-                        sendAppendEntriesToFollower(followerActor, followerNextIndex,
-                            Collections.<ReplicatedLogEntry>emptyList());
-                    }
-
-                } else {
-                    long leaderLastIndex = context.getReplicatedLog().lastIndex();
-                    long leaderSnapShotIndex = context.getReplicatedLog().getSnapshotIndex();
-
-                    if (isFollowerActive &&
-                        context.getReplicatedLog().isPresent(followerNextIndex)) {
-                        // FIXME : Sending one entry at a time
-                        entries = context.getReplicatedLog().getFrom(followerNextIndex, 1);
-
-                    } else if (isFollowerActive && followerNextIndex >= 0 &&
-                        leaderLastIndex >= followerNextIndex ) {
-                        // if the followers next index is not present in the leaders log, and
-                        // if the follower is just not starting and if leader's index is more than followers index
-                        // then snapshot should be sent
-
-                        if(LOG.isDebugEnabled()) {
-                            LOG.debug("InitiateInstallSnapshot to follower:{}," +
-                                    "follower-nextIndex:{}, leader-snapshot-index:{},  " +
-                                    "leader-last-index:{}", followerId,
-                                followerNextIndex, leaderSnapShotIndex, leaderLastIndex
-                            );
-                        }
-                        actor().tell(new InitiateInstallSnapshot(), actor());
-
-                        // we would want to sent AE as the capture snapshot might take time
-                        entries =  Collections.<ReplicatedLogEntry>emptyList();
-
-                    } else {
-                        //we send an AppendEntries, even if the follower is inactive
-                        // in-order to update the followers timestamp, in case it becomes active again
-                        entries =  Collections.<ReplicatedLogEntry>emptyList();
-                    }
-
-                    sendAppendEntriesToFollower(followerActor, followerNextIndex, entries);
-
-                }
-            }
-        }
-    }
-
-    private void sendAppendEntriesToFollower(ActorSelection followerActor, long followerNextIndex,
-        List<ReplicatedLogEntry> entries) {
-        followerActor.tell(
-            new AppendEntries(currentTerm(), context.getId(),
-                prevLogIndex(followerNextIndex),
-                prevLogTerm(followerNextIndex), entries,
-                context.getCommitIndex()).toSerializable(),
-            actor()
-        );
-    }
-
-    /**
-     * An installSnapshot is scheduled at a interval that is a multiple of
-     * a HEARTBEAT_INTERVAL. This is to avoid the need to check for installing
-     * snapshots at every heartbeat.
-     *
-     * Install Snapshot works as follows
-     * 1. Leader sends a InitiateInstallSnapshot message to self
-     * 2. Leader then initiates the capture snapshot by sending a CaptureSnapshot message to actor
-     * 3. RaftActor on receipt of the CaptureSnapshotReply (from Shard), stores the received snapshot in the replicated log
-     * and makes a call to Leader's handleMessage , with SendInstallSnapshot message.
-     * 4. Leader , picks the snapshot from im-mem ReplicatedLog and sends it in chunks to the Follower
-     * 5. On complete, Follower sends back a InstallSnapshotReply.
-     * 6. On receipt of the InstallSnapshotReply for the last chunk, Leader marks the install complete for that follower
-     * and replenishes the memory by deleting the snapshot in Replicated log.
-     *
-     */
-    private void installSnapshotIfNeeded() {
-        for (String followerId : followers) {
-            ActorSelection followerActor =
-                context.getPeerActorSelection(followerId);
-
-            if(followerActor != null) {
-                FollowerLogInformation followerLogInformation =
-                    followerToLog.get(followerId);
-
-                long nextIndex = followerLogInformation.getNextIndex().get();
-
-                if (!context.getReplicatedLog().isPresent(nextIndex) &&
-                    context.getReplicatedLog().isInSnapshot(nextIndex)) {
-                    LOG.info("{} follower needs a snapshot install", followerId);
-                    if (snapshot.isPresent()) {
-                        // if a snapshot is present in the memory, most likely another install is in progress
-                        // no need to capture snapshot
-                        sendSnapshotChunk(followerActor, followerId);
-
-                    } else {
-                        initiateCaptureSnapshot();
-                        //we just need 1 follower who would need snapshot to be installed.
-                        // when we have the snapshot captured, we would again check (in SendInstallSnapshot)
-                        // who needs an install and send to all who need
-                        break;
-                    }
-
-                }
-            }
-        }
-    }
-
-    // on every install snapshot, we try to capture the snapshot.
-    // Once a capture is going on, another one issued will get ignored by RaftActor.
-    private void initiateCaptureSnapshot() {
-        LOG.info("Initiating Snapshot Capture to Install Snapshot, Leader:{}", getLeaderId());
-        ReplicatedLogEntry lastAppliedEntry = context.getReplicatedLog().get(context.getLastApplied());
-        long lastAppliedIndex = -1;
-        long lastAppliedTerm = -1;
-
-        if (lastAppliedEntry != null) {
-            lastAppliedIndex = lastAppliedEntry.getIndex();
-            lastAppliedTerm = lastAppliedEntry.getTerm();
-        } else if (context.getReplicatedLog().getSnapshotIndex() > -1)  {
-            lastAppliedIndex = context.getReplicatedLog().getSnapshotIndex();
-            lastAppliedTerm = context.getReplicatedLog().getSnapshotTerm();
-        }
-
-        boolean isInstallSnapshotInitiated = true;
-        actor().tell(new CaptureSnapshot(lastIndex(), lastTerm(),
-                lastAppliedIndex, lastAppliedTerm, isInstallSnapshotInitiated),
-            actor());
-    }
-
-
-    private void sendInstallSnapshot() {
-        for (String followerId : followers) {
-            ActorSelection followerActor = context.getPeerActorSelection(followerId);
-
-            if(followerActor != null) {
-                FollowerLogInformation followerLogInformation = followerToLog.get(followerId);
-                long nextIndex = followerLogInformation.getNextIndex().get();
-
-                if (!context.getReplicatedLog().isPresent(nextIndex) &&
-                    context.getReplicatedLog().isInSnapshot(nextIndex)) {
-                    sendSnapshotChunk(followerActor, followerId);
-                }
-            }
-        }
-    }
-
-    /**
-     *  Sends a snapshot chunk to a given follower
-     *  InstallSnapshot should qualify as a heartbeat too.
-     */
-    private void sendSnapshotChunk(ActorSelection followerActor, String followerId) {
-        try {
-            if (snapshot.isPresent()) {
-                followerActor.tell(
-                    new InstallSnapshot(currentTerm(), context.getId(),
-                        context.getReplicatedLog().getSnapshotIndex(),
-                        context.getReplicatedLog().getSnapshotTerm(),
-                        getNextSnapshotChunk(followerId,snapshot.get()),
-                        mapFollowerToSnapshot.get(followerId).incrementChunkIndex(),
-                        mapFollowerToSnapshot.get(followerId).getTotalChunks()
-                    ).toSerializable(),
-                    actor()
-                );
-                LOG.info("InstallSnapshot sent to follower {}, Chunk: {}/{}",
-                    followerActor.path(), mapFollowerToSnapshot.get(followerId).getChunkIndex(),
-                    mapFollowerToSnapshot.get(followerId).getTotalChunks());
-            }
-        } catch (IOException e) {
-            LOG.error(e, "InstallSnapshot failed for Leader.");
-        }
-    }
-
-    /**
-     * Acccepts snaphot as ByteString, enters into map for future chunks
-     * creates and return a ByteString chunk
-     */
-    private ByteString getNextSnapshotChunk(String followerId, ByteString snapshotBytes) throws IOException {
-        FollowerToSnapshot followerToSnapshot = mapFollowerToSnapshot.get(followerId);
-        if (followerToSnapshot == null) {
-            followerToSnapshot = new FollowerToSnapshot(snapshotBytes);
-            mapFollowerToSnapshot.put(followerId, followerToSnapshot);
-        }
-        ByteString nextChunk = followerToSnapshot.getNextChunk();
-        if (LOG.isDebugEnabled()) {
-            LOG.debug("Leader's snapshot nextChunk size:{}", nextChunk.size());
-        }
-        return nextChunk;
-    }
-
-    private void sendHeartBeat() {
-        if (followers.size() > 0) {
-            sendAppendEntries();
-        }
-    }
-
-    private void stopHeartBeat() {
-        if (heartbeatSchedule != null && !heartbeatSchedule.isCancelled()) {
-            heartbeatSchedule.cancel();
-        }
-    }
-
-    private void stopInstallSnapshotSchedule() {
+    protected void stopInstallSnapshotSchedule() {
         if (installSnapshotSchedule != null && !installSnapshotSchedule.isCancelled()) {
             installSnapshotSchedule.cancel();
         }
     }
 
-    private void scheduleHeartBeat(FiniteDuration interval) {
-        if(followers.size() == 0){
-            // Optimization - do not bother scheduling a heartbeat as there are
-            // no followers
-            return;
-        }
-
-        stopHeartBeat();
-
-        // Schedule a heartbeat. When the scheduler triggers a SendHeartbeat
-        // message is sent to itself.
-        // Scheduling the heartbeat only once here because heartbeats do not
-        // need to be sent if there are other messages being sent to the remote
-        // actor.
-        heartbeatSchedule = context.getActorSystem().scheduler().scheduleOnce(
-            interval, context.getActor(), new SendHeartBeat(),
-            context.getActorSystem().dispatcher(), context.getActor());
-    }
-
-    private void scheduleInstallSnapshotCheck(FiniteDuration interval) {
+    protected void scheduleInstallSnapshotCheck(FiniteDuration interval) {
         if(followers.size() == 0){
             // Optimization - do not bother scheduling a heartbeat as there are
             // no followers
@@ -624,122 +91,22 @@ public class Leader extends AbstractRaftActorBehavior {
                 context.getActorSystem().dispatcher(), context.getActor());
     }
 
-
-
-    @Override public void close() throws Exception {
-        stopHeartBeat();
-    }
-
-    @Override public String getLeaderId() {
-        return context.getId();
-    }
-
-    /**
-     * Encapsulates the snapshot bytestring and handles the logic of sending
-     * snapshot chunks
-     */
-    protected class FollowerToSnapshot {
-        private ByteString snapshotBytes;
-        private int offset = 0;
-        // the next snapshot chunk is sent only if the replyReceivedForOffset matches offset
-        private int replyReceivedForOffset;
-        // if replyStatus is false, the previous chunk is attempted
-        private boolean replyStatus = false;
-        private int chunkIndex;
-        private int totalChunks;
-
-        public FollowerToSnapshot(ByteString snapshotBytes) {
-            this.snapshotBytes = snapshotBytes;
-            replyReceivedForOffset = -1;
-            chunkIndex = 1;
-            int size = snapshotBytes.size();
-            totalChunks = ( size / context.getConfigParams().getSnapshotChunkSize()) +
-                ((size % context.getConfigParams().getSnapshotChunkSize()) > 0 ? 1 : 0);
-            if(LOG.isDebugEnabled()) {
-                LOG.debug("Snapshot {} bytes, total chunks to send:{}",
-                    size, totalChunks);
-            }
-        }
-
-        public ByteString getSnapshotBytes() {
-            return snapshotBytes;
-        }
-
-        public int incrementOffset() {
-            if(replyStatus) {
-                // if prev chunk failed, we would want to sent the same chunk again
-                offset = offset + context.getConfigParams().getSnapshotChunkSize();
-            }
-            return offset;
-        }
-
-        public int incrementChunkIndex() {
-            if (replyStatus) {
-                // if prev chunk failed, we would want to sent the same chunk again
-                chunkIndex =  chunkIndex + 1;
-            }
-            return chunkIndex;
-        }
-
-        public int getChunkIndex() {
-            return chunkIndex;
-        }
-
-        public int getTotalChunks() {
-            return totalChunks;
-        }
-
-        public boolean canSendNextChunk() {
-            // we only send a false if a chunk is sent but we have not received a reply yet
-            return replyReceivedForOffset == offset;
-        }
-
-        public boolean isLastChunk(int chunkIndex) {
-            return totalChunks == chunkIndex;
-        }
-
-        public void markSendStatus(boolean success) {
-            if (success) {
-                // if the chunk sent was successful
-                replyReceivedForOffset = offset;
-                replyStatus = true;
-            } else {
-                // if the chunk sent was failure
-                replyReceivedForOffset = offset;
-                replyStatus = false;
-            }
-        }
-
-        public ByteString getNextChunk() {
-            int snapshotLength = getSnapshotBytes().size();
-            int start = incrementOffset();
-            int size = context.getConfigParams().getSnapshotChunkSize();
-            if (context.getConfigParams().getSnapshotChunkSize() > snapshotLength) {
-                size = snapshotLength;
-            } else {
-                if ((start + context.getConfigParams().getSnapshotChunkSize()) > snapshotLength) {
-                    size = snapshotLength - start;
-                }
-            }
-
-            if(LOG.isDebugEnabled()) {
-                LOG.debug("length={}, offset={},size={}",
-                    snapshotLength, start, size);
-            }
-            return getSnapshotBytes().substring(start, start + size);
-
+    protected void stopIsolatedLeaderCheckSchedule() {
+        if (isolatedLeaderCheckSchedule != null && !isolatedLeaderCheckSchedule.isCancelled()) {
+            isolatedLeaderCheckSchedule.cancel();
         }
     }
 
-    // called from example-actor for printing the follower-states
-    public String printFollowerStates() {
-        StringBuilder sb = new StringBuilder();
-        for(FollowerLogInformation followerLogInformation : followerToLog.values()) {
-            boolean isFollowerActive = followerLogInformation.isFollowerActive();
-            sb.append("{"+followerLogInformation.getId() + " state:" + isFollowerActive + "},");
+    protected void scheduleIsolatedLeaderCheck(FiniteDuration isolatedCheckInterval) {
+        isolatedLeaderCheckSchedule = context.getActorSystem().scheduler().schedule(isolatedCheckInterval, isolatedCheckInterval,
+            context.getActor(), new IsolatedLeaderCheck(),
+            context.getActorSystem().dispatcher(), context.getActor());
+    }
 
-        }
-        return "[" + sb.toString() + "]";
+    @Override public void close() throws Exception {
+        stopInstallSnapshotSchedule();
+        stopIsolatedLeaderCheckSchedule();
+        super.close();
     }
 
     @VisibleForTesting
diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/IsolatedLeaderTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/IsolatedLeaderTest.java
new file mode 100644 (file)
index 0000000..708068a
--- /dev/null
@@ -0,0 +1,141 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.raft.behaviors;
+
+import akka.actor.ActorRef;
+import akka.actor.Props;
+import akka.testkit.JavaTestKit;
+import java.util.HashMap;
+import java.util.Map;
+import org.junit.Test;
+import org.opendaylight.controller.cluster.raft.MockRaftActorContext;
+import org.opendaylight.controller.cluster.raft.RaftActorContext;
+import org.opendaylight.controller.cluster.raft.RaftState;
+import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
+import org.opendaylight.controller.cluster.raft.utils.DoNothingActor;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class IsolatedLeaderTest  extends AbstractRaftActorBehaviorTest {
+
+    private ActorRef leaderActor =
+        getSystem().actorOf(Props.create(DoNothingActor.class));
+
+    private ActorRef senderActor =
+        getSystem().actorOf(Props.create(DoNothingActor.class));
+
+    @Override
+    protected RaftActorBehavior createBehavior(
+        RaftActorContext actorContext) {
+        return new Leader(actorContext);
+    }
+
+    @Override
+    protected RaftActorContext createActorContext() {
+        return createActorContext(leaderActor);
+    }
+
+
+    @Test
+    public void testHandleMessageWithThreeMembers() {
+        new JavaTestKit(getSystem()) {{
+            String followerAddress1 = "akka://test/user/$a";
+            String followerAddress2 = "akka://test/user/$b";
+
+            MockRaftActorContext leaderActorContext = (MockRaftActorContext) createActorContext();
+            Map<String, String> peerAddresses = new HashMap<>();
+            peerAddresses.put("follower-1", followerAddress1);
+            peerAddresses.put("follower-2", followerAddress2);
+            leaderActorContext.setPeerAddresses(peerAddresses);
+
+            IsolatedLeader isolatedLeader = new IsolatedLeader(leaderActorContext);
+            assertTrue(isolatedLeader.state() == RaftState.IsolatedLeader);
+
+            // in a 3 node cluster, even if 1 follower is returns a reply, the isolatedLeader is not isolated
+            RaftActorBehavior behavior = isolatedLeader.handleMessage(senderActor,
+                new AppendEntriesReply("follower-1", isolatedLeader.lastTerm() - 1, true,
+                    isolatedLeader.lastIndex() - 1, isolatedLeader.lastTerm() - 1));
+
+            assertEquals(RaftState.Leader, behavior.state());
+
+            behavior = isolatedLeader.handleMessage(senderActor,
+                new AppendEntriesReply("follower-2", isolatedLeader.lastTerm() - 1, true,
+                    isolatedLeader.lastIndex() -1, isolatedLeader.lastTerm() -1 ));
+
+            assertEquals(RaftState.Leader, behavior.state());
+        }};
+    }
+
+    @Test
+    public void testHandleMessageWithFiveMembers() {
+        new JavaTestKit(getSystem()) {{
+
+            String followerAddress1 = "akka://test/user/$a";
+            String followerAddress2 = "akka://test/user/$b";
+            String followerAddress3 = "akka://test/user/$c";
+            String followerAddress4 = "akka://test/user/$d";
+
+            MockRaftActorContext leaderActorContext = (MockRaftActorContext) createActorContext();
+            Map<String, String> peerAddresses = new HashMap<>();
+            peerAddresses.put("follower-1", followerAddress1);
+            peerAddresses.put("follower-2", followerAddress2);
+            peerAddresses.put("follower-3", followerAddress3);
+            peerAddresses.put("follower-4", followerAddress4);
+            leaderActorContext.setPeerAddresses(peerAddresses);
+
+            IsolatedLeader isolatedLeader = new IsolatedLeader(leaderActorContext);
+            assertEquals(RaftState.IsolatedLeader, isolatedLeader.state());
+
+            // in a 5 member cluster, atleast 2 followers need to be active and return a reply
+            RaftActorBehavior behavior = isolatedLeader.handleMessage(senderActor,
+                new AppendEntriesReply("follower-1", isolatedLeader.lastTerm() - 1, true,
+                    isolatedLeader.lastIndex() -1, isolatedLeader.lastTerm() -1 ));
+
+            assertEquals(RaftState.IsolatedLeader, behavior.state());
+
+            behavior = isolatedLeader.handleMessage(senderActor,
+                new AppendEntriesReply("follower-2", isolatedLeader.lastTerm() - 1, true,
+                    isolatedLeader.lastIndex() -1, isolatedLeader.lastTerm() -1 ));
+
+            assertEquals(RaftState.Leader, behavior.state());
+
+            behavior = isolatedLeader.handleMessage(senderActor,
+                new AppendEntriesReply("follower-3", isolatedLeader.lastTerm() - 1, true,
+                    isolatedLeader.lastIndex() -1, isolatedLeader.lastTerm() -1 ));
+
+            assertEquals(RaftState.Leader, behavior.state());
+        }};
+    }
+
+    @Test
+    public void testHandleMessageFromAnotherLeader() {
+        new JavaTestKit(getSystem()) {{
+            String followerAddress1 = "akka://test/user/$a";
+            String followerAddress2 = "akka://test/user/$b";
+
+            MockRaftActorContext leaderActorContext = (MockRaftActorContext) createActorContext();
+            Map<String, String> peerAddresses = new HashMap<>();
+            peerAddresses.put("follower-1", followerAddress1);
+            peerAddresses.put("follower-2", followerAddress2);
+            leaderActorContext.setPeerAddresses(peerAddresses);
+
+            IsolatedLeader isolatedLeader = new IsolatedLeader(leaderActorContext);
+            assertTrue(isolatedLeader.state() == RaftState.IsolatedLeader);
+
+            // if an append-entries reply is received by the isolated-leader, and that reply
+            // has a term  > than its own term, then IsolatedLeader switches to Follower
+            // bowing itself to another leader in the cluster
+            RaftActorBehavior behavior = isolatedLeader.handleMessage(senderActor,
+                new AppendEntriesReply("follower-1", isolatedLeader.lastTerm() + 1, true,
+                    isolatedLeader.lastIndex() + 1, isolatedLeader.lastTerm() + 1));
+
+            assertEquals(RaftState.Follower, behavior.state());
+        }};
+
+    }
+}
index 168eb3e..6b534de 100644 (file)
@@ -1,10 +1,20 @@
 package org.opendaylight.controller.cluster.raft.behaviors;
 
 import akka.actor.ActorRef;
+import akka.actor.PoisonPill;
 import akka.actor.Props;
+import akka.actor.Terminated;
 import akka.testkit.JavaTestKit;
 import com.google.common.base.Optional;
+import com.google.common.util.concurrent.Uninterruptibles;
 import com.google.protobuf.ByteString;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectOutputStream;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
 import org.junit.Assert;
 import org.junit.Test;
 import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl;
@@ -18,6 +28,7 @@ import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
 import org.opendaylight.controller.cluster.raft.base.messages.InitiateInstallSnapshot;
+import org.opendaylight.controller.cluster.raft.base.messages.IsolatedLeaderCheck;
 import org.opendaylight.controller.cluster.raft.base.messages.Replicate;
 import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat;
 import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot;
@@ -31,15 +42,6 @@ import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
 import org.opendaylight.controller.protobuff.messages.cluster.raft.AppendEntriesMessages;
 import org.opendaylight.controller.protobuff.messages.cluster.raft.InstallSnapshotMessages;
 import scala.concurrent.duration.FiniteDuration;
-
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.ObjectOutputStream;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.TimeUnit;
-
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
@@ -941,10 +943,82 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest {
                 raftActorBehavior = leader.handleRequestVoteReply(getRef(), new RequestVoteReply(1, false));
 
                 assertEquals(RaftState.Leader, raftActorBehavior.state());
+            }};
+    }
 
+    @Test
+    public void testIsolatedLeaderCheckNoFollowers() {
+        new JavaTestKit(getSystem()) {{
+            ActorRef leaderActor = getTestActor();
 
-            }};
+            MockRaftActorContext leaderActorContext =
+                new MockRaftActorContext("leader", getSystem(), leaderActor);
+
+            Map<String, String> peerAddresses = new HashMap<>();
+            leaderActorContext.setPeerAddresses(peerAddresses);
 
+            Leader leader = new Leader(leaderActorContext);
+            RaftActorBehavior behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
+            Assert.assertTrue(behavior instanceof Leader);
+        }};
+    }
+
+    @Test
+    public void testIsolatedLeaderCheckTwoFollowers() throws Exception {
+        new JavaTestKit(getSystem()) {{
+
+            ActorRef followerActor1 = getTestActor();
+            ActorRef followerActor2 = getTestActor();
+
+            MockRaftActorContext leaderActorContext = (MockRaftActorContext) createActorContext();
+
+            Map<String, String> peerAddresses = new HashMap<>();
+            peerAddresses.put("follower-1", followerActor1.path().toString());
+            peerAddresses.put("follower-2", followerActor2.path().toString());
+
+            leaderActorContext.setPeerAddresses(peerAddresses);
+
+            Leader leader = new Leader(leaderActorContext);
+            leader.stopIsolatedLeaderCheckSchedule();
+
+            leader.markFollowerActive("follower-1");
+            leader.markFollowerActive("follower-2");
+            RaftActorBehavior behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
+            Assert.assertTrue("Behavior not instance of Leader when all followers are active",
+                behavior instanceof Leader);
+
+            // kill 1 follower and verify if that got killed
+            final JavaTestKit probe = new JavaTestKit(getSystem());
+            probe.watch(followerActor1);
+            followerActor1.tell(PoisonPill.getInstance(), ActorRef.noSender());
+            final Terminated termMsg1 = probe.expectMsgClass(Terminated.class);
+            assertEquals(termMsg1.getActor(), followerActor1);
+
+            //sleep enough for all the follower stopwatches to lapse
+            Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams().
+                getElectionTimeOutInterval().toMillis(), TimeUnit.MILLISECONDS);
+
+            leader.markFollowerActive("follower-2");
+            behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
+            Assert.assertTrue("Behavior not instance of Leader when majority of followers are active",
+                behavior instanceof Leader);
+
+            // kill 2nd follower and leader should change to Isolated leader
+            followerActor2.tell(PoisonPill.getInstance(), null);
+            probe.watch(followerActor2);
+            followerActor2.tell(PoisonPill.getInstance(), ActorRef.noSender());
+            final Terminated termMsg2 = probe.expectMsgClass(Terminated.class);
+            assertEquals(termMsg2.getActor(), followerActor2);
+
+            //sleep enough for the remaining the follower stopwatches to lapse
+            Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams().
+                getElectionTimeOutInterval().toMillis(), TimeUnit.MILLISECONDS);
+
+            behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
+            Assert.assertTrue("Behavior not instance of IsolatedLeader when majority followers are inactive",
+                behavior instanceof IsolatedLeader);
+
+        }};
     }
 
     class MockLeader extends Leader {
index 2048bde..e18c00e 100644 (file)
@@ -119,6 +119,7 @@ public class DatastoreContext {
         private Timeout shardLeaderElectionTimeout = new Timeout(30, TimeUnit.SECONDS);
         private boolean persistent = true;
         private ConfigurationReader configurationReader = new FileConfigurationReader();
+        private int shardIsolatedLeaderCheckIntervalInMillis = shardHeartbeatIntervalInMillis * 10;
 
         public Builder shardTransactionIdleTimeout(Duration shardTransactionIdleTimeout) {
             this.shardTransactionIdleTimeout = shardTransactionIdleTimeout;
@@ -180,18 +181,24 @@ public class DatastoreContext {
             return this;
         }
 
-
         public Builder persistent(boolean persistent){
             this.persistent = persistent;
             return this;
         }
 
+        public Builder shardIsolatedLeaderCheckIntervalInMillis(int shardIsolatedLeaderCheckIntervalInMillis) {
+            this.shardIsolatedLeaderCheckIntervalInMillis = shardIsolatedLeaderCheckIntervalInMillis;
+            return this;
+        }
+
         public DatastoreContext build() {
             DefaultConfigParamsImpl raftConfig = new DefaultConfigParamsImpl();
             raftConfig.setHeartBeatInterval(new FiniteDuration(shardHeartbeatIntervalInMillis,
                     TimeUnit.MILLISECONDS));
             raftConfig.setJournalRecoveryLogBatchSize(shardJournalRecoveryLogBatchSize);
             raftConfig.setSnapshotBatchCount(shardSnapshotBatchCount);
+            raftConfig.setIsolatedLeaderCheckInterval(
+                new FiniteDuration(shardIsolatedLeaderCheckIntervalInMillis, TimeUnit.MILLISECONDS));
 
             return new DatastoreContext(dataStoreProperties, raftConfig, dataStoreMXBeanType,
                     operationTimeoutInSeconds, shardTransactionIdleTimeout,
index 2f3fbdc..8eb653a 100644 (file)
@@ -63,6 +63,8 @@ public class DistributedConfigDataStoreProviderModule extends
                 .shardTransactionCommitQueueCapacity(
                         props.getShardTransactionCommitQueueCapacity().getValue().intValue())
                 .persistent(props.getPersistent().booleanValue())
+                .shardIsolatedLeaderCheckIntervalInMillis(
+                    props.getShardIsolatedLeaderCheckIntervalInMillis().getValue())
                 .build();
 
         return DistributedDataStoreFactory.createInstance("config", getConfigSchemaServiceDependency(),
index ecb3a91..2a12aff 100644 (file)
@@ -63,6 +63,8 @@ public class DistributedOperationalDataStoreProviderModule extends
                 .shardTransactionCommitQueueCapacity(
                         props.getShardTransactionCommitQueueCapacity().getValue().intValue())
                 .persistent(props.getPersistent().booleanValue())
+                .shardIsolatedLeaderCheckIntervalInMillis(
+                    props.getShardIsolatedLeaderCheckIntervalInMillis().getValue())
                 .build();
 
         return DistributedDataStoreFactory.createInstance("operational",
index 995e98f..4d3d438 100644 (file)
@@ -153,6 +153,13 @@ module distributed-datastore-provider {
             type boolean;
             description "Enable or disable data persistence";
          }
+
+        leaf shard-isolated-leader-check-interval-in-millis {
+            default 5000;
+            type heartbeat-interval-type;
+            description "The interval at which the leader of the shard will check if its majority
+                        followers are active and term itself as isolated";
+        }
     }
 
     // Augments the 'configuration' choice node under modules/module.