Test RaftActor using a simple program
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / main / java / org / opendaylight / controller / cluster / raft / behaviors / Leader.java
index cfefd21c568029d1e4ce9eed3e5c139724fb5498..c06ee9bd2b836c784c4adb75e7f318875ebf22c3 100644 (file)
@@ -12,17 +12,22 @@ import akka.actor.ActorRef;
 import akka.actor.ActorSelection;
 import akka.actor.Cancellable;
 import com.google.common.base.Preconditions;
+import org.opendaylight.controller.cluster.raft.ClientRequestTracker;
+import org.opendaylight.controller.cluster.raft.ClientRequestTrackerImpl;
 import org.opendaylight.controller.cluster.raft.FollowerLogInformation;
 import org.opendaylight.controller.cluster.raft.FollowerLogInformationImpl;
 import org.opendaylight.controller.cluster.raft.RaftActorContext;
 import org.opendaylight.controller.cluster.raft.RaftState;
+import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
+import org.opendaylight.controller.cluster.raft.internal.messages.ApplyState;
+import org.opendaylight.controller.cluster.raft.internal.messages.Replicate;
 import org.opendaylight.controller.cluster.raft.internal.messages.SendHeartBeat;
 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
 import scala.concurrent.duration.FiniteDuration;
 
-import java.util.Collections;
+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -61,21 +66,37 @@ public class Leader extends AbstractRaftActorBehavior {
 
     private Cancellable heartbeatCancel = null;
 
-    public Leader(RaftActorContext context, List<String> followerPaths) {
+    private List<ClientRequestTracker> trackerList = new ArrayList<>();
+
+    private final int minReplicationCount;
+
+    public Leader(RaftActorContext context) {
         super(context);
 
-        for (String followerPath : followerPaths) {
+        if(lastIndex() >= 0) {
+            context.setCommitIndex(lastIndex());
+        }
+
+        for (String followerId : context.getPeerAddresses().keySet()) {
             FollowerLogInformation followerLogInformation =
-                new FollowerLogInformationImpl(followerPath,
-                    new AtomicLong(0),
-                    new AtomicLong(0));
+                new FollowerLogInformationImpl(followerId,
+                    new AtomicLong(lastIndex()),
+                    new AtomicLong(-1));
 
-            followerToActor.put(followerPath,
-                context.actorSelection(followerLogInformation.getId()));
-            followerToLog.put(followerPath, followerLogInformation);
+            followerToActor.put(followerId,
+                context.actorSelection(context.getPeerAddress(followerId)));
+
+            followerToLog.put(followerId, followerLogInformation);
+
+        }
 
+        if (followerToActor.size() > 0) {
+            minReplicationCount = (followerToActor.size() + 1) / 2 + 1;
+        } else {
+            minReplicationCount = 0;
         }
 
+
         // Immediately schedule a heartbeat
         // Upon election: send initial empty AppendEntries RPCs
         // (heartbeat) to each server; repeat during idle periods to
@@ -87,47 +108,184 @@ public class Leader extends AbstractRaftActorBehavior {
 
     @Override protected RaftState handleAppendEntries(ActorRef sender,
         AppendEntries appendEntries, RaftState suggestedState) {
+
+        context.getLogger()
+            .error("An unexpected AppendEntries received in state " + state());
+
         return suggestedState;
     }
 
     @Override protected RaftState handleAppendEntriesReply(ActorRef sender,
         AppendEntriesReply appendEntriesReply, RaftState suggestedState) {
+
+        // Do not take any other action since a behavior change is coming
+        if (suggestedState != state())
+            return suggestedState;
+
+        // Update the FollowerLogInformation
+        String followerId = appendEntriesReply.getFollowerId();
+        FollowerLogInformation followerLogInformation =
+            followerToLog.get(followerId);
+        if (appendEntriesReply.isSuccess()) {
+            followerLogInformation
+                .setMatchIndex(appendEntriesReply.getLogLastIndex());
+            followerLogInformation
+                .setNextIndex(appendEntriesReply.getLogLastIndex() + 1);
+        } else {
+            followerLogInformation.decrNextIndex();
+        }
+
+        // Now figure out if this reply warrants a change in the commitIndex
+        // If there exists an N such that N > commitIndex, a majority
+        // of matchIndex[i] ≥ N, and log[N].term == currentTerm:
+        // set commitIndex = N (§5.3, §5.4).
+        for (long N = context.getCommitIndex() + 1; ; N++) {
+            int replicatedCount = 1;
+
+            for (FollowerLogInformation info : followerToLog.values()) {
+                if (info.getMatchIndex().get() >= N) {
+                    replicatedCount++;
+                }
+            }
+
+            if (replicatedCount >= minReplicationCount){
+                ReplicatedLogEntry replicatedLogEntry =
+                    context.getReplicatedLog().get(N);
+                if (replicatedLogEntry != null
+                    && replicatedLogEntry.getTerm()
+                    == currentTerm()) {
+                    context.setCommitIndex(N);
+                }
+            } else {
+                break;
+            }
+        }
+
+        if(context.getCommitIndex() > context.getLastApplied()){
+            applyLogToStateMachine(context.getCommitIndex());
+        }
+
         return suggestedState;
     }
 
+    protected ClientRequestTracker findClientRequestTracker(long logIndex) {
+        for (ClientRequestTracker tracker : trackerList) {
+            if (tracker.getIndex() == logIndex) {
+                return tracker;
+            }
+        }
+
+        return null;
+    }
+
     @Override protected RaftState handleRequestVoteReply(ActorRef sender,
         RequestVoteReply requestVoteReply, RaftState suggestedState) {
         return suggestedState;
     }
 
-    @Override protected RaftState state() {
+    @Override public RaftState state() {
         return RaftState.Leader;
     }
 
     @Override public RaftState handleMessage(ActorRef sender, Object message) {
         Preconditions.checkNotNull(sender, "sender should not be null");
 
-        scheduleHeartBeat(HEART_BEAT_INTERVAL);
-
-        if (message instanceof SendHeartBeat) {
-            for (ActorSelection follower : followerToActor.values()) {
-                follower.tell(new AppendEntries(
-                    context.getTermInformation().getCurrentTerm(),
-                    context.getId(),
-                    context.getReplicatedLog().last().getIndex(),
-                    context.getReplicatedLog().last().getTerm(),
-                    Collections.EMPTY_LIST, context.getCommitIndex()),
-                    context.getActor());
+        try {
+            if (message instanceof SendHeartBeat) {
+                return sendHeartBeat();
+            } else if (message instanceof Replicate) {
+
+                Replicate replicate = (Replicate) message;
+                long logIndex = replicate.getReplicatedLogEntry().getIndex();
+
+                context.getLogger().debug("Replicate message " + logIndex);
+
+                if (followerToActor.size() == 0) {
+                    context.setCommitIndex(
+                        replicate.getReplicatedLogEntry().getIndex());
+
+                    context.getActor()
+                        .tell(new ApplyState(replicate.getClientActor(),
+                                replicate.getIdentifier(),
+                                replicate.getReplicatedLogEntry()),
+                            context.getActor()
+                        );
+                } else {
+
+                    trackerList.add(
+                        new ClientRequestTrackerImpl(replicate.getClientActor(),
+                            replicate.getIdentifier(),
+                            logIndex)
+                    );
+
+                    ReplicatedLogEntry prevEntry =
+                        context.getReplicatedLog().get(lastIndex() - 1);
+                    long prevLogIndex = -1;
+                    long prevLogTerm = -1;
+                    if (prevEntry != null) {
+                        prevLogIndex = prevEntry.getIndex();
+                        prevLogTerm = prevEntry.getTerm();
+                    }
+                    // Send an AppendEntries to all followers
+                    for (String followerId : followerToActor.keySet()) {
+                        ActorSelection followerActor =
+                            followerToActor.get(followerId);
+                        FollowerLogInformation followerLogInformation =
+                            followerToLog.get(followerId);
+                        followerActor.tell(
+                            new AppendEntries(currentTerm(), context.getId(),
+                                prevLogIndex, prevLogTerm,
+                                context.getReplicatedLog().getFrom(
+                                    followerLogInformation.getNextIndex()
+                                        .get()
+                                ), context.getCommitIndex()
+                            ),
+                            actor()
+                        );
+                    }
+                }
             }
-            return state();
+        } finally {
+            scheduleHeartBeat(HEART_BEAT_INTERVAL);
         }
+
         return super.handleMessage(sender, message);
     }
 
-    private void scheduleHeartBeat(FiniteDuration interval) {
+    private RaftState sendHeartBeat() {
+        if (followerToActor.size() > 0) {
+            for (String follower : followerToActor.keySet()) {
+
+                FollowerLogInformation followerLogInformation =
+                    followerToLog.get(follower);
+
+                AtomicLong nextIndex =
+                    followerLogInformation.getNextIndex();
+
+                List<ReplicatedLogEntry> entries =
+                    context.getReplicatedLog().getFrom(nextIndex.get());
+
+                followerToActor.get(follower).tell(new AppendEntries(
+                        context.getTermInformation().getCurrentTerm(),
+                        context.getId(),
+                        context.getReplicatedLog().lastIndex(),
+                        context.getReplicatedLog().lastTerm(),
+                        entries, context.getCommitIndex()),
+                    context.getActor()
+                );
+            }
+        }
+        return state();
+    }
+
+    private void stopHeartBeat() {
         if (heartbeatCancel != null && !heartbeatCancel.isCancelled()) {
             heartbeatCancel.cancel();
         }
+    }
+
+    private void scheduleHeartBeat(FiniteDuration interval) {
+        stopHeartBeat();
 
         // Schedule a heartbeat. When the scheduler triggers a SendHeartbeat
         // message is sent to itself.
@@ -135,9 +293,18 @@ public class Leader extends AbstractRaftActorBehavior {
         // need to be sent if there are other messages being sent to the remote
         // actor.
         heartbeatCancel =
-            context.getActorSystem().scheduler().scheduleOnce(interval,
+            context.getActorSystem().scheduler().scheduleOnce(
+                interval,
                 context.getActor(), new SendHeartBeat(),
                 context.getActorSystem().dispatcher(), context.getActor());
     }
 
+    @Override public void close() throws Exception {
+        stopHeartBeat();
+    }
+
+    @Override public String getLeaderId() {
+        return context.getId();
+    }
+
 }