Merge "Fix NPE in AbstractLeader#handleInstallSnapshotReply"
author: Moiz Raja <moraja@cisco.com>
Mon, 9 Feb 2015 18:10:11 +0000 (18:10 +0000)
committer: Gerrit Code Review <gerrit@opendaylight.org>
Mon, 9 Feb 2015 18:10:11 +0000 (18:10 +0000)
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java

index e28e4b066d372ee54594ecaf9fe0e5259ad4cdfd..410dcee5e5e811e9b9cb485b30420a4d21311a12 100644 (file)
@@ -294,6 +294,9 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
             // set currentTerm = T, convert to follower (§5.1)
             // This applies to all RPC messages and responses
             if (rpc.getTerm() > context.getTermInformation().getCurrentTerm()) {
+                LOG.debug("{}: Term {} in \"{}\" message is greater than leader's term {}", context.getId(),
+                        rpc.getTerm(), rpc, context.getTermInformation().getCurrentTerm());
+
                 context.getTermInformation().updateAndPersist(rpc.getTerm(), null);
 
                 return switchBehavior(new Follower(context));
@@ -330,12 +333,17 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
     private void handleInstallSnapshotReply(InstallSnapshotReply reply) {
         String followerId = reply.getFollowerId();
         FollowerToSnapshot followerToSnapshot = mapFollowerToSnapshot.get(followerId);
+
+        if (followerToSnapshot == null) {
+            LOG.error("{}: FollowerId {} in InstallSnapshotReply not known to Leader",
+                    context.getId(), followerId);
+            return;
+        }
+
         FollowerLogInformation followerLogInformation = followerToLog.get(followerId);
         followerLogInformation.markFollowerActive();
 
-        if (followerToSnapshot != null &&
-            followerToSnapshot.getChunkIndex() == reply.getChunkIndex()) {
-
+        if (followerToSnapshot.getChunkIndex() == reply.getChunkIndex()) {
             if (reply.isSuccess()) {
                 if(followerToSnapshot.isLastChunk(reply.getChunkIndex())) {
                     //this was the last chunk reply
@@ -373,12 +381,10 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
 
                 followerToSnapshot.markSendStatus(false);
             }
-
         } else {
-            LOG.error("{}: FollowerId in InstallSnapshotReply not known to Leader" +
-                    " or Chunk Index in InstallSnapshotReply not matching {} != {}",
-                    context.getId(), followerToSnapshot.getChunkIndex(), reply.getChunkIndex()
-            );
+            LOG.error("{}: Chunk index {} in InstallSnapshotReply from follower {} does not match expected index {}",
+                    context.getId(), reply.getChunkIndex(), followerId,
+                    followerToSnapshot.getChunkIndex());
 
             if(reply.getChunkIndex() == INVALID_CHUNK_INDEX){
                 // Since the Follower did not find this index to be valid we should reset the follower snapshot
@@ -413,6 +419,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
 
     private void sendAppendEntries() {
         // Send an AppendEntries to all followers
+
         for (Entry<String, FollowerLogInformation> e : followerToLog.entrySet()) {
             final String followerId = e.getKey();
             ActorSelection followerActor = context.getPeerActorSelection(followerId);
@@ -422,14 +429,15 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
                 long followerNextIndex = followerLogInformation.getNextIndex();
                 boolean isFollowerActive = followerLogInformation.isFollowerActive();
 
-                if (mapFollowerToSnapshot.get(followerId) != null) {
+                FollowerToSnapshot followerToSnapshot = mapFollowerToSnapshot.get(followerId);
+                if (followerToSnapshot != null) {
                     // if install snapshot is in process , then sent next chunk if possible
-                    if (isFollowerActive && mapFollowerToSnapshot.get(followerId).canSendNextChunk()) {
+                    if (isFollowerActive && followerToSnapshot.canSendNextChunk()) {
                         sendSnapshotChunk(followerActor, followerId);
                     } else {
                         // we send a heartbeat even if we have not received a reply for the last chunk
                         sendAppendEntriesToFollower(followerActor, followerNextIndex,
-                            Collections.<ReplicatedLogEntry>emptyList());
+                            Collections.<ReplicatedLogEntry>emptyList(), followerId);
                     }
 
                 } else {
@@ -437,8 +445,13 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
                     long leaderSnapShotIndex = context.getReplicatedLog().getSnapshotIndex();
                     final List<ReplicatedLogEntry> entries;
 
-                    if (isFollowerActive &&
-                        context.getReplicatedLog().isPresent(followerNextIndex)) {
+                    LOG.debug("{}: Checking sendAppendEntries for {}, leaderLastIndex: {}, leaderSnapShotIndex: {}",
+                            context.getId(), leaderLastIndex, leaderSnapShotIndex);
+
+                    if (isFollowerActive && context.getReplicatedLog().isPresent(followerNextIndex)) {
+                        LOG.debug("{}: sendAppendEntries: {} is present for {}", context.getId(),
+                                followerNextIndex, followerId);
+
                         // FIXME : Sending one entry at a time
                         entries = context.getReplicatedLog().getFrom(followerNextIndex, 1);
 
@@ -465,23 +478,25 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
                         entries =  Collections.<ReplicatedLogEntry>emptyList();
                     }
 
-                    sendAppendEntriesToFollower(followerActor, followerNextIndex, entries);
-
+                    sendAppendEntriesToFollower(followerActor, followerNextIndex, entries, followerId);
                 }
             }
         }
     }
 
     private void sendAppendEntriesToFollower(ActorSelection followerActor, long followerNextIndex,
-        List<ReplicatedLogEntry> entries) {
-        followerActor.tell(
-            new AppendEntries(currentTerm(), context.getId(),
-                prevLogIndex(followerNextIndex),
-                prevLogTerm(followerNextIndex), entries,
-                context.getCommitIndex(),
-                replicatedToAllIndex).toSerializable(),
-            actor()
-        );
+        List<ReplicatedLogEntry> entries, String followerId) {
+        AppendEntries appendEntries = new AppendEntries(currentTerm(), context.getId(),
+            prevLogIndex(followerNextIndex),
+            prevLogTerm(followerNextIndex), entries,
+            context.getCommitIndex(), replicatedToAllIndex);
+
+        if(!entries.isEmpty()) {
+            LOG.debug("{}: Sending AppendEntries to follower {}: {}", context.getId(), followerId,
+                    appendEntries);
+        }
+
+        followerActor.tell(appendEntries.toSerializable(), actor());
     }
 
     /**
@@ -501,6 +516,10 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
      *
      */
     private void installSnapshotIfNeeded() {
+        if(LOG.isDebugEnabled()) {
+            LOG.debug("{}: installSnapshotIfNeeded, followers {}", context.getId(), followerToLog.keySet());
+        }
+
         for (Entry<String, FollowerLogInformation> e : followerToLog.entrySet()) {
             final ActorSelection followerActor = context.getPeerActorSelection(e.getKey());
 
@@ -508,7 +527,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
                 long nextIndex = e.getValue().getNextIndex();
 
                 if (!context.getReplicatedLog().isPresent(nextIndex) &&
-                    context.getReplicatedLog().isInSnapshot(nextIndex)) {
+                        context.getReplicatedLog().isInSnapshot(nextIndex)) {
                     LOG.info("{}: {} follower needs a snapshot install", context.getId(), e.getKey());
                     if (snapshot.isPresent()) {
                         // if a snapshot is present in the memory, most likely another install is in progress
@@ -573,21 +592,27 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
     private void sendSnapshotChunk(ActorSelection followerActor, String followerId) {
         try {
             if (snapshot.isPresent()) {
+                ByteString nextSnapshotChunk = getNextSnapshotChunk(followerId,snapshot.get());
+
+                // Note: the previous call to getNextSnapshotChunk has the side-effect of adding
+                // followerId to the followerToSnapshot map.
+                FollowerToSnapshot followerToSnapshot = mapFollowerToSnapshot.get(followerId);
+
                 followerActor.tell(
                     new InstallSnapshot(currentTerm(), context.getId(),
                         context.getReplicatedLog().getSnapshotIndex(),
                         context.getReplicatedLog().getSnapshotTerm(),
-                        getNextSnapshotChunk(followerId,snapshot.get()),
-                        mapFollowerToSnapshot.get(followerId).incrementChunkIndex(),
-                        mapFollowerToSnapshot.get(followerId).getTotalChunks(),
-                        Optional.of(mapFollowerToSnapshot.get(followerId).getLastChunkHashCode())
+                        nextSnapshotChunk,
+                        followerToSnapshot.incrementChunkIndex(),
+                        followerToSnapshot.getTotalChunks(),
+                        Optional.of(followerToSnapshot.getLastChunkHashCode())
                     ).toSerializable(),
                     actor()
                 );
                 LOG.info("{}: InstallSnapshot sent to follower {}, Chunk: {}/{}",
                         context.getId(), followerActor.path(),
-                        mapFollowerToSnapshot.get(followerId).getChunkIndex(),
-                        mapFollowerToSnapshot.get(followerId).getTotalChunks());
+                        followerToSnapshot.getChunkIndex(),
+                        followerToSnapshot.getTotalChunks());
             }
         } catch (IOException e) {
             LOG.error(e, "{}: InstallSnapshot failed for Leader.", context.getId());