Add (un)lock ops to netconf testtool
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / main / java / org / opendaylight / controller / cluster / raft / behaviors / AbstractLeader.java
index bb6002809c32b802afd0fec99ec9f1ce0fba4d94..462c94ec8a40736cc005c994b74520d9111c3430 100644 (file)
@@ -18,12 +18,14 @@ import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableMap.Builder;
 import com.google.protobuf.ByteString;
 import java.io.IOException;
-import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
+import java.util.Map.Entry;
 import java.util.concurrent.TimeUnit;
 import org.opendaylight.controller.cluster.raft.ClientRequestTracker;
 import org.opendaylight.controller.cluster.raft.ClientRequestTrackerImpl;
@@ -81,11 +83,9 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
     private final Map<String, FollowerLogInformation> followerToLog;
     private final Map<String, FollowerToSnapshot> mapFollowerToSnapshot = new HashMap<>();
 
-    protected final Set<String> followers;
-
     private Cancellable heartbeatSchedule = null;
 
-    private List<ClientRequestTracker> trackerList = new ArrayList<>();
+    private final Collection<ClientRequestTracker> trackerList = new LinkedList<>();
 
     protected final int minReplicationCount;
 
@@ -96,10 +96,8 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
     public AbstractLeader(RaftActorContext context) {
         super(context);
 
-        followers = context.getPeerAddresses().keySet();
-
         final Builder<String, FollowerLogInformation> ftlBuilder = ImmutableMap.builder();
-        for (String followerId : followers) {
+        for (String followerId : context.getPeerAddresses().keySet()) {
             FollowerLogInformation followerLogInformation =
                 new FollowerLogInformationImpl(followerId,
                     context.getCommitIndex(), -1,
@@ -111,11 +109,9 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
 
         leaderId = context.getId();
 
-        if(LOG.isDebugEnabled()) {
-            LOG.debug("Election:Leader has following peers: {}", followers);
-        }
+        LOG.debug("Election:Leader has following peers: {}", getFollowerIds());
 
-        minReplicationCount = getMajorityVoteCount(followers.size());
+        minReplicationCount = getMajorityVoteCount(getFollowerIds().size());
 
         // the isolated Leader peer count will be 1 less than the majority vote count.
         // this is because the vote count has the self vote counted in it
@@ -134,6 +130,15 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
         scheduleHeartBeat(new FiniteDuration(0, TimeUnit.SECONDS));
     }
 
+    /**
+     * Return an immutable collection of follower identifiers.
+     *
+     * @return Collection of follower IDs
+     */
+    protected final Collection<String> getFollowerIds() {
+        return followerToLog.keySet();
+    }
+
     private Optional<ByteString> getSnapshot() {
         return snapshot;
     }
@@ -200,7 +205,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
             int replicatedCount = 1;
 
             for (FollowerLogInformation info : followerToLog.values()) {
-                if (info.getMatchIndex().get() >= N) {
+                if (info.getMatchIndex() >= N) {
                     replicatedCount++;
                 }
             }
@@ -224,16 +229,21 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
         return this;
     }
 
+    @Override
     protected ClientRequestTracker removeClientRequestTracker(long logIndex) {
-
-        ClientRequestTracker toRemove = findClientRequestTracker(logIndex);
-        if(toRemove != null) {
-            trackerList.remove(toRemove);
+        final Iterator<ClientRequestTracker> it = trackerList.iterator();
+        while (it.hasNext()) {
+            final ClientRequestTracker t = it.next();
+            if (t.getIndex() == logIndex) {
+                it.remove();
+                return t;
+            }
         }
 
-        return toRemove;
+        return null;
     }
 
+    @Override
     protected ClientRequestTracker findClientRequestTracker(long logIndex) {
         for (ClientRequestTracker tracker : trackerList) {
             if (tracker.getIndex() == logIndex) {
@@ -326,8 +336,8 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
                     mapFollowerToSnapshot.remove(followerId);
 
                     if(LOG.isDebugEnabled()) {
-                        LOG.debug("followerToLog.get(followerId).getNextIndex().get()=" +
-                            followerToLog.get(followerId).getNextIndex().get());
+                        LOG.debug("followerToLog.get(followerId).getNextIndex()=" +
+                            followerToLog.get(followerId).getNextIndex());
                     }
 
                     if (mapFollowerToSnapshot.isEmpty()) {
@@ -378,7 +388,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
                 logIndex)
         );
 
-        if (followers.size() == 0) {
+        if (followerToLog.isEmpty()) {
             context.setCommitIndex(logIndex);
             applyLogToStateMachine(logIndex);
         } else {
@@ -388,14 +398,14 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
 
     private void sendAppendEntries() {
         // Send an AppendEntries to all followers
-        for (String followerId : followers) {
+        for (Entry<String, FollowerLogInformation> e : followerToLog.entrySet()) {
+            final String followerId = e.getKey();
             ActorSelection followerActor = context.getPeerActorSelection(followerId);
 
             if (followerActor != null) {
                 FollowerLogInformation followerLogInformation = followerToLog.get(followerId);
-                long followerNextIndex = followerLogInformation.getNextIndex().get();
+                long followerNextIndex = followerLogInformation.getNextIndex();
                 boolean isFollowerActive = followerLogInformation.isFollowerActive();
-                List<ReplicatedLogEntry> entries = null;
 
                 if (mapFollowerToSnapshot.get(followerId) != null) {
                     // if install snapshot is in process , then sent next chunk if possible
@@ -410,6 +420,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
                 } else {
                     long leaderLastIndex = context.getReplicatedLog().lastIndex();
                     long leaderSnapShotIndex = context.getReplicatedLog().getSnapshotIndex();
+                    final List<ReplicatedLogEntry> entries;
 
                     if (isFollowerActive &&
                         context.getReplicatedLog().isPresent(followerNextIndex)) {
@@ -475,23 +486,19 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
      *
      */
     private void installSnapshotIfNeeded() {
-        for (String followerId : followers) {
-            ActorSelection followerActor =
-                context.getPeerActorSelection(followerId);
-
-            if(followerActor != null) {
-                FollowerLogInformation followerLogInformation =
-                    followerToLog.get(followerId);
+        for (Entry<String, FollowerLogInformation> e : followerToLog.entrySet()) {
+            final ActorSelection followerActor = context.getPeerActorSelection(e.getKey());
 
-                long nextIndex = followerLogInformation.getNextIndex().get();
+            if (followerActor != null) {
+                long nextIndex = e.getValue().getNextIndex();
 
                 if (!context.getReplicatedLog().isPresent(nextIndex) &&
                     context.getReplicatedLog().isInSnapshot(nextIndex)) {
-                    LOG.info("{} follower needs a snapshot install", followerId);
+                    LOG.info("{} follower needs a snapshot install", e.getKey());
                     if (snapshot.isPresent()) {
                         // if a snapshot is present in the memory, most likely another install is in progress
                         // no need to capture snapshot
-                        sendSnapshotChunk(followerActor, followerId);
+                        sendSnapshotChunk(followerActor, e.getKey());
 
                     } else {
                         initiateCaptureSnapshot();
@@ -530,16 +537,15 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
 
 
     private void sendInstallSnapshot() {
-        for (String followerId : followers) {
-            ActorSelection followerActor = context.getPeerActorSelection(followerId);
+        for (Entry<String, FollowerLogInformation> e : followerToLog.entrySet()) {
+            ActorSelection followerActor = context.getPeerActorSelection(e.getKey());
 
-            if(followerActor != null) {
-                FollowerLogInformation followerLogInformation = followerToLog.get(followerId);
-                long nextIndex = followerLogInformation.getNextIndex().get();
+            if (followerActor != null) {
+                long nextIndex = e.getValue().getNextIndex();
 
                 if (!context.getReplicatedLog().isPresent(nextIndex) &&
                     context.getReplicatedLog().isInSnapshot(nextIndex)) {
-                    sendSnapshotChunk(followerActor, followerId);
+                    sendSnapshotChunk(followerActor, e.getKey());
                 }
             }
         }
@@ -590,7 +596,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
     }
 
     private void sendHeartBeat() {
-        if (followers.size() > 0) {
+        if (!followerToLog.isEmpty()) {
             sendAppendEntries();
         }
     }
@@ -602,7 +608,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
     }
 
     private void scheduleHeartBeat(FiniteDuration interval) {
-        if(followers.size() == 0){
+        if (followerToLog.isEmpty()) {
             // Optimization - do not bother scheduling a heartbeat as there are
             // no followers
             return;