Merge "Added requuired-capabilities to the impl/.../config/default-config.xml and...
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / main / java / org / opendaylight / controller / cluster / raft / behaviors / AbstractLeader.java
index 4993d25f202ecee0be55f7ec17b26528924a2e67..da1627b98e7e4e8204385914795300ce073a71ad 100644 (file)
@@ -18,10 +18,11 @@ import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableMap.Builder;
 import com.google.protobuf.ByteString;
 import java.io.IOException;
-import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
@@ -84,7 +85,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
 
     private Cancellable heartbeatSchedule = null;
 
-    private List<ClientRequestTracker> trackerList = new ArrayList<>();
+    private final Collection<ClientRequestTracker> trackerList = new LinkedList<>();
 
     protected final int minReplicationCount;
 
@@ -108,7 +109,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
 
         leaderId = context.getId();
 
-        LOG.debug("Election:Leader has following peers: {}", getFollowerIds());
+        LOG.debug("{}: Election: Leader has following peers: {}", context.getId(), getFollowerIds());
 
         minReplicationCount = getMajorityVoteCount(getFollowerIds().size());
 
@@ -152,7 +153,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
         AppendEntries appendEntries) {
 
         if(LOG.isDebugEnabled()) {
-            LOG.debug(appendEntries.toString());
+            LOG.debug("{}: handleAppendEntries: {}", context.getId(), appendEntries);
         }
 
         return this;
@@ -164,7 +165,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
 
         if(! appendEntriesReply.isSuccess()) {
             if(LOG.isDebugEnabled()) {
-                LOG.debug(appendEntriesReply.toString());
+                LOG.debug("{}: handleAppendEntriesReply: {}", context.getId(), appendEntriesReply);
             }
         }
 
@@ -174,7 +175,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
             followerToLog.get(followerId);
 
         if(followerLogInformation == null){
-            LOG.error("Unknown follower {}", followerId);
+            LOG.error("{}: handleAppendEntriesReply - unknown follower {}", context.getId(), followerId);
             return this;
         }
 
@@ -204,7 +205,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
             int replicatedCount = 1;
 
             for (FollowerLogInformation info : followerToLog.values()) {
-                if (info.getMatchIndex().get() >= N) {
+                if (info.getMatchIndex() >= N) {
                     replicatedCount++;
                 }
             }
@@ -228,16 +229,21 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
         return this;
     }
 
+    @Override
     protected ClientRequestTracker removeClientRequestTracker(long logIndex) {
-
-        ClientRequestTracker toRemove = findClientRequestTracker(logIndex);
-        if(toRemove != null) {
-            trackerList.remove(toRemove);
+        final Iterator<ClientRequestTracker> it = trackerList.iterator();
+        while (it.hasNext()) {
+            final ClientRequestTracker t = it.next();
+            if (t.getIndex() == logIndex) {
+                it.remove();
+                return t;
+            }
         }
 
-        return toRemove;
+        return null;
     }
 
+    @Override
     protected ClientRequestTracker findClientRequestTracker(long logIndex) {
         for (ClientRequestTracker tracker : trackerList) {
             if (tracker.getIndex() == logIndex) {
@@ -316,9 +322,9 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
                 if(followerToSnapshot.isLastChunk(reply.getChunkIndex())) {
                     //this was the last chunk reply
                     if(LOG.isDebugEnabled()) {
-                        LOG.debug("InstallSnapshotReply received, " +
+                        LOG.debug("{}: InstallSnapshotReply received, " +
                                 "last chunk received, Chunk:{}. Follower:{} Setting nextIndex:{}",
-                            reply.getChunkIndex(), followerId,
+                                context.getId(), reply.getChunkIndex(), followerId,
                             context.getReplicatedLog().getSnapshotIndex() + 1
                         );
                     }
@@ -330,8 +336,8 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
                     mapFollowerToSnapshot.remove(followerId);
 
                     if(LOG.isDebugEnabled()) {
-                        LOG.debug("followerToLog.get(followerId).getNextIndex().get()=" +
-                            followerToLog.get(followerId).getNextIndex().get());
+                        LOG.debug("{}: followerToLog.get(followerId).getNextIndex()=" +
+                                context.getId(), followerToLog.get(followerId).getNextIndex());
                     }
 
                     if (mapFollowerToSnapshot.isEmpty()) {
@@ -344,19 +350,16 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
                     followerToSnapshot.markSendStatus(true);
                 }
             } else {
-                LOG.info("InstallSnapshotReply received, " +
-                        "sending snapshot chunk failed, Will retry, Chunk:{}",
-                    reply.getChunkIndex()
-                );
+                LOG.info("{}: InstallSnapshotReply received sending snapshot chunk failed, Will retry, Chunk: {}",
+                        context.getId(), reply.getChunkIndex());
 
                 followerToSnapshot.markSendStatus(false);
             }
 
         } else {
-            LOG.error("ERROR!!" +
-                    "FollowerId in InstallSnapshotReply not known to Leader" +
+            LOG.error("{}: FollowerId in InstallSnapshotReply not known to Leader" +
                     " or Chunk Index in InstallSnapshotReply not matching {} != {}",
-                followerToSnapshot.getChunkIndex(), reply.getChunkIndex()
+                    context.getId(), followerToSnapshot.getChunkIndex(), reply.getChunkIndex()
             );
 
             if(reply.getChunkIndex() == INVALID_CHUNK_INDEX){
@@ -371,7 +374,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
         long logIndex = replicate.getReplicatedLogEntry().getIndex();
 
         if(LOG.isDebugEnabled()) {
-            LOG.debug("Replicate message {}", logIndex);
+            LOG.debug("{}: Replicate message {}", context.getId(), logIndex);
         }
 
         // Create a tracker entry we will use this later to notify the
@@ -398,9 +401,8 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
 
             if (followerActor != null) {
                 FollowerLogInformation followerLogInformation = followerToLog.get(followerId);
-                long followerNextIndex = followerLogInformation.getNextIndex().get();
+                long followerNextIndex = followerLogInformation.getNextIndex();
                 boolean isFollowerActive = followerLogInformation.isFollowerActive();
-                List<ReplicatedLogEntry> entries = null;
 
                 if (mapFollowerToSnapshot.get(followerId) != null) {
                     // if install snapshot is in process , then sent next chunk if possible
@@ -415,6 +417,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
                 } else {
                     long leaderLastIndex = context.getReplicatedLog().lastIndex();
                     long leaderSnapShotIndex = context.getReplicatedLog().getSnapshotIndex();
+                    final List<ReplicatedLogEntry> entries;
 
                     if (isFollowerActive &&
                         context.getReplicatedLog().isPresent(followerNextIndex)) {
@@ -428,11 +431,10 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
                         // then snapshot should be sent
 
                         if(LOG.isDebugEnabled()) {
-                            LOG.debug("InitiateInstallSnapshot to follower:{}," +
-                                    "follower-nextIndex:{}, leader-snapshot-index:{},  " +
-                                    "leader-last-index:{}", followerId,
-                                followerNextIndex, leaderSnapShotIndex, leaderLastIndex
-                            );
+                            LOG.debug(String.format("%s: InitiateInstallSnapshot to follower: %s," +
+                                    "follower-nextIndex: %s, leader-snapshot-index: %s,  " +
+                                    "leader-last-index: %s", context.getId(), followerId,
+                                followerNextIndex, leaderSnapShotIndex, leaderLastIndex));
                         }
                         actor().tell(new InitiateInstallSnapshot(), actor());
 
@@ -484,11 +486,11 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
             final ActorSelection followerActor = context.getPeerActorSelection(e.getKey());
 
             if (followerActor != null) {
-                long nextIndex = e.getValue().getNextIndex().get();
+                long nextIndex = e.getValue().getNextIndex();
 
                 if (!context.getReplicatedLog().isPresent(nextIndex) &&
                     context.getReplicatedLog().isInSnapshot(nextIndex)) {
-                    LOG.info("{} follower needs a snapshot install", e.getKey());
+                    LOG.info("{}: {} follower needs a snapshot install", context.getId(), e.getKey());
                     if (snapshot.isPresent()) {
                         // if a snapshot is present in the memory, most likely another install is in progress
                         // no need to capture snapshot
@@ -510,7 +512,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
     // on every install snapshot, we try to capture the snapshot.
     // Once a capture is going on, another one issued will get ignored by RaftActor.
     private void initiateCaptureSnapshot() {
-        LOG.info("Initiating Snapshot Capture to Install Snapshot, Leader:{}", getLeaderId());
+        LOG.info("{}: Initiating Snapshot Capture to Install Snapshot, Leader:{}", context.getId(), getLeaderId());
         ReplicatedLogEntry lastAppliedEntry = context.getReplicatedLog().get(context.getLastApplied());
         long lastAppliedIndex = -1;
         long lastAppliedTerm = -1;
@@ -535,7 +537,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
             ActorSelection followerActor = context.getPeerActorSelection(e.getKey());
 
             if (followerActor != null) {
-                long nextIndex = e.getValue().getNextIndex().get();
+                long nextIndex = e.getValue().getNextIndex();
 
                 if (!context.getReplicatedLog().isPresent(nextIndex) &&
                     context.getReplicatedLog().isInSnapshot(nextIndex)) {
@@ -563,12 +565,13 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
                     ).toSerializable(),
                     actor()
                 );
-                LOG.info("InstallSnapshot sent to follower {}, Chunk: {}/{}",
-                    followerActor.path(), mapFollowerToSnapshot.get(followerId).getChunkIndex(),
-                    mapFollowerToSnapshot.get(followerId).getTotalChunks());
+                LOG.info("{}: InstallSnapshot sent to follower {}, Chunk: {}/{}",
+                        context.getId(), followerActor.path(),
+                        mapFollowerToSnapshot.get(followerId).getChunkIndex(),
+                        mapFollowerToSnapshot.get(followerId).getTotalChunks());
             }
         } catch (IOException e) {
-            LOG.error(e, "InstallSnapshot failed for Leader.");
+            LOG.error(e, "{}: InstallSnapshot failed for Leader.", context.getId());
         }
     }
 
@@ -584,7 +587,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
         }
         ByteString nextChunk = followerToSnapshot.getNextChunk();
         if (LOG.isDebugEnabled()) {
-            LOG.debug("Leader's snapshot nextChunk size:{}", nextChunk.size());
+            LOG.debug("{}: Leader's snapshot nextChunk size:{}", context.getId(), nextChunk.size());
         }
         return nextChunk;
     }
@@ -648,14 +651,14 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
      * snapshot chunks
      */
     protected class FollowerToSnapshot {
-        private ByteString snapshotBytes;
+        private final ByteString snapshotBytes;
         private int offset = 0;
         // the next snapshot chunk is sent only if the replyReceivedForOffset matches offset
         private int replyReceivedForOffset;
         // if replyStatus is false, the previous chunk is attempted
         private boolean replyStatus = false;
         private int chunkIndex;
-        private int totalChunks;
+        private final int totalChunks;
         private int lastChunkHashCode = AbstractLeader.INITIAL_LAST_CHUNK_HASH_CODE;
         private int nextChunkHashCode = AbstractLeader.INITIAL_LAST_CHUNK_HASH_CODE;
 
@@ -665,8 +668,8 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
             totalChunks = ( size / context.getConfigParams().getSnapshotChunkSize()) +
                 ((size % context.getConfigParams().getSnapshotChunkSize()) > 0 ? 1 : 0);
             if(LOG.isDebugEnabled()) {
-                LOG.debug("Snapshot {} bytes, total chunks to send:{}",
-                    size, totalChunks);
+                LOG.debug("{}: Snapshot {} bytes, total chunks to send:{}",
+                        context.getId(), size, totalChunks);
             }
             replyReceivedForOffset = -1;
             chunkIndex = AbstractLeader.FIRST_CHUNK_INDEX;
@@ -735,7 +738,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
             }
 
             if(LOG.isDebugEnabled()) {
-                LOG.debug("length={}, offset={},size={}",
+                LOG.debug("{}: Next chunk: length={}, offset={},size={}", context.getId(),
                     snapshotLength, start, size);
             }
             ByteString substring = getSnapshotBytes().substring(start, start + size);