Merge changes I3e404877,Ida2a5c32,I9e6ce426,I6a4b90f6,I79717533
authorTony Tkacik <ttkacik@cisco.com>
Mon, 26 Jan 2015 09:04:20 +0000 (09:04 +0000)
committerGerrit Code Review <gerrit@opendaylight.org>
Mon, 26 Jan 2015 09:04:20 +0000 (09:04 +0000)
* changes:
  Change trackerList to a LinkedList
  Improve AbstractLeader tracker removal
  use AtomicLongfieldUpdater in FollowerLogInformationImpl
  Use an ImmutableList instance for getFrom()
  Hide AbstractReplicatedLogImpl index fields

opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/AbstractReplicatedLogImpl.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/FollowerLogInformationImpl.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/AbstractReplicatedLogImplTest.java

index 653520c2e47db4be19de53c9e3a42099904298d8..d1c3fefee8309208a6df6fe9b539a10ee000ddef 100644 (file)
@@ -8,6 +8,7 @@
 package org.opendaylight.controller.cluster.raft;
 
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 
 /**
@@ -19,13 +20,13 @@ public abstract class AbstractReplicatedLogImpl implements ReplicatedLog {
     // We define this as ArrayList so we can use ensureCapacity.
     protected ArrayList<ReplicatedLogEntry> journal;
 
-    protected long snapshotIndex = -1;
-    protected long snapshotTerm = -1;
+    private long snapshotIndex = -1;
+    private long snapshotTerm = -1;
 
     // to be used for rollback during save snapshot failure
-    protected ArrayList<ReplicatedLogEntry> snapshottedJournal;
-    protected long previousSnapshotIndex = -1;
-    protected long previousSnapshotTerm = -1;
+    private ArrayList<ReplicatedLogEntry> snapshottedJournal;
+    private long previousSnapshotIndex = -1;
+    private long previousSnapshotTerm = -1;
     protected int dataSize = 0;
 
     public AbstractReplicatedLogImpl(long snapshotIndex,
@@ -36,7 +37,7 @@ public abstract class AbstractReplicatedLogImpl implements ReplicatedLog {
     }
 
     public AbstractReplicatedLogImpl() {
-        this.journal = new ArrayList<>();
+        this(-1L, -1L, Collections.<ReplicatedLogEntry>emptyList());
     }
 
     protected int adjustedIndex(long logEntryIndex) {
@@ -116,19 +117,18 @@ public abstract class AbstractReplicatedLogImpl implements ReplicatedLog {
     public List<ReplicatedLogEntry> getFrom(long logEntryIndex, int max) {
         int adjustedIndex = adjustedIndex(logEntryIndex);
         int size = journal.size();
-        List<ReplicatedLogEntry> entries = new ArrayList<>(100);
         if (adjustedIndex >= 0 && adjustedIndex < size) {
             // physical index should be less than list size and >= 0
             int maxIndex = adjustedIndex + max;
             if(maxIndex > size){
                 maxIndex = size;
             }
-            entries.addAll(journal.subList(adjustedIndex, maxIndex));
+            return new ArrayList<>(journal.subList(adjustedIndex, maxIndex));
+        } else {
+            return Collections.emptyList();
         }
-        return entries;
     }
 
-
     @Override
     public long size() {
        return journal.size();
index 91a5dddb981b3f7b09e9a79dd44fa0c09d4448a2..7a690d3d18be84433f9e37c874a88b277b83f7cb 100644 (file)
@@ -10,53 +10,54 @@ package org.opendaylight.controller.cluster.raft;
 
 import com.google.common.base.Stopwatch;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicLongFieldUpdater;
 import scala.concurrent.duration.FiniteDuration;
 
 public class FollowerLogInformationImpl implements FollowerLogInformation {
+    private static final AtomicLongFieldUpdater<FollowerLogInformationImpl> NEXT_INDEX_UPDATER = AtomicLongFieldUpdater.newUpdater(FollowerLogInformationImpl.class, "nextIndex");
+    private static final AtomicLongFieldUpdater<FollowerLogInformationImpl> MATCH_INDEX_UPDATER = AtomicLongFieldUpdater.newUpdater(FollowerLogInformationImpl.class, "matchIndex");
 
     private final String id;
 
-    private final AtomicLong nextIndex;
+    private final Stopwatch stopwatch = new Stopwatch();
 
-    private final AtomicLong matchIndex;
+    private final long followerTimeoutMillis;
 
-    private final Stopwatch stopwatch;
+    private volatile long nextIndex;
 
-    private final long followerTimeoutMillis;
+    private volatile long matchIndex;
 
     public FollowerLogInformationImpl(String id, long nextIndex,
         long matchIndex, FiniteDuration followerTimeoutDuration) {
         this.id = id;
-        this.nextIndex = new AtomicLong(nextIndex);
-        this.matchIndex = new AtomicLong(matchIndex);
-        this.stopwatch = new Stopwatch();
+        this.nextIndex = nextIndex;
+        this.matchIndex = matchIndex;
         this.followerTimeoutMillis = followerTimeoutDuration.toMillis();
     }
 
     @Override
     public long incrNextIndex(){
-        return nextIndex.incrementAndGet();
+        return NEXT_INDEX_UPDATER.incrementAndGet(this);
     }
 
     @Override
     public long decrNextIndex() {
-        return nextIndex.decrementAndGet();
+        return NEXT_INDEX_UPDATER.decrementAndGet(this);
     }
 
     @Override
     public void setNextIndex(long nextIndex) {
-        this.nextIndex.set(nextIndex);
+        this.nextIndex = nextIndex;
     }
 
     @Override
     public long incrMatchIndex(){
-        return matchIndex.incrementAndGet();
+        return MATCH_INDEX_UPDATER.incrementAndGet(this);
     }
 
     @Override
     public void setMatchIndex(long matchIndex) {
-        this.matchIndex.set(matchIndex);
+        this.matchIndex = matchIndex;
     }
 
     @Override
@@ -66,12 +67,12 @@ public class FollowerLogInformationImpl implements FollowerLogInformation {
 
     @Override
     public long getNextIndex() {
-        return nextIndex.get();
+        return nextIndex;
     }
 
     @Override
     public long getMatchIndex() {
-        return matchIndex.get();
+        return matchIndex;
     }
 
     @Override
index 8e97c5877cba4cb90d195a6bb33cd42c449179dc..164c2cea561349cf178d63965eccd0a313e29b4e 100644 (file)
@@ -204,7 +204,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
         timer.stop();
         LOG.info("Recovery snapshot applied for {} in {}: snapshotIndex={}, snapshotTerm={}, journal-size=" +
                 replicatedLog.size(), persistenceId(), timer.toString(),
-                replicatedLog.snapshotIndex, replicatedLog.snapshotTerm);
+                replicatedLog.getSnapshotIndex(), replicatedLog.getSnapshotTerm());
     }
 
     private void onRecoveredJournalLogEntry(ReplicatedLogEntry logEntry) {
@@ -268,8 +268,8 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
                 "Persistence Id =  " + persistenceId() +
                 " Last index in log={}, snapshotIndex={}, snapshotTerm={}, " +
                 "journal-size={}",
-            replicatedLog.lastIndex(), replicatedLog.snapshotIndex,
-            replicatedLog.snapshotTerm, replicatedLog.size());
+            replicatedLog.lastIndex(), replicatedLog.getSnapshotIndex(),
+            replicatedLog.getSnapshotTerm(), replicatedLog.size());
 
         initializeBehavior();
     }
index 0a31cde90fb2c0139b72ec805aa5e4f83483bcb4..462c94ec8a40736cc005c994b74520d9111c3430 100644 (file)
@@ -18,10 +18,11 @@ import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableMap.Builder;
 import com.google.protobuf.ByteString;
 import java.io.IOException;
-import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
@@ -84,7 +85,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
 
     private Cancellable heartbeatSchedule = null;
 
-    private List<ClientRequestTracker> trackerList = new ArrayList<>();
+    private final Collection<ClientRequestTracker> trackerList = new LinkedList<>();
 
     protected final int minReplicationCount;
 
@@ -230,13 +231,16 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
 
     @Override
     protected ClientRequestTracker removeClientRequestTracker(long logIndex) {
-
-        ClientRequestTracker toRemove = findClientRequestTracker(logIndex);
-        if(toRemove != null) {
-            trackerList.remove(toRemove);
+        final Iterator<ClientRequestTracker> it = trackerList.iterator();
+        while (it.hasNext()) {
+            final ClientRequestTracker t = it.next();
+            if (t.getIndex() == logIndex) {
+                it.remove();
+                return t;
+            }
         }
 
-        return toRemove;
+        return null;
     }
 
     @Override
@@ -402,7 +406,6 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
                 FollowerLogInformation followerLogInformation = followerToLog.get(followerId);
                 long followerNextIndex = followerLogInformation.getNextIndex();
                 boolean isFollowerActive = followerLogInformation.isFollowerActive();
-                List<ReplicatedLogEntry> entries = null;
 
                 if (mapFollowerToSnapshot.get(followerId) != null) {
                     // if install snapshot is in process , then sent next chunk if possible
@@ -417,6 +420,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
                 } else {
                     long leaderLastIndex = context.getReplicatedLog().lastIndex();
                     long leaderSnapShotIndex = context.getReplicatedLog().getSnapshotIndex();
+                    final List<ReplicatedLogEntry> entries;
 
                     if (isFollowerActive &&
                         context.getReplicatedLog().isPresent(followerNextIndex)) {
index d95c9d502712159334171301868ee9af4cfa99d7..d53ccf25002dbbf407d2e4dc5dc35c0ec231c590 100644 (file)
@@ -154,16 +154,6 @@ public class AbstractReplicatedLogImplTest {
         public void removeFromAndPersist(final long index) {
         }
 
-        @Override
-        public void setSnapshotIndex(final long snapshotIndex) {
-            this.snapshotIndex = snapshotIndex;
-        }
-
-        @Override
-        public void setSnapshotTerm(final long snapshotTerm) {
-            this.snapshotTerm = snapshotTerm;
-        }
-
         @Override
         public int dataSize() {
             return -1;