Merge "Bug 2669: Use slf4j Logger instead of akka LoggingAdapter"
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / main / java / org / opendaylight / controller / cluster / raft / behaviors / Follower.java
index 1cfdf9dba8b912a5bc23f78b85c447621298d908..8a0788702d81f3dfe04d713bcc99e527273c82d2 100644 (file)
@@ -9,8 +9,9 @@
 package org.opendaylight.controller.cluster.raft.behaviors;
 
 import akka.actor.ActorRef;
-import akka.event.LoggingAdapter;
+import com.google.common.annotations.VisibleForTesting;
 import com.google.protobuf.ByteString;
+import java.util.ArrayList;
 import org.opendaylight.controller.cluster.raft.RaftActorContext;
 import org.opendaylight.controller.cluster.raft.RaftState;
 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
@@ -24,8 +25,6 @@ import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply;
 import org.opendaylight.controller.cluster.raft.messages.RaftRPC;
 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
 
-import java.util.ArrayList;
-
 /**
  * The behavior of a RaftActor in the Follower state
  * <p/>
@@ -37,24 +36,48 @@ import java.util.ArrayList;
  * </ul>
  */
 public class Follower extends AbstractRaftActorBehavior {
-    private ByteString snapshotChunksCollected = ByteString.EMPTY;
 
-    private final LoggingAdapter LOG;
+    private SnapshotTracker snapshotTracker = null;
 
     public Follower(RaftActorContext context) {
         super(context);
 
-        LOG = context.getLogger();
-
         scheduleElection(electionDuration());
     }
 
-    @Override protected RaftState handleAppendEntries(ActorRef sender,
-        AppendEntries appendEntries) {
+    private boolean isLogEntryPresent(long index){
+        if(index == context.getReplicatedLog().getSnapshotIndex()){
+            return true;
+        }
+
+        ReplicatedLogEntry previousEntry = context.getReplicatedLog()
+                .get(index);
+
+        return previousEntry != null;
+
+    }
+
+    private long getLogEntryTerm(long index){
+        if(index == context.getReplicatedLog().getSnapshotIndex()){
+            return context.getReplicatedLog().getSnapshotTerm();
+        }
+
+        ReplicatedLogEntry previousEntry = context.getReplicatedLog()
+                .get(index);
+
+        if(previousEntry != null){
+            return previousEntry.getTerm();
+        }
+
+        return -1;
+    }
+
+    @Override protected RaftActorBehavior handleAppendEntries(ActorRef sender,
+                                                              AppendEntries appendEntries) {
 
         if(appendEntries.getEntries() != null && appendEntries.getEntries().size() > 0) {
             if(LOG.isDebugEnabled()) {
-                LOG.debug(appendEntries.toString());
+                LOG.debug("{}: handleAppendEntries: {}", context.getId(), appendEntries);
             }
         }
 
@@ -71,50 +94,50 @@ public class Follower extends AbstractRaftActorBehavior {
         // 2. Reply false if log doesn’t contain an entry at prevLogIndex
         // whose term matches prevLogTerm (§5.3)
 
-        ReplicatedLogEntry previousEntry = context.getReplicatedLog()
-            .get(appendEntries.getPrevLogIndex());
+        long prevLogTerm = getLogEntryTerm(appendEntries.getPrevLogIndex());
+        boolean prevEntryPresent = isLogEntryPresent(appendEntries.getPrevLogIndex());
 
 
         boolean outOfSync = true;
 
         // First check if the logs are in sync or not
         if (lastIndex() == -1
-            && appendEntries.getPrevLogIndex() != -1) {
+                && appendEntries.getPrevLogIndex() != -1) {
 
             // The follower's log is out of sync because the leader does have
             // an entry at prevLogIndex and this follower has no entries in
             // it's log.
 
             if(LOG.isDebugEnabled()) {
-                LOG.debug("The followers log is empty and the senders prevLogIndex is {}",
-                    appendEntries.getPrevLogIndex());
+                LOG.debug("{}: The followers log is empty and the senders prevLogIndex is {}",
+                        context.getId(), appendEntries.getPrevLogIndex());
             }
 
         } else if (lastIndex() > -1
-            && appendEntries.getPrevLogIndex() != -1
-            && previousEntry == null) {
+                && appendEntries.getPrevLogIndex() != -1
+                && !prevEntryPresent) {
 
             // The follower's log is out of sync because the Leader's
             // prevLogIndex entry was not found in it's log
 
             if(LOG.isDebugEnabled()) {
-                LOG.debug("The log is not empty but the prevLogIndex {} was not found in it",
-                    appendEntries.getPrevLogIndex());
+                LOG.debug("{}: The log is not empty but the prevLogIndex {} was not found in it",
+                        context.getId(), appendEntries.getPrevLogIndex());
             }
 
         } else if (lastIndex() > -1
-            && previousEntry != null
-            && previousEntry.getTerm()!= appendEntries.getPrevLogTerm()) {
+                && prevEntryPresent
+                && prevLogTerm != appendEntries.getPrevLogTerm()) {
 
             // The follower's log is out of sync because the Leader's
             // prevLogIndex entry does exist in the follower's log but it has
             // a different term in it
 
-            if(LOG.isDebugEnabled()) {
+            if (LOG.isDebugEnabled()) {
                 LOG.debug(
-                    "Cannot append entries because previous entry term {}  is not equal to append entries prevLogTerm {}"
-                    , previousEntry.getTerm()
-                    , appendEntries.getPrevLogTerm());
+                        "{}: Cannot append entries because previous entry term {}  is not equal to append entries prevLogTerm {}"
+                        , context.getId(), prevLogTerm
+                        , appendEntries.getPrevLogTerm());
             }
         } else {
             outOfSync = false;
@@ -124,25 +147,23 @@ public class Follower extends AbstractRaftActorBehavior {
             // We found that the log was out of sync so just send a negative
             // reply and return
             if(LOG.isDebugEnabled()) {
-                LOG.debug("Follower is out-of-sync, " +
+                LOG.debug("{}: Follower ({}) is out-of-sync, " +
                         "so sending negative reply, lastIndex():{}, lastTerm():{}",
-                    lastIndex(), lastTerm()
+                        context.getId(), context.getId(), lastIndex(), lastTerm()
                 );
             }
             sender.tell(
                 new AppendEntriesReply(context.getId(), currentTerm(), false,
                     lastIndex(), lastTerm()), actor()
             );
-            return state();
+            return this;
         }
 
         if (appendEntries.getEntries() != null
             && appendEntries.getEntries().size() > 0) {
             if(LOG.isDebugEnabled()) {
-                LOG.debug(
-                    "Number of entries to be appended = " + appendEntries
-                        .getEntries().size()
-                );
+                LOG.debug("{}: Number of entries to be appended = {}", context.getId(),
+                        appendEntries.getEntries().size());
             }
 
             // 3. If an existing entry conflicts with a new one (same index
@@ -167,10 +188,8 @@ public class Follower extends AbstractRaftActorBehavior {
                     }
 
                     if(LOG.isDebugEnabled()) {
-                        LOG.debug(
-                            "Removing entries from log starting at "
-                                + matchEntry.getIndex()
-                        );
+                        LOG.debug("{}: Removing entries from log starting at {}", context.getId(),
+                                matchEntry.getIndex());
                     }
 
                     // Entries do not match so remove all subsequent entries
@@ -181,27 +200,23 @@ public class Follower extends AbstractRaftActorBehavior {
             }
 
             if(LOG.isDebugEnabled()) {
-                context.getLogger().debug(
-                    "After cleanup entries to be added from = " + (addEntriesFrom
-                        + lastIndex())
-                );
+                LOG.debug("{}: After cleanup entries to be added from = {}", context.getId(),
+                        (addEntriesFrom + lastIndex()));
             }
 
             // 4. Append any new entries not already in the log
             for (int i = addEntriesFrom;
                  i < appendEntries.getEntries().size(); i++) {
 
-                context.getLogger().info(
-                    "Append entry to log " + appendEntries.getEntries().get(
-                        i).getData()
-                        .toString()
-                );
-                context.getReplicatedLog()
-                    .appendAndPersist(appendEntries.getEntries().get(i));
+                if(LOG.isDebugEnabled()) {
+                    LOG.debug("{}: Append entry to log {}", context.getId(),
+                            appendEntries.getEntries().get(i).getData());
+                }
+                context.getReplicatedLog().appendAndPersist(appendEntries.getEntries().get(i));
             }
 
             if(LOG.isDebugEnabled()) {
-                LOG.debug("Log size is now " + context.getReplicatedLog().size());
+                LOG.debug("{}: Log size is now {}", context.getId(), context.getReplicatedLog().size());
             }
         }
 
@@ -216,7 +231,7 @@ public class Follower extends AbstractRaftActorBehavior {
 
         if (prevCommitIndex != context.getCommitIndex()) {
             if(LOG.isDebugEnabled()) {
-                LOG.debug("Commit index set to " + context.getCommitIndex());
+                LOG.debug("{}: Commit index set to {}", context.getId(), context.getCommitIndex());
             }
         }
 
@@ -226,9 +241,9 @@ public class Follower extends AbstractRaftActorBehavior {
         if (appendEntries.getLeaderCommit() > context.getLastApplied() &&
             context.getLastApplied() < lastIndex()) {
             if(LOG.isDebugEnabled()) {
-                LOG.debug("applyLogToStateMachine, " +
+                LOG.debug("{}: applyLogToStateMachine, " +
                         "appendEntries.getLeaderCommit():{}," +
-                        "context.getLastApplied():{}, lastIndex():{}",
+                        "context.getLastApplied():{}, lastIndex():{}", context.getId(),
                     appendEntries.getLeaderCommit(), context.getLastApplied(), lastIndex()
                 );
             }
@@ -239,24 +254,28 @@ public class Follower extends AbstractRaftActorBehavior {
         sender.tell(new AppendEntriesReply(context.getId(), currentTerm(), true,
             lastIndex(), lastTerm()), actor());
 
-        return state();
+        if (!context.isSnapshotCaptureInitiated()) {
+            fakeSnapshot(appendEntries.getReplicatedToAllIndex(), appendEntries.getReplicatedToAllIndex());
+        }
+
+        return this;
     }
 
-    @Override protected RaftState handleAppendEntriesReply(ActorRef sender,
+    @Override protected RaftActorBehavior handleAppendEntriesReply(ActorRef sender,
         AppendEntriesReply appendEntriesReply) {
-        return state();
+        return this;
     }
 
-    @Override protected RaftState handleRequestVoteReply(ActorRef sender,
+    @Override protected RaftActorBehavior handleRequestVoteReply(ActorRef sender,
         RequestVoteReply requestVoteReply) {
-        return state();
+        return this;
     }
 
     @Override public RaftState state() {
         return RaftState.Follower;
     }
 
-    @Override public RaftState handleMessage(ActorRef sender, Object originalMessage) {
+    @Override public RaftActorBehavior handleMessage(ActorRef sender, Object originalMessage) {
 
         Object message = fromSerializableMessage(originalMessage);
 
@@ -271,7 +290,7 @@ public class Follower extends AbstractRaftActorBehavior {
         }
 
         if (message instanceof ElectionTimeout) {
-            return RaftState.Candidate;
+            return switchBehavior(new Candidate(context));
 
         } else if (message instanceof InstallSnapshot) {
             InstallSnapshot installSnapshot = (InstallSnapshot) message;
@@ -286,52 +305,59 @@ public class Follower extends AbstractRaftActorBehavior {
     private void handleInstallSnapshot(ActorRef sender, InstallSnapshot installSnapshot) {
 
         if(LOG.isDebugEnabled()) {
-            LOG.debug("InstallSnapshot received by follower " +
-                    "datasize:{} , Chunk:{}/{}", installSnapshot.getData().size(),
+            LOG.debug("{}: InstallSnapshot received by follower " +
+                    "datasize:{} , Chunk:{}/{}", context.getId(), installSnapshot.getData().size(),
                 installSnapshot.getChunkIndex(), installSnapshot.getTotalChunks()
             );
         }
 
-        try {
-            if (installSnapshot.getChunkIndex() == installSnapshot.getTotalChunks()) {
-                // this is the last chunk, create a snapshot object and apply
-
-                snapshotChunksCollected = snapshotChunksCollected.concat(installSnapshot.getData());
-                context.getLogger().debug("Last chunk received: snapshotChunksCollected.size:{}",
-                    snapshotChunksCollected.size());
+        if(snapshotTracker == null){
+            snapshotTracker = new SnapshotTracker(LOG, installSnapshot.getTotalChunks());
+        }
 
-                Snapshot snapshot = Snapshot.create(snapshotChunksCollected.toByteArray(),
-                    new ArrayList<ReplicatedLogEntry>(),
-                    installSnapshot.getLastIncludedIndex(),
-                    installSnapshot.getLastIncludedTerm(),
-                    installSnapshot.getLastIncludedIndex(),
-                    installSnapshot.getLastIncludedTerm());
+        try {
+            if(snapshotTracker.addChunk(installSnapshot.getChunkIndex(), installSnapshot.getData(),
+                    installSnapshot.getLastChunkHashCode())){
+                Snapshot snapshot = Snapshot.create(snapshotTracker.getSnapshot(),
+                        new ArrayList<ReplicatedLogEntry>(),
+                        installSnapshot.getLastIncludedIndex(),
+                        installSnapshot.getLastIncludedTerm(),
+                        installSnapshot.getLastIncludedIndex(),
+                        installSnapshot.getLastIncludedTerm());
 
                 actor().tell(new ApplySnapshot(snapshot), actor());
 
-            } else {
-                // we have more to go
-                snapshotChunksCollected = snapshotChunksCollected.concat(installSnapshot.getData());
+                snapshotTracker = null;
 
-                if(LOG.isDebugEnabled()) {
-                    LOG.debug("Chunk={},snapshotChunksCollected.size:{}",
-                        installSnapshot.getChunkIndex(), snapshotChunksCollected.size());
-                }
             }
 
             sender.tell(new InstallSnapshotReply(
-                currentTerm(), context.getId(), installSnapshot.getChunkIndex(),
-                true), actor());
+                    currentTerm(), context.getId(), installSnapshot.getChunkIndex(),
+                    true), actor());
+
+        } catch (SnapshotTracker.InvalidChunkException e) {
+
+            sender.tell(new InstallSnapshotReply(currentTerm(), context.getId(),
+                    -1, false), actor());
+            snapshotTracker = null;
 
-        } catch (Exception e) {
-            context.getLogger().error("Exception in InstallSnapshot of follower", e);
+        } catch (Exception e){
+            LOG.error("{}: Exception in InstallSnapshot of follower", context.getId(), e);
             //send reply with success as false. The chunk will be sent again on failure
             sender.tell(new InstallSnapshotReply(currentTerm(), context.getId(),
-                installSnapshot.getChunkIndex(), false), actor());
+                    installSnapshot.getChunkIndex(), false), actor());
+
         }
     }
 
     @Override public void close() throws Exception {
         stopElection();
     }
+
+    @VisibleForTesting
+    ByteString getSnapshotChunksCollected(){
+        return snapshotTracker != null ? snapshotTracker.getCollectedChunks() : ByteString.EMPTY;
+    }
+
+
 }