Merge "BUG 2518 : Throttle operations in a transaction"
authorTom Pantelis <tpanteli@brocade.com>
Mon, 26 Jan 2015 18:30:25 +0000 (18:30 +0000)
committerGerrit Code Review <gerrit@opendaylight.org>
Mon, 26 Jan 2015 18:30:25 +0000 (18:30 +0000)
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/ExampleActor.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/AbstractReplicatedLogImpl.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/FollowerLogInformation.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/FollowerLogInformationImpl.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Leader.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/AbstractReplicatedLogImplTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/FollowerLogInformationImplTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java
opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/sal/binding/impl/connect/dom/RpcInvocationStrategy.java

index 684c3ac30ecb5cbcb441f90bf430e63e0b5267a7..8f416b3abc45145e2f95307332052b66cdb4b5a1 100644 (file)
@@ -75,9 +75,8 @@ public class ExampleActor extends RaftActor {
 
         } else if (message instanceof PrintRole) {
             if(LOG.isDebugEnabled()) {
-                String followers = "";
                 if (getRaftState() == RaftState.Leader || getRaftState() == RaftState.IsolatedLeader) {
-                    followers = ((Leader)this.getCurrentBehavior()).printFollowerStates();
+                    final String followers = ((Leader)this.getCurrentBehavior()).printFollowerStates();
                     LOG.debug("{} = {}, Peers={}, followers={}", getId(), getRaftState(),
                         getRaftActorContext().getPeerAddresses().keySet(), followers);
                 } else {
index 653520c2e47db4be19de53c9e3a42099904298d8..d1c3fefee8309208a6df6fe9b539a10ee000ddef 100644 (file)
@@ -8,6 +8,7 @@
 package org.opendaylight.controller.cluster.raft;
 
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 
 /**
@@ -19,13 +20,13 @@ public abstract class AbstractReplicatedLogImpl implements ReplicatedLog {
     // We define this as ArrayList so we can use ensureCapacity.
     protected ArrayList<ReplicatedLogEntry> journal;
 
-    protected long snapshotIndex = -1;
-    protected long snapshotTerm = -1;
+    private long snapshotIndex = -1;
+    private long snapshotTerm = -1;
 
     // to be used for rollback during save snapshot failure
-    protected ArrayList<ReplicatedLogEntry> snapshottedJournal;
-    protected long previousSnapshotIndex = -1;
-    protected long previousSnapshotTerm = -1;
+    private ArrayList<ReplicatedLogEntry> snapshottedJournal;
+    private long previousSnapshotIndex = -1;
+    private long previousSnapshotTerm = -1;
     protected int dataSize = 0;
 
     public AbstractReplicatedLogImpl(long snapshotIndex,
@@ -36,7 +37,7 @@ public abstract class AbstractReplicatedLogImpl implements ReplicatedLog {
     }
 
     public AbstractReplicatedLogImpl() {
-        this.journal = new ArrayList<>();
+        this(-1L, -1L, Collections.<ReplicatedLogEntry>emptyList());
     }
 
     protected int adjustedIndex(long logEntryIndex) {
@@ -116,19 +117,18 @@ public abstract class AbstractReplicatedLogImpl implements ReplicatedLog {
     public List<ReplicatedLogEntry> getFrom(long logEntryIndex, int max) {
         int adjustedIndex = adjustedIndex(logEntryIndex);
         int size = journal.size();
-        List<ReplicatedLogEntry> entries = new ArrayList<>(100);
         if (adjustedIndex >= 0 && adjustedIndex < size) {
             // physical index should be less than list size and >= 0
             int maxIndex = adjustedIndex + max;
             if(maxIndex > size){
                 maxIndex = size;
             }
-            entries.addAll(journal.subList(adjustedIndex, maxIndex));
+            return new ArrayList<>(journal.subList(adjustedIndex, maxIndex));
+        } else {
+            return Collections.emptyList();
         }
-        return entries;
     }
 
-
     @Override
     public long size() {
        return journal.size();
index 4d2bad5c2effe1f55a7abb3e74786e30390abaab..6d0c14e733a8c81bb2e29a663915eb5776422a6d 100644 (file)
@@ -5,11 +5,8 @@
  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
  * and is available at http://www.eclipse.org/legal/epl-v10.html
  */
-
 package org.opendaylight.controller.cluster.raft;
 
-import java.util.concurrent.atomic.AtomicLong;
-
 /**
  * The state of the followers log as known by the Leader
  */
@@ -19,13 +16,13 @@ public interface FollowerLogInformation {
      * Increment the value of the nextIndex
      * @return
      */
-    public long incrNextIndex();
+    long incrNextIndex();
 
     /**
      * Decrement the value of the nextIndex
      * @return
      */
-    public long decrNextIndex();
+    long decrNextIndex();
 
     /**
      *
@@ -37,45 +34,43 @@ public interface FollowerLogInformation {
      * Increment the value of the matchIndex
      * @return
      */
-    public long incrMatchIndex();
+    long incrMatchIndex();
 
-    public void setMatchIndex(long matchIndex);
+    void setMatchIndex(long matchIndex);
 
     /**
      * The identifier of the follower
      * This could simply be the url of the remote actor
      */
-    public String getId();
+    String getId();
 
     /**
      * for each server, index of the next log entry
      * to send to that server (initialized to leader
      *    last log index + 1)
      */
-    public AtomicLong getNextIndex();
+    long getNextIndex();
 
     /**
      * for each server, index of highest log entry
      * known to be replicated on server
      *    (initialized to 0, increases monotonically)
      */
-    public AtomicLong getMatchIndex();
+    long getMatchIndex();
 
     /**
      * Checks if the follower is active by comparing the last updated with the duration
      * @return boolean
      */
-    public boolean isFollowerActive();
+    boolean isFollowerActive();
 
     /**
      * restarts the timeout clock of the follower
      */
-    public void markFollowerActive();
+    void markFollowerActive();
 
     /**
      * This will stop the timeout clock
      */
-    public void markFollowerInActive();
-
-
+    void markFollowerInActive();
 }
index 7df80af58a717e99c1a226c3bd6cbb17ccd92afc..7a690d3d18be84433f9e37c874a88b277b83f7cb 100644 (file)
@@ -9,61 +9,69 @@
 package org.opendaylight.controller.cluster.raft;
 
 import com.google.common.base.Stopwatch;
-import scala.concurrent.duration.FiniteDuration;
-
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicLongFieldUpdater;
+import scala.concurrent.duration.FiniteDuration;
 
-public class FollowerLogInformationImpl implements FollowerLogInformation{
+public class FollowerLogInformationImpl implements FollowerLogInformation {
+    private static final AtomicLongFieldUpdater<FollowerLogInformationImpl> NEXT_INDEX_UPDATER = AtomicLongFieldUpdater.newUpdater(FollowerLogInformationImpl.class, "nextIndex");
+    private static final AtomicLongFieldUpdater<FollowerLogInformationImpl> MATCH_INDEX_UPDATER = AtomicLongFieldUpdater.newUpdater(FollowerLogInformationImpl.class, "matchIndex");
 
     private final String id;
 
-    private final AtomicLong nextIndex;
+    private final Stopwatch stopwatch = new Stopwatch();
 
-    private final AtomicLong matchIndex;
+    private final long followerTimeoutMillis;
 
-    private final Stopwatch stopwatch;
+    private volatile long nextIndex;
 
-    private final long followerTimeoutMillis;
+    private volatile long matchIndex;
 
-    public FollowerLogInformationImpl(String id, AtomicLong nextIndex,
-        AtomicLong matchIndex, FiniteDuration followerTimeoutDuration) {
+    public FollowerLogInformationImpl(String id, long nextIndex,
+        long matchIndex, FiniteDuration followerTimeoutDuration) {
         this.id = id;
         this.nextIndex = nextIndex;
         this.matchIndex = matchIndex;
-        this.stopwatch = new Stopwatch();
         this.followerTimeoutMillis = followerTimeoutDuration.toMillis();
     }
 
+    @Override
     public long incrNextIndex(){
-        return nextIndex.incrementAndGet();
+        return NEXT_INDEX_UPDATER.incrementAndGet(this);
     }
 
-    @Override public long decrNextIndex() {
-        return nextIndex.decrementAndGet();
+    @Override
+    public long decrNextIndex() {
+        return NEXT_INDEX_UPDATER.decrementAndGet(this);
     }
 
-    @Override public void setNextIndex(long nextIndex) {
-        this.nextIndex.set(nextIndex);
+    @Override
+    public void setNextIndex(long nextIndex) {
+        this.nextIndex = nextIndex;
     }
 
+    @Override
     public long incrMatchIndex(){
-        return matchIndex.incrementAndGet();
+        return MATCH_INDEX_UPDATER.incrementAndGet(this);
     }
 
-    @Override public void setMatchIndex(long matchIndex) {
-        this.matchIndex.set(matchIndex);
+    @Override
+    public void setMatchIndex(long matchIndex) {
+        this.matchIndex = matchIndex;
     }
 
+    @Override
     public String getId() {
         return id;
     }
 
-    public AtomicLong getNextIndex() {
+    @Override
+    public long getNextIndex() {
         return nextIndex;
     }
 
-    public AtomicLong getMatchIndex() {
+    @Override
+    public long getMatchIndex() {
         return matchIndex;
     }
 
index 8e97c5877cba4cb90d195a6bb33cd42c449179dc..164c2cea561349cf178d63965eccd0a313e29b4e 100644 (file)
@@ -204,7 +204,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
         timer.stop();
         LOG.info("Recovery snapshot applied for {} in {}: snapshotIndex={}, snapshotTerm={}, journal-size=" +
                 replicatedLog.size(), persistenceId(), timer.toString(),
-                replicatedLog.snapshotIndex, replicatedLog.snapshotTerm);
+                replicatedLog.getSnapshotIndex(), replicatedLog.getSnapshotTerm());
     }
 
     private void onRecoveredJournalLogEntry(ReplicatedLogEntry logEntry) {
@@ -268,8 +268,8 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
                 "Persistence Id =  " + persistenceId() +
                 " Last index in log={}, snapshotIndex={}, snapshotTerm={}, " +
                 "journal-size={}",
-            replicatedLog.lastIndex(), replicatedLog.snapshotIndex,
-            replicatedLog.snapshotTerm, replicatedLog.size());
+            replicatedLog.lastIndex(), replicatedLog.getSnapshotIndex(),
+            replicatedLog.getSnapshotTerm(), replicatedLog.size());
 
         initializeBehavior();
     }
index e5c5dc752d3a257e9ce2f5852bf3350b006b8116..462c94ec8a40736cc005c994b74520d9111c3430 100644 (file)
@@ -14,16 +14,19 @@ import akka.actor.Cancellable;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableMap.Builder;
 import com.google.protobuf.ByteString;
 import java.io.IOException;
-import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
+import java.util.Map.Entry;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
 import org.opendaylight.controller.cluster.raft.ClientRequestTracker;
 import org.opendaylight.controller.cluster.raft.ClientRequestTrackerImpl;
 import org.opendaylight.controller.cluster.raft.FollowerLogInformation;
@@ -77,14 +80,12 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
     // This would be passed as the hash code of the last chunk when sending the first chunk
     public static final int INITIAL_LAST_CHUNK_HASH_CODE = -1;
 
-    protected final Map<String, FollowerLogInformation> followerToLog = new HashMap<>();
-    protected final Map<String, FollowerToSnapshot> mapFollowerToSnapshot = new HashMap<>();
-
-    protected final Set<String> followers;
+    private final Map<String, FollowerLogInformation> followerToLog;
+    private final Map<String, FollowerToSnapshot> mapFollowerToSnapshot = new HashMap<>();
 
     private Cancellable heartbeatSchedule = null;
 
-    private List<ClientRequestTracker> trackerList = new ArrayList<>();
+    private final Collection<ClientRequestTracker> trackerList = new LinkedList<>();
 
     protected final int minReplicationCount;
 
@@ -95,25 +96,22 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
     public AbstractLeader(RaftActorContext context) {
         super(context);
 
-        followers = context.getPeerAddresses().keySet();
-
-        for (String followerId : followers) {
+        final Builder<String, FollowerLogInformation> ftlBuilder = ImmutableMap.builder();
+        for (String followerId : context.getPeerAddresses().keySet()) {
             FollowerLogInformation followerLogInformation =
                 new FollowerLogInformationImpl(followerId,
-                    new AtomicLong(context.getCommitIndex()),
-                    new AtomicLong(-1),
+                    context.getCommitIndex(), -1,
                     context.getConfigParams().getElectionTimeOutInterval());
 
-            followerToLog.put(followerId, followerLogInformation);
+            ftlBuilder.put(followerId, followerLogInformation);
         }
+        followerToLog = ftlBuilder.build();
 
         leaderId = context.getId();
 
-        if(LOG.isDebugEnabled()) {
-            LOG.debug("Election:Leader has following peers: {}", followers);
-        }
+        LOG.debug("Election:Leader has following peers: {}", getFollowerIds());
 
-        minReplicationCount = getMajorityVoteCount(followers.size());
+        minReplicationCount = getMajorityVoteCount(getFollowerIds().size());
 
         // the isolated Leader peer count will be 1 less than the majority vote count.
         // this is because the vote count has the self vote counted in it
@@ -132,6 +130,15 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
         scheduleHeartBeat(new FiniteDuration(0, TimeUnit.SECONDS));
     }
 
+    /**
+     * Return an immutable collection of follower identifiers.
+     *
+     * @return Collection of follower IDs
+     */
+    protected final Collection<String> getFollowerIds() {
+        return followerToLog.keySet();
+    }
+
     private Optional<ByteString> getSnapshot() {
         return snapshot;
     }
@@ -198,7 +205,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
             int replicatedCount = 1;
 
             for (FollowerLogInformation info : followerToLog.values()) {
-                if (info.getMatchIndex().get() >= N) {
+                if (info.getMatchIndex() >= N) {
                     replicatedCount++;
                 }
             }
@@ -222,16 +229,21 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
         return this;
     }
 
+    @Override
     protected ClientRequestTracker removeClientRequestTracker(long logIndex) {
-
-        ClientRequestTracker toRemove = findClientRequestTracker(logIndex);
-        if(toRemove != null) {
-            trackerList.remove(toRemove);
+        final Iterator<ClientRequestTracker> it = trackerList.iterator();
+        while (it.hasNext()) {
+            final ClientRequestTracker t = it.next();
+            if (t.getIndex() == logIndex) {
+                it.remove();
+                return t;
+            }
         }
 
-        return toRemove;
+        return null;
     }
 
+    @Override
     protected ClientRequestTracker findClientRequestTracker(long logIndex) {
         for (ClientRequestTracker tracker : trackerList) {
             if (tracker.getIndex() == logIndex) {
@@ -324,8 +336,8 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
                     mapFollowerToSnapshot.remove(followerId);
 
                     if(LOG.isDebugEnabled()) {
-                        LOG.debug("followerToLog.get(followerId).getNextIndex().get()=" +
-                            followerToLog.get(followerId).getNextIndex().get());
+                        LOG.debug("followerToLog.get(followerId).getNextIndex()=" +
+                            followerToLog.get(followerId).getNextIndex());
                     }
 
                     if (mapFollowerToSnapshot.isEmpty()) {
@@ -376,7 +388,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
                 logIndex)
         );
 
-        if (followers.size() == 0) {
+        if (followerToLog.isEmpty()) {
             context.setCommitIndex(logIndex);
             applyLogToStateMachine(logIndex);
         } else {
@@ -386,14 +398,14 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
 
     private void sendAppendEntries() {
         // Send an AppendEntries to all followers
-        for (String followerId : followers) {
+        for (Entry<String, FollowerLogInformation> e : followerToLog.entrySet()) {
+            final String followerId = e.getKey();
             ActorSelection followerActor = context.getPeerActorSelection(followerId);
 
             if (followerActor != null) {
                 FollowerLogInformation followerLogInformation = followerToLog.get(followerId);
-                long followerNextIndex = followerLogInformation.getNextIndex().get();
+                long followerNextIndex = followerLogInformation.getNextIndex();
                 boolean isFollowerActive = followerLogInformation.isFollowerActive();
-                List<ReplicatedLogEntry> entries = null;
 
                 if (mapFollowerToSnapshot.get(followerId) != null) {
                     // if install snapshot is in process , then sent next chunk if possible
@@ -408,6 +420,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
                 } else {
                     long leaderLastIndex = context.getReplicatedLog().lastIndex();
                     long leaderSnapShotIndex = context.getReplicatedLog().getSnapshotIndex();
+                    final List<ReplicatedLogEntry> entries;
 
                     if (isFollowerActive &&
                         context.getReplicatedLog().isPresent(followerNextIndex)) {
@@ -473,23 +486,19 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
      *
      */
     private void installSnapshotIfNeeded() {
-        for (String followerId : followers) {
-            ActorSelection followerActor =
-                context.getPeerActorSelection(followerId);
+        for (Entry<String, FollowerLogInformation> e : followerToLog.entrySet()) {
+            final ActorSelection followerActor = context.getPeerActorSelection(e.getKey());
 
-            if(followerActor != null) {
-                FollowerLogInformation followerLogInformation =
-                    followerToLog.get(followerId);
-
-                long nextIndex = followerLogInformation.getNextIndex().get();
+            if (followerActor != null) {
+                long nextIndex = e.getValue().getNextIndex();
 
                 if (!context.getReplicatedLog().isPresent(nextIndex) &&
                     context.getReplicatedLog().isInSnapshot(nextIndex)) {
-                    LOG.info("{} follower needs a snapshot install", followerId);
+                    LOG.info("{} follower needs a snapshot install", e.getKey());
                     if (snapshot.isPresent()) {
                         // if a snapshot is present in the memory, most likely another install is in progress
                         // no need to capture snapshot
-                        sendSnapshotChunk(followerActor, followerId);
+                        sendSnapshotChunk(followerActor, e.getKey());
 
                     } else {
                         initiateCaptureSnapshot();
@@ -528,16 +537,15 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
 
 
     private void sendInstallSnapshot() {
-        for (String followerId : followers) {
-            ActorSelection followerActor = context.getPeerActorSelection(followerId);
+        for (Entry<String, FollowerLogInformation> e : followerToLog.entrySet()) {
+            ActorSelection followerActor = context.getPeerActorSelection(e.getKey());
 
-            if(followerActor != null) {
-                FollowerLogInformation followerLogInformation = followerToLog.get(followerId);
-                long nextIndex = followerLogInformation.getNextIndex().get();
+            if (followerActor != null) {
+                long nextIndex = e.getValue().getNextIndex();
 
                 if (!context.getReplicatedLog().isPresent(nextIndex) &&
                     context.getReplicatedLog().isInSnapshot(nextIndex)) {
-                    sendSnapshotChunk(followerActor, followerId);
+                    sendSnapshotChunk(followerActor, e.getKey());
                 }
             }
         }
@@ -588,7 +596,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
     }
 
     private void sendHeartBeat() {
-        if (followers.size() > 0) {
+        if (!followerToLog.isEmpty()) {
             sendAppendEntries();
         }
     }
@@ -600,7 +608,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
     }
 
     private void scheduleHeartBeat(FiniteDuration interval) {
-        if(followers.size() == 0){
+        if (followerToLog.isEmpty()) {
             // Optimization - do not bother scheduling a heartbeat as there are
             // no followers
             return;
@@ -759,17 +767,38 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
 
     // called from example-actor for printing the follower-states
     public String printFollowerStates() {
-        StringBuilder sb = new StringBuilder();
-        for(FollowerLogInformation followerLogInformation : followerToLog.values()) {
-            boolean isFollowerActive = followerLogInformation.isFollowerActive();
-            sb.append("{"+followerLogInformation.getId() + " state:" + isFollowerActive + "},");
+        final StringBuilder sb = new StringBuilder();
 
+        sb.append('[');
+        for (FollowerLogInformation followerLogInformation : followerToLog.values()) {
+            sb.append('{');
+            sb.append(followerLogInformation.getId());
+            sb.append(" state:");
+            sb.append(followerLogInformation.isFollowerActive());
+            sb.append("},");
         }
-        return "[" + sb.toString() + "]";
+        sb.append(']');
+
+        return sb.toString();
+    }
+
+    @VisibleForTesting
+    public FollowerLogInformation getFollower(String followerId) {
+        return followerToLog.get(followerId);
+    }
+
+    @VisibleForTesting
+    protected void setFollowerSnapshot(String followerId, FollowerToSnapshot snapshot) {
+        mapFollowerToSnapshot.put(followerId, snapshot);
+    }
+
+    @VisibleForTesting
+    public int followerSnapshotSize() {
+        return mapFollowerToSnapshot.size();
     }
 
     @VisibleForTesting
-    void markFollowerActive(String followerId) {
-        followerToLog.get(followerId).markFollowerActive();
+    public int followerLogSize() {
+        return followerToLog.size();
     }
 }
index 97ecef370f08f00c7a2041b1897498a10d42fd60..ee3cc65dddb90815aecd50771c834d3ab461ecd4 100644 (file)
@@ -5,7 +5,6 @@
  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
  * and is available at http://www.eclipse.org/legal/epl-v10.html
  */
-
 package org.opendaylight.controller.cluster.raft.behaviors;
 
 import akka.actor.ActorRef;
@@ -74,7 +73,7 @@ public class Leader extends AbstractLeader {
     }
 
     protected void scheduleInstallSnapshotCheck(FiniteDuration interval) {
-        if(followers.size() == 0){
+        if (getFollowerIds().isEmpty()) {
             // Optimization - do not bother scheduling a heartbeat as there are
             // no followers
             return;
@@ -103,7 +102,8 @@ public class Leader extends AbstractLeader {
             context.getActorSystem().dispatcher(), context.getActor());
     }
 
-    @Override public void close() throws Exception {
+    @Override
+    public void close() throws Exception {
         stopInstallSnapshotSchedule();
         stopIsolatedLeaderCheckSchedule();
         super.close();
@@ -111,11 +111,11 @@ public class Leader extends AbstractLeader {
 
     @VisibleForTesting
     void markFollowerActive(String followerId) {
-        followerToLog.get(followerId).markFollowerActive();
+        getFollower(followerId).markFollowerActive();
     }
 
     @VisibleForTesting
     void markFollowerInActive(String followerId) {
-        followerToLog.get(followerId).markFollowerInActive();
+        getFollower(followerId).markFollowerInActive();
     }
 }
index d95c9d502712159334171301868ee9af4cfa99d7..d53ccf25002dbbf407d2e4dc5dc35c0ec231c590 100644 (file)
@@ -154,16 +154,6 @@ public class AbstractReplicatedLogImplTest {
         public void removeFromAndPersist(final long index) {
         }
 
-        @Override
-        public void setSnapshotIndex(final long snapshotIndex) {
-            this.snapshotIndex = snapshotIndex;
-        }
-
-        @Override
-        public void setSnapshotTerm(final long snapshotTerm) {
-            this.snapshotTerm = snapshotTerm;
-        }
-
         @Override
         public int dataSize() {
             return -1;
index 7df9f3713ffc302dcf100c45a93b8fd034b0fffa..a092c46533d251414ee7f22cf843c4fc3765f691 100644 (file)
@@ -7,14 +7,12 @@
  */
 package org.opendaylight.controller.cluster.raft;
 
-
 import com.google.common.base.Stopwatch;
 import com.google.common.util.concurrent.Uninterruptibles;
 import org.junit.Test;
 import scala.concurrent.duration.FiniteDuration;
 
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
 
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
@@ -29,7 +27,7 @@ public class FollowerLogInformationImplTest {
 
         FollowerLogInformation followerLogInformation =
             new FollowerLogInformationImpl(
-                "follower1", new AtomicLong(10), new AtomicLong(9), timeoutDuration);
+                "follower1", 10, 9, timeoutDuration);
 
 
 
index 895fe35bff7588526fac71e996e9120968234af2..151015e97ec785277beaf51b0bdb4fe0681f6582 100644 (file)
@@ -67,12 +67,12 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest {
         }};
     }
 
-
     @Test
     public void testThatLeaderSendsAHeartbeatMessageToAllFollowers() {
         new JavaTestKit(getSystem()) {{
 
             new Within(duration("1 seconds")) {
+                @Override
                 protected void run() {
 
                     ActorRef followerActor = getTestActor();
@@ -92,6 +92,7 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest {
                     final String out =
                         new ExpectMsg<String>(duration("1 seconds"), "match hint") {
                             // do not put code outside this method, will run afterwards
+                            @Override
                             protected String match(Object in) {
                                 Object msg = fromSerializableMessage(in);
                                 if (msg instanceof AppendEntries) {
@@ -117,6 +118,7 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest {
         new JavaTestKit(getSystem()) {{
 
             new Within(duration("1 seconds")) {
+                @Override
                 protected void run() {
 
                     ActorRef followerActor = getTestActor();
@@ -145,6 +147,7 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest {
                     final String out =
                         new ExpectMsg<String>(duration("1 seconds"), "match hint") {
                             // do not put code outside this method, will run afterwards
+                            @Override
                             protected String match(Object in) {
                                 Object msg = fromSerializableMessage(in);
                                 if (msg instanceof AppendEntries) {
@@ -169,6 +172,7 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest {
         new JavaTestKit(getSystem()) {{
 
             new Within(duration("1 seconds")) {
+                @Override
                 protected void run() {
 
                     ActorRef raftActor = getTestActor();
@@ -195,6 +199,7 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest {
                         new ExpectMsg<String>(duration("1 seconds"),
                             "match hint") {
                             // do not put code outside this method, will run afterwards
+                            @Override
                             protected String match(Object in) {
                                 if (in instanceof ApplyState) {
                                     if (((ApplyState) in).getIdentifier().equals("state-id")) {
@@ -482,6 +487,7 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest {
             final String out =
                 new ExpectMsg<String>(duration("1 seconds"), "match hint") {
                     // do not put code outside this method, will run afterwards
+                    @Override
                     protected String match(Object in) {
                         if (in instanceof InstallSnapshotMessages.InstallSnapshot) {
                             InstallSnapshot is = (InstallSnapshot)
@@ -562,13 +568,13 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest {
 
             assertTrue(raftBehavior instanceof Leader);
 
-            assertEquals(leader.mapFollowerToSnapshot.size(), 0);
-            assertEquals(leader.followerToLog.size(), 1);
-            assertNotNull(leader.followerToLog.get(followerActor.path().toString()));
-            FollowerLogInformation fli = leader.followerToLog.get(followerActor.path().toString());
-            assertEquals(snapshotIndex, fli.getMatchIndex().get());
-            assertEquals(snapshotIndex, fli.getMatchIndex().get());
-            assertEquals(snapshotIndex + 1, fli.getNextIndex().get());
+            assertEquals(0, leader.followerSnapshotSize());
+            assertEquals(1, leader.followerLogSize());
+            assertNotNull(leader.getFollower(followerActor.path().toString()));
+            FollowerLogInformation fli = leader.getFollower(followerActor.path().toString());
+            assertEquals(snapshotIndex, fli.getMatchIndex());
+            assertEquals(snapshotIndex, fli.getMatchIndex());
+            assertEquals(snapshotIndex + 1, fli.getNextIndex());
         }};
     }
 
@@ -779,6 +785,7 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest {
         return createActorContext(leaderActor);
     }
 
+    @Override
     protected RaftActorContext createActorContext(ActorRef actorRef) {
         return new MockRaftActorContext("test", getSystem(), actorRef);
     }
@@ -1180,8 +1187,7 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest {
 
         public void createFollowerToSnapshot(String followerId, ByteString bs ) {
             fts = new FollowerToSnapshot(bs);
-            mapFollowerToSnapshot.put(followerId, fts);
-
+            setFollowerSnapshot(followerId, fts);
         }
     }
 
index f03d07eb99c0fc2ae212cc358402ba67c17a4a74..2a6de4af0a0d2ed5eb6f1887ad7e1a26aa166f4c 100644 (file)
@@ -8,11 +8,16 @@
 
 package org.opendaylight.controller.sal.binding.impl.connect.dom;
 
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Function;
+import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableList;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
 import java.lang.ref.WeakReference;
 import java.lang.reflect.Method;
 import java.util.Collections;
 import java.util.concurrent.Future;
-
 import org.opendaylight.controller.sal.core.api.RpcProvisionRegistry;
 import org.opendaylight.yangtools.yang.binding.DataContainer;
 import org.opendaylight.yangtools.yang.binding.DataObject;
@@ -26,12 +31,6 @@ import org.opendaylight.yangtools.yang.data.api.Node;
 import org.opendaylight.yangtools.yang.data.impl.ImmutableCompositeNode;
 import org.opendaylight.yangtools.yang.data.impl.codec.BindingIndependentMappingService;
 
-import com.google.common.base.Function;
-import com.google.common.base.Optional;
-import com.google.common.collect.ImmutableList;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
-
 /*
  * RPC's can have both input, output, one or the other, or neither.
  *
@@ -44,6 +43,20 @@ import com.google.common.util.concurrent.ListenableFuture;
  *
  */
 public class RpcInvocationStrategy {
+    private final Function<RpcResult<CompositeNode>, RpcResult<?>> transformationFunction = new Function<RpcResult<CompositeNode>, RpcResult<?>>() {
+        @SuppressWarnings("rawtypes")
+        @Override
+        public RpcResult<?> apply(final RpcResult<CompositeNode> result) {
+            final Object output;
+            if (getOutputClass() != null && result.getResult() != null) {
+                output = mappingService.dataObjectFromDataDom(getOutputClass().get(), result.getResult());
+            } else {
+                output = null;
+            }
+
+            return RpcResultBuilder.from( (RpcResult)result ).withResult( output ).build();
+        }
+    };
 
     private final BindingIndependentMappingService mappingService;
     private final RpcProvisionRegistry biRpcRegistry;
@@ -61,26 +74,24 @@ public class RpcInvocationStrategy {
                                  final Method targetMethod,
                                  final BindingIndependentMappingService mappingService,
                                  final RpcProvisionRegistry biRpcRegistry ) {
-
+        this.mappingService = mappingService;
+        this.biRpcRegistry = biRpcRegistry;
         this.targetMethod = targetMethod;
         this.rpc = rpc;
 
-        Optional<Class<?>> outputClassOption = BindingReflections.resolveRpcOutputClass(targetMethod);
-        Optional<Class<? extends DataContainer>> inputClassOption = BindingReflections.resolveRpcInputClass(targetMethod);
-
-        if ( outputClassOption != null && outputClassOption.isPresent() ) {
-            this.outputClass = new WeakReference(outputClassOption.get() ) ;
+        final Optional<Class<?>> outputClassOption = BindingReflections.resolveRpcOutputClass(targetMethod);
+        if (outputClassOption.isPresent()) {
+            this.outputClass = new WeakReference(outputClassOption.get());
         } else {
-            this.outputClass = null ;
+            this.outputClass = null;
         }
-        if ( inputClassOption != null && inputClassOption.isPresent() ) {
-            this.inputClass = new WeakReference(inputClassOption.get() ) ;
+
+        final Optional<Class<? extends DataContainer>> inputClassOption = BindingReflections.resolveRpcInputClass(targetMethod);
+        if (inputClassOption.isPresent() ) {
+            this.inputClass = new WeakReference(inputClassOption.get());
         } else {
-            this.inputClass = null ;
+            this.inputClass = null;
         }
-
-        this.mappingService = mappingService;
-        this.biRpcRegistry = biRpcRegistry;
     }
 
     @SuppressWarnings({ "unchecked" })
@@ -98,25 +109,6 @@ public class RpcInvocationStrategy {
             inputXml = ImmutableCompositeNode.create( rpc, Collections.<Node<?>>emptyList() );
         }
 
-        Function<RpcResult<CompositeNode>, RpcResult<?>> transformationFunction =
-                                       new Function<RpcResult<CompositeNode>, RpcResult<?>>() {
-            @SuppressWarnings("rawtypes")
-            @Override
-            public RpcResult<?> apply(RpcResult<CompositeNode> result) {
-
-                Object output = null;
-
-                if( getOutputClass() != null ) {
-                    if (result.getResult() != null) {
-                        output = mappingService.dataObjectFromDataDom(getOutputClass().get(),
-                                                                    result.getResult());
-                    }
-                }
-
-                return RpcResultBuilder.from( (RpcResult)result ).withResult( output ).build();
-            }
-        };
-
         return Futures.transform(biRpcRegistry.invokeRpc(rpc, inputXml), transformationFunction);
     }
 
@@ -153,7 +145,8 @@ public class RpcInvocationStrategy {
     }
 
     @SuppressWarnings("rawtypes")
-    public WeakReference<Class> getOutputClass() {
+    @VisibleForTesting
+    WeakReference<Class> getOutputClass() {
         return outputClass;
     }
 }