Merge "Bug 2265: Address comments from 12448"
authorMoiz Raja <moraja@cisco.com>
Wed, 28 Jan 2015 13:46:08 +0000 (13:46 +0000)
committerGerrit Code Review <gerrit@opendaylight.org>
Wed, 28 Jan 2015 13:46:09 +0000 (13:46 +0000)
31 files changed:
opendaylight/archetypes/opendaylight-startup/src/main/resources/archetype-resources/__artifactId__-features/pom.xml
opendaylight/config/config-parent/pom.xml
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/ExampleActor.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/AbstractReplicatedLogImpl.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/FollowerLogInformation.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/FollowerLogInformationImpl.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Leader.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/AbstractReplicatedLogImplTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/FollowerLogInformationImplTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java
opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/sal/binding/codegen/impl/AbstractRuntimeCodeGenerator.java
opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/sal/binding/impl/connect/dom/RpcInvocationStrategy.java
opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/datastore/node/utils/AugmentationIdentifierGenerator.java
opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/datastore/node/utils/PathUtils.java
opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/datastore/node/utils/QNameFactory.java
opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/datastore/node/utils/serialization/ValueSerializer.java
opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/datastore/node/utils/serialization/ValueType.java
opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/datastore/node/utils/stream/ValueTypes.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ConfigurationImpl.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/shardstrategy/DefaultShardStrategy.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/shardstrategy/ModuleShardStrategy.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/shardstrategy/ShardStrategyFactory.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/ActorContext.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionChainProxyTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionProxyTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/shardstrategy/DefaultShardStrategyTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/ActorContextTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/md/cluster/datastore/model/TestModel.java

index 880e2dc9d59543db777e11c76af628b1adef7fbe..1ee28b8d5a2ed48b7540f09d41f1172fc2a2cb08 100644 (file)
@@ -39,6 +39,30 @@ and is available at http://www.eclipse.org/legal/epl-v10.html INTERNAL
     </dependencies>
   </dependencyManagement>
   <dependencies>
+    <dependency>
+      <groupId>org.opendaylight.yangtools</groupId>
+      <artifactId>features-yangtools</artifactId>
+      <classifier>features</classifier>
+      <version>${yangtools.version}</version>
+      <type>xml</type>
+      <scope>runtime</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.opendaylight.controller</groupId>
+      <artifactId>features-mdsal</artifactId>
+      <classifier>features</classifier>
+      <version>${mdsal.version}</version>
+      <type>xml</type>
+      <scope>runtime</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.opendaylight.controller</groupId>
+      <artifactId>features-restconf</artifactId>
+      <classifier>features</classifier>
+      <version>${mdsal.version}</version>
+      <type>xml</type>
+      <scope>runtime</scope>
+    </dependency>
     <dependency>
       <groupId>${symbol_dollar}{groupId}</groupId>
       <artifactId>${artifactId}-impl</artifactId>
index 10c1824ebece2a24af7cc299dba79d88b03d18c3..af39b63447cdf6429371a40a8155de53272cba1d 100644 (file)
@@ -24,6 +24,7 @@ and is available at http://www.eclipse.org/legal/epl-v10.html
   <properties>
     <config.version>0.3.0-SNAPSHOT</config.version>
     <mdsal.version>1.2.0-SNAPSHOT</mdsal.version>
+    <yangtools.version>0.7.0-SNAPSHOT</yangtools.version>
     <jmxGeneratorPath>src/main/yang-gen-config</jmxGeneratorPath>
     <config.file>src/main/config/default-config.xml</config.file>
   </properties>
@@ -45,10 +46,21 @@ and is available at http://www.eclipse.org/legal/epl-v10.html
         <type>pom</type>
         <scope>import</scope>
       </dependency>
+      <dependency>
+        <groupId>org.opendaylight.yangtools</groupId>
+        <artifactId>yangtools-artifacts</artifactId>
+        <version>${yangtools.version}</version>
+        <type>pom</type>
+        <scope>import</scope>
+      </dependency>
     </dependencies>
   </dependencyManagement>
 
   <dependencies>
+    <dependency>
+      <groupId>org.opendaylight.yangtools</groupId>
+      <artifactId>yang-common</artifactId>
+    </dependency>
     <dependency>
       <groupId>org.opendaylight.controller</groupId>
       <artifactId>config-api</artifactId>
index 684c3ac30ecb5cbcb441f90bf430e63e0b5267a7..8f416b3abc45145e2f95307332052b66cdb4b5a1 100644 (file)
@@ -75,9 +75,8 @@ public class ExampleActor extends RaftActor {
 
         } else if (message instanceof PrintRole) {
             if(LOG.isDebugEnabled()) {
-                String followers = "";
                 if (getRaftState() == RaftState.Leader || getRaftState() == RaftState.IsolatedLeader) {
-                    followers = ((Leader)this.getCurrentBehavior()).printFollowerStates();
+                    final String followers = ((Leader)this.getCurrentBehavior()).printFollowerStates();
                     LOG.debug("{} = {}, Peers={}, followers={}", getId(), getRaftState(),
                         getRaftActorContext().getPeerAddresses().keySet(), followers);
                 } else {
index 653520c2e47db4be19de53c9e3a42099904298d8..d1c3fefee8309208a6df6fe9b539a10ee000ddef 100644 (file)
@@ -8,6 +8,7 @@
 package org.opendaylight.controller.cluster.raft;
 
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 
 /**
@@ -19,13 +20,13 @@ public abstract class AbstractReplicatedLogImpl implements ReplicatedLog {
     // We define this as ArrayList so we can use ensureCapacity.
     protected ArrayList<ReplicatedLogEntry> journal;
 
-    protected long snapshotIndex = -1;
-    protected long snapshotTerm = -1;
+    private long snapshotIndex = -1;
+    private long snapshotTerm = -1;
 
     // to be used for rollback during save snapshot failure
-    protected ArrayList<ReplicatedLogEntry> snapshottedJournal;
-    protected long previousSnapshotIndex = -1;
-    protected long previousSnapshotTerm = -1;
+    private ArrayList<ReplicatedLogEntry> snapshottedJournal;
+    private long previousSnapshotIndex = -1;
+    private long previousSnapshotTerm = -1;
     protected int dataSize = 0;
 
     public AbstractReplicatedLogImpl(long snapshotIndex,
@@ -36,7 +37,7 @@ public abstract class AbstractReplicatedLogImpl implements ReplicatedLog {
     }
 
     public AbstractReplicatedLogImpl() {
-        this.journal = new ArrayList<>();
+        this(-1L, -1L, Collections.<ReplicatedLogEntry>emptyList());
     }
 
     protected int adjustedIndex(long logEntryIndex) {
@@ -116,19 +117,18 @@ public abstract class AbstractReplicatedLogImpl implements ReplicatedLog {
     public List<ReplicatedLogEntry> getFrom(long logEntryIndex, int max) {
         int adjustedIndex = adjustedIndex(logEntryIndex);
         int size = journal.size();
-        List<ReplicatedLogEntry> entries = new ArrayList<>(100);
         if (adjustedIndex >= 0 && adjustedIndex < size) {
             // physical index should be less than list size and >= 0
             int maxIndex = adjustedIndex + max;
             if(maxIndex > size){
                 maxIndex = size;
             }
-            entries.addAll(journal.subList(adjustedIndex, maxIndex));
+            return new ArrayList<>(journal.subList(adjustedIndex, maxIndex));
+        } else {
+            return Collections.emptyList();
         }
-        return entries;
     }
 
-
     @Override
     public long size() {
        return journal.size();
index 4d2bad5c2effe1f55a7abb3e74786e30390abaab..6d0c14e733a8c81bb2e29a663915eb5776422a6d 100644 (file)
@@ -5,11 +5,8 @@
  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
  * and is available at http://www.eclipse.org/legal/epl-v10.html
  */
-
 package org.opendaylight.controller.cluster.raft;
 
-import java.util.concurrent.atomic.AtomicLong;
-
 /**
  * The state of the followers log as known by the Leader
  */
@@ -19,13 +16,13 @@ public interface FollowerLogInformation {
      * Increment the value of the nextIndex
      * @return
      */
-    public long incrNextIndex();
+    long incrNextIndex();
 
     /**
      * Decrement the value of the nextIndex
      * @return
      */
-    public long decrNextIndex();
+    long decrNextIndex();
 
     /**
      *
@@ -37,45 +34,43 @@ public interface FollowerLogInformation {
      * Increment the value of the matchIndex
      * @return
      */
-    public long incrMatchIndex();
+    long incrMatchIndex();
 
-    public void setMatchIndex(long matchIndex);
+    void setMatchIndex(long matchIndex);
 
     /**
      * The identifier of the follower
      * This could simply be the url of the remote actor
      */
-    public String getId();
+    String getId();
 
     /**
      * for each server, index of the next log entry
      * to send to that server (initialized to leader
      *    last log index + 1)
      */
-    public AtomicLong getNextIndex();
+    long getNextIndex();
 
     /**
      * for each server, index of highest log entry
      * known to be replicated on server
      *    (initialized to 0, increases monotonically)
      */
-    public AtomicLong getMatchIndex();
+    long getMatchIndex();
 
     /**
      * Checks if the follower is active by comparing the last updated with the duration
      * @return boolean
      */
-    public boolean isFollowerActive();
+    boolean isFollowerActive();
 
     /**
      * restarts the timeout clock of the follower
      */
-    public void markFollowerActive();
+    void markFollowerActive();
 
     /**
      * This will stop the timeout clock
      */
-    public void markFollowerInActive();
-
-
+    void markFollowerInActive();
 }
index 7df80af58a717e99c1a226c3bd6cbb17ccd92afc..7a690d3d18be84433f9e37c874a88b277b83f7cb 100644 (file)
@@ -9,61 +9,69 @@
 package org.opendaylight.controller.cluster.raft;
 
 import com.google.common.base.Stopwatch;
-import scala.concurrent.duration.FiniteDuration;
-
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicLongFieldUpdater;
+import scala.concurrent.duration.FiniteDuration;
 
-public class FollowerLogInformationImpl implements FollowerLogInformation{
+public class FollowerLogInformationImpl implements FollowerLogInformation {
+    private static final AtomicLongFieldUpdater<FollowerLogInformationImpl> NEXT_INDEX_UPDATER = AtomicLongFieldUpdater.newUpdater(FollowerLogInformationImpl.class, "nextIndex");
+    private static final AtomicLongFieldUpdater<FollowerLogInformationImpl> MATCH_INDEX_UPDATER = AtomicLongFieldUpdater.newUpdater(FollowerLogInformationImpl.class, "matchIndex");
 
     private final String id;
 
-    private final AtomicLong nextIndex;
+    private final Stopwatch stopwatch = new Stopwatch();
 
-    private final AtomicLong matchIndex;
+    private final long followerTimeoutMillis;
 
-    private final Stopwatch stopwatch;
+    private volatile long nextIndex;
 
-    private final long followerTimeoutMillis;
+    private volatile long matchIndex;
 
-    public FollowerLogInformationImpl(String id, AtomicLong nextIndex,
-        AtomicLong matchIndex, FiniteDuration followerTimeoutDuration) {
+    public FollowerLogInformationImpl(String id, long nextIndex,
+        long matchIndex, FiniteDuration followerTimeoutDuration) {
         this.id = id;
         this.nextIndex = nextIndex;
         this.matchIndex = matchIndex;
-        this.stopwatch = new Stopwatch();
         this.followerTimeoutMillis = followerTimeoutDuration.toMillis();
     }
 
+    @Override
     public long incrNextIndex(){
-        return nextIndex.incrementAndGet();
+        return NEXT_INDEX_UPDATER.incrementAndGet(this);
     }
 
-    @Override public long decrNextIndex() {
-        return nextIndex.decrementAndGet();
+    @Override
+    public long decrNextIndex() {
+        return NEXT_INDEX_UPDATER.decrementAndGet(this);
     }
 
-    @Override public void setNextIndex(long nextIndex) {
-        this.nextIndex.set(nextIndex);
+    @Override
+    public void setNextIndex(long nextIndex) {
+        this.nextIndex = nextIndex;
     }
 
+    @Override
     public long incrMatchIndex(){
-        return matchIndex.incrementAndGet();
+        return MATCH_INDEX_UPDATER.incrementAndGet(this);
     }
 
-    @Override public void setMatchIndex(long matchIndex) {
-        this.matchIndex.set(matchIndex);
+    @Override
+    public void setMatchIndex(long matchIndex) {
+        this.matchIndex = matchIndex;
     }
 
+    @Override
     public String getId() {
         return id;
     }
 
-    public AtomicLong getNextIndex() {
+    @Override
+    public long getNextIndex() {
         return nextIndex;
     }
 
-    public AtomicLong getMatchIndex() {
+    @Override
+    public long getMatchIndex() {
         return matchIndex;
     }
 
index 8e97c5877cba4cb90d195a6bb33cd42c449179dc..164c2cea561349cf178d63965eccd0a313e29b4e 100644 (file)
@@ -204,7 +204,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
         timer.stop();
         LOG.info("Recovery snapshot applied for {} in {}: snapshotIndex={}, snapshotTerm={}, journal-size=" +
                 replicatedLog.size(), persistenceId(), timer.toString(),
-                replicatedLog.snapshotIndex, replicatedLog.snapshotTerm);
+                replicatedLog.getSnapshotIndex(), replicatedLog.getSnapshotTerm());
     }
 
     private void onRecoveredJournalLogEntry(ReplicatedLogEntry logEntry) {
@@ -268,8 +268,8 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
                 "Persistence Id =  " + persistenceId() +
                 " Last index in log={}, snapshotIndex={}, snapshotTerm={}, " +
                 "journal-size={}",
-            replicatedLog.lastIndex(), replicatedLog.snapshotIndex,
-            replicatedLog.snapshotTerm, replicatedLog.size());
+            replicatedLog.lastIndex(), replicatedLog.getSnapshotIndex(),
+            replicatedLog.getSnapshotTerm(), replicatedLog.size());
 
         initializeBehavior();
     }
index e5c5dc752d3a257e9ce2f5852bf3350b006b8116..462c94ec8a40736cc005c994b74520d9111c3430 100644 (file)
@@ -14,16 +14,19 @@ import akka.actor.Cancellable;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableMap.Builder;
 import com.google.protobuf.ByteString;
 import java.io.IOException;
-import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
+import java.util.Map.Entry;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
 import org.opendaylight.controller.cluster.raft.ClientRequestTracker;
 import org.opendaylight.controller.cluster.raft.ClientRequestTrackerImpl;
 import org.opendaylight.controller.cluster.raft.FollowerLogInformation;
@@ -77,14 +80,12 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
     // This would be passed as the hash code of the last chunk when sending the first chunk
     public static final int INITIAL_LAST_CHUNK_HASH_CODE = -1;
 
-    protected final Map<String, FollowerLogInformation> followerToLog = new HashMap<>();
-    protected final Map<String, FollowerToSnapshot> mapFollowerToSnapshot = new HashMap<>();
-
-    protected final Set<String> followers;
+    private final Map<String, FollowerLogInformation> followerToLog;
+    private final Map<String, FollowerToSnapshot> mapFollowerToSnapshot = new HashMap<>();
 
     private Cancellable heartbeatSchedule = null;
 
-    private List<ClientRequestTracker> trackerList = new ArrayList<>();
+    private final Collection<ClientRequestTracker> trackerList = new LinkedList<>();
 
     protected final int minReplicationCount;
 
@@ -95,25 +96,22 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
     public AbstractLeader(RaftActorContext context) {
         super(context);
 
-        followers = context.getPeerAddresses().keySet();
-
-        for (String followerId : followers) {
+        final Builder<String, FollowerLogInformation> ftlBuilder = ImmutableMap.builder();
+        for (String followerId : context.getPeerAddresses().keySet()) {
             FollowerLogInformation followerLogInformation =
                 new FollowerLogInformationImpl(followerId,
-                    new AtomicLong(context.getCommitIndex()),
-                    new AtomicLong(-1),
+                    context.getCommitIndex(), -1,
                     context.getConfigParams().getElectionTimeOutInterval());
 
-            followerToLog.put(followerId, followerLogInformation);
+            ftlBuilder.put(followerId, followerLogInformation);
         }
+        followerToLog = ftlBuilder.build();
 
         leaderId = context.getId();
 
-        if(LOG.isDebugEnabled()) {
-            LOG.debug("Election:Leader has following peers: {}", followers);
-        }
+        LOG.debug("Election:Leader has following peers: {}", getFollowerIds());
 
-        minReplicationCount = getMajorityVoteCount(followers.size());
+        minReplicationCount = getMajorityVoteCount(getFollowerIds().size());
 
         // the isolated Leader peer count will be 1 less than the majority vote count.
         // this is because the vote count has the self vote counted in it
@@ -132,6 +130,15 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
         scheduleHeartBeat(new FiniteDuration(0, TimeUnit.SECONDS));
     }
 
+    /**
+     * Return an immutable collection of follower identifiers.
+     *
+     * @return Collection of follower IDs
+     */
+    protected final Collection<String> getFollowerIds() {
+        return followerToLog.keySet();
+    }
+
     private Optional<ByteString> getSnapshot() {
         return snapshot;
     }
@@ -198,7 +205,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
             int replicatedCount = 1;
 
             for (FollowerLogInformation info : followerToLog.values()) {
-                if (info.getMatchIndex().get() >= N) {
+                if (info.getMatchIndex() >= N) {
                     replicatedCount++;
                 }
             }
@@ -222,16 +229,21 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
         return this;
     }
 
+    @Override
     protected ClientRequestTracker removeClientRequestTracker(long logIndex) {
-
-        ClientRequestTracker toRemove = findClientRequestTracker(logIndex);
-        if(toRemove != null) {
-            trackerList.remove(toRemove);
+        final Iterator<ClientRequestTracker> it = trackerList.iterator();
+        while (it.hasNext()) {
+            final ClientRequestTracker t = it.next();
+            if (t.getIndex() == logIndex) {
+                it.remove();
+                return t;
+            }
         }
 
-        return toRemove;
+        return null;
     }
 
+    @Override
     protected ClientRequestTracker findClientRequestTracker(long logIndex) {
         for (ClientRequestTracker tracker : trackerList) {
             if (tracker.getIndex() == logIndex) {
@@ -324,8 +336,8 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
                     mapFollowerToSnapshot.remove(followerId);
 
                     if(LOG.isDebugEnabled()) {
-                        LOG.debug("followerToLog.get(followerId).getNextIndex().get()=" +
-                            followerToLog.get(followerId).getNextIndex().get());
+                        LOG.debug("followerToLog.get(followerId).getNextIndex()=" +
+                            followerToLog.get(followerId).getNextIndex());
                     }
 
                     if (mapFollowerToSnapshot.isEmpty()) {
@@ -376,7 +388,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
                 logIndex)
         );
 
-        if (followers.size() == 0) {
+        if (followerToLog.isEmpty()) {
             context.setCommitIndex(logIndex);
             applyLogToStateMachine(logIndex);
         } else {
@@ -386,14 +398,14 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
 
     private void sendAppendEntries() {
         // Send an AppendEntries to all followers
-        for (String followerId : followers) {
+        for (Entry<String, FollowerLogInformation> e : followerToLog.entrySet()) {
+            final String followerId = e.getKey();
             ActorSelection followerActor = context.getPeerActorSelection(followerId);
 
             if (followerActor != null) {
                 FollowerLogInformation followerLogInformation = followerToLog.get(followerId);
-                long followerNextIndex = followerLogInformation.getNextIndex().get();
+                long followerNextIndex = followerLogInformation.getNextIndex();
                 boolean isFollowerActive = followerLogInformation.isFollowerActive();
-                List<ReplicatedLogEntry> entries = null;
 
                 if (mapFollowerToSnapshot.get(followerId) != null) {
                     // if install snapshot is in process , then sent next chunk if possible
@@ -408,6 +420,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
                 } else {
                     long leaderLastIndex = context.getReplicatedLog().lastIndex();
                     long leaderSnapShotIndex = context.getReplicatedLog().getSnapshotIndex();
+                    final List<ReplicatedLogEntry> entries;
 
                     if (isFollowerActive &&
                         context.getReplicatedLog().isPresent(followerNextIndex)) {
@@ -473,23 +486,19 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
      *
      */
     private void installSnapshotIfNeeded() {
-        for (String followerId : followers) {
-            ActorSelection followerActor =
-                context.getPeerActorSelection(followerId);
+        for (Entry<String, FollowerLogInformation> e : followerToLog.entrySet()) {
+            final ActorSelection followerActor = context.getPeerActorSelection(e.getKey());
 
-            if(followerActor != null) {
-                FollowerLogInformation followerLogInformation =
-                    followerToLog.get(followerId);
-
-                long nextIndex = followerLogInformation.getNextIndex().get();
+            if (followerActor != null) {
+                long nextIndex = e.getValue().getNextIndex();
 
                 if (!context.getReplicatedLog().isPresent(nextIndex) &&
                     context.getReplicatedLog().isInSnapshot(nextIndex)) {
-                    LOG.info("{} follower needs a snapshot install", followerId);
+                    LOG.info("{} follower needs a snapshot install", e.getKey());
                     if (snapshot.isPresent()) {
                         // if a snapshot is present in the memory, most likely another install is in progress
                         // no need to capture snapshot
-                        sendSnapshotChunk(followerActor, followerId);
+                        sendSnapshotChunk(followerActor, e.getKey());
 
                     } else {
                         initiateCaptureSnapshot();
@@ -528,16 +537,15 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
 
 
     private void sendInstallSnapshot() {
-        for (String followerId : followers) {
-            ActorSelection followerActor = context.getPeerActorSelection(followerId);
+        for (Entry<String, FollowerLogInformation> e : followerToLog.entrySet()) {
+            ActorSelection followerActor = context.getPeerActorSelection(e.getKey());
 
-            if(followerActor != null) {
-                FollowerLogInformation followerLogInformation = followerToLog.get(followerId);
-                long nextIndex = followerLogInformation.getNextIndex().get();
+            if (followerActor != null) {
+                long nextIndex = e.getValue().getNextIndex();
 
                 if (!context.getReplicatedLog().isPresent(nextIndex) &&
                     context.getReplicatedLog().isInSnapshot(nextIndex)) {
-                    sendSnapshotChunk(followerActor, followerId);
+                    sendSnapshotChunk(followerActor, e.getKey());
                 }
             }
         }
@@ -588,7 +596,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
     }
 
     private void sendHeartBeat() {
-        if (followers.size() > 0) {
+        if (!followerToLog.isEmpty()) {
             sendAppendEntries();
         }
     }
@@ -600,7 +608,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
     }
 
     private void scheduleHeartBeat(FiniteDuration interval) {
-        if(followers.size() == 0){
+        if (followerToLog.isEmpty()) {
             // Optimization - do not bother scheduling a heartbeat as there are
             // no followers
             return;
@@ -759,17 +767,38 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
 
     // called from example-actor for printing the follower-states
     public String printFollowerStates() {
-        StringBuilder sb = new StringBuilder();
-        for(FollowerLogInformation followerLogInformation : followerToLog.values()) {
-            boolean isFollowerActive = followerLogInformation.isFollowerActive();
-            sb.append("{"+followerLogInformation.getId() + " state:" + isFollowerActive + "},");
+        final StringBuilder sb = new StringBuilder();
 
+        sb.append('[');
+        for (FollowerLogInformation followerLogInformation : followerToLog.values()) {
+            sb.append('{');
+            sb.append(followerLogInformation.getId());
+            sb.append(" state:");
+            sb.append(followerLogInformation.isFollowerActive());
+            sb.append("},");
         }
-        return "[" + sb.toString() + "]";
+        sb.append(']');
+
+        return sb.toString();
+    }
+
+    @VisibleForTesting
+    public FollowerLogInformation getFollower(String followerId) {
+        return followerToLog.get(followerId);
+    }
+
+    @VisibleForTesting
+    protected void setFollowerSnapshot(String followerId, FollowerToSnapshot snapshot) {
+        mapFollowerToSnapshot.put(followerId, snapshot);
+    }
+
+    @VisibleForTesting
+    public int followerSnapshotSize() {
+        return mapFollowerToSnapshot.size();
     }
 
     @VisibleForTesting
-    void markFollowerActive(String followerId) {
-        followerToLog.get(followerId).markFollowerActive();
+    public int followerLogSize() {
+        return followerToLog.size();
     }
 }
index 97ecef370f08f00c7a2041b1897498a10d42fd60..ee3cc65dddb90815aecd50771c834d3ab461ecd4 100644 (file)
@@ -5,7 +5,6 @@
  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
  * and is available at http://www.eclipse.org/legal/epl-v10.html
  */
-
 package org.opendaylight.controller.cluster.raft.behaviors;
 
 import akka.actor.ActorRef;
@@ -74,7 +73,7 @@ public class Leader extends AbstractLeader {
     }
 
     protected void scheduleInstallSnapshotCheck(FiniteDuration interval) {
-        if(followers.size() == 0){
+        if (getFollowerIds().isEmpty()) {
             // Optimization - do not bother scheduling a heartbeat as there are
             // no followers
             return;
@@ -103,7 +102,8 @@ public class Leader extends AbstractLeader {
             context.getActorSystem().dispatcher(), context.getActor());
     }
 
-    @Override public void close() throws Exception {
+    @Override
+    public void close() throws Exception {
         stopInstallSnapshotSchedule();
         stopIsolatedLeaderCheckSchedule();
         super.close();
@@ -111,11 +111,11 @@ public class Leader extends AbstractLeader {
 
     @VisibleForTesting
     void markFollowerActive(String followerId) {
-        followerToLog.get(followerId).markFollowerActive();
+        getFollower(followerId).markFollowerActive();
     }
 
     @VisibleForTesting
     void markFollowerInActive(String followerId) {
-        followerToLog.get(followerId).markFollowerInActive();
+        getFollower(followerId).markFollowerInActive();
     }
 }
index d95c9d502712159334171301868ee9af4cfa99d7..d53ccf25002dbbf407d2e4dc5dc35c0ec231c590 100644 (file)
@@ -154,16 +154,6 @@ public class AbstractReplicatedLogImplTest {
         public void removeFromAndPersist(final long index) {
         }
 
-        @Override
-        public void setSnapshotIndex(final long snapshotIndex) {
-            this.snapshotIndex = snapshotIndex;
-        }
-
-        @Override
-        public void setSnapshotTerm(final long snapshotTerm) {
-            this.snapshotTerm = snapshotTerm;
-        }
-
         @Override
         public int dataSize() {
             return -1;
index 7df9f3713ffc302dcf100c45a93b8fd034b0fffa..a092c46533d251414ee7f22cf843c4fc3765f691 100644 (file)
@@ -7,14 +7,12 @@
  */
 package org.opendaylight.controller.cluster.raft;
 
-
 import com.google.common.base.Stopwatch;
 import com.google.common.util.concurrent.Uninterruptibles;
 import org.junit.Test;
 import scala.concurrent.duration.FiniteDuration;
 
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
 
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
@@ -29,7 +27,7 @@ public class FollowerLogInformationImplTest {
 
         FollowerLogInformation followerLogInformation =
             new FollowerLogInformationImpl(
-                "follower1", new AtomicLong(10), new AtomicLong(9), timeoutDuration);
+                "follower1", 10, 9, timeoutDuration);
 
 
 
index 895fe35bff7588526fac71e996e9120968234af2..151015e97ec785277beaf51b0bdb4fe0681f6582 100644 (file)
@@ -67,12 +67,12 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest {
         }};
     }
 
-
     @Test
     public void testThatLeaderSendsAHeartbeatMessageToAllFollowers() {
         new JavaTestKit(getSystem()) {{
 
             new Within(duration("1 seconds")) {
+                @Override
                 protected void run() {
 
                     ActorRef followerActor = getTestActor();
@@ -92,6 +92,7 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest {
                     final String out =
                         new ExpectMsg<String>(duration("1 seconds"), "match hint") {
                             // do not put code outside this method, will run afterwards
+                            @Override
                             protected String match(Object in) {
                                 Object msg = fromSerializableMessage(in);
                                 if (msg instanceof AppendEntries) {
@@ -117,6 +118,7 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest {
         new JavaTestKit(getSystem()) {{
 
             new Within(duration("1 seconds")) {
+                @Override
                 protected void run() {
 
                     ActorRef followerActor = getTestActor();
@@ -145,6 +147,7 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest {
                     final String out =
                         new ExpectMsg<String>(duration("1 seconds"), "match hint") {
                             // do not put code outside this method, will run afterwards
+                            @Override
                             protected String match(Object in) {
                                 Object msg = fromSerializableMessage(in);
                                 if (msg instanceof AppendEntries) {
@@ -169,6 +172,7 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest {
         new JavaTestKit(getSystem()) {{
 
             new Within(duration("1 seconds")) {
+                @Override
                 protected void run() {
 
                     ActorRef raftActor = getTestActor();
@@ -195,6 +199,7 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest {
                         new ExpectMsg<String>(duration("1 seconds"),
                             "match hint") {
                             // do not put code outside this method, will run afterwards
+                            @Override
                             protected String match(Object in) {
                                 if (in instanceof ApplyState) {
                                     if (((ApplyState) in).getIdentifier().equals("state-id")) {
@@ -482,6 +487,7 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest {
             final String out =
                 new ExpectMsg<String>(duration("1 seconds"), "match hint") {
                     // do not put code outside this method, will run afterwards
+                    @Override
                     protected String match(Object in) {
                         if (in instanceof InstallSnapshotMessages.InstallSnapshot) {
                             InstallSnapshot is = (InstallSnapshot)
@@ -562,13 +568,13 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest {
 
             assertTrue(raftBehavior instanceof Leader);
 
-            assertEquals(leader.mapFollowerToSnapshot.size(), 0);
-            assertEquals(leader.followerToLog.size(), 1);
-            assertNotNull(leader.followerToLog.get(followerActor.path().toString()));
-            FollowerLogInformation fli = leader.followerToLog.get(followerActor.path().toString());
-            assertEquals(snapshotIndex, fli.getMatchIndex().get());
-            assertEquals(snapshotIndex, fli.getMatchIndex().get());
-            assertEquals(snapshotIndex + 1, fli.getNextIndex().get());
+            assertEquals(0, leader.followerSnapshotSize());
+            assertEquals(1, leader.followerLogSize());
+            assertNotNull(leader.getFollower(followerActor.path().toString()));
+            FollowerLogInformation fli = leader.getFollower(followerActor.path().toString());
+            assertEquals(snapshotIndex, fli.getMatchIndex());
+            assertEquals(snapshotIndex, fli.getMatchIndex());
+            assertEquals(snapshotIndex + 1, fli.getNextIndex());
         }};
     }
 
@@ -779,6 +785,7 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest {
         return createActorContext(leaderActor);
     }
 
+    @Override
     protected RaftActorContext createActorContext(ActorRef actorRef) {
         return new MockRaftActorContext("test", getSystem(), actorRef);
     }
@@ -1180,8 +1187,7 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest {
 
         public void createFollowerToSnapshot(String followerId, ByteString bs ) {
             fts = new FollowerToSnapshot(bs);
-            mapFollowerToSnapshot.put(followerId, fts);
-
+            setFollowerSnapshot(followerId, fts);
         }
     }
 
index 677b6aa65ecd8220726e384e318968f9cfe1b116..6c02001fec78baf5a2790990d9bb9419bcc9ba00 100644 (file)
@@ -9,24 +9,23 @@ package org.opendaylight.controller.sal.binding.codegen.impl;
 
 import com.google.common.base.Supplier;
 import com.google.common.collect.Iterables;
+import java.util.Map;
+import java.util.WeakHashMap;
 import javassist.ClassPool;
 import javassist.CtClass;
 import javassist.CtMethod;
 import javassist.NotFoundException;
+import javax.annotation.concurrent.GuardedBy;
 import org.opendaylight.controller.sal.binding.api.rpc.RpcRouter;
 import org.opendaylight.controller.sal.binding.codegen.RpcIsNotRoutedException;
 import org.opendaylight.controller.sal.binding.spi.NotificationInvokerFactory;
 import org.opendaylight.yangtools.sal.binding.generator.util.JavassistUtils;
+import org.opendaylight.yangtools.util.ClassLoaderUtils;
 import org.opendaylight.yangtools.yang.binding.BindingMapping;
 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
 import org.opendaylight.yangtools.yang.binding.NotificationListener;
 import org.opendaylight.yangtools.yang.binding.RpcService;
 import org.opendaylight.yangtools.yang.binding.annotations.RoutingContext;
-import org.opendaylight.yangtools.yang.binding.util.ClassLoaderUtils;
-
-import javax.annotation.concurrent.GuardedBy;
-import java.util.Map;
-import java.util.WeakHashMap;
 
 abstract class AbstractRuntimeCodeGenerator implements org.opendaylight.controller.sal.binding.codegen.RuntimeCodeGenerator, NotificationInvokerFactory {
     @GuardedBy("this")
@@ -79,7 +78,7 @@ abstract class AbstractRuntimeCodeGenerator implements org.opendaylight.controll
                      */
                     Thread.currentThread().getContextClassLoader().loadClass(routingPair.getInputType().getName());
                 } else {
-                    throw new RpcIsNotRoutedException("RPC " + method.getName() + " from "+ iface.getName() +" is not routed");
+                    throw new RpcIsNotRoutedException(String.format("RPC %s from %s is not routed", method.getName(), iface.getName()));
                 }
             }
         }
@@ -164,7 +163,7 @@ abstract class AbstractRuntimeCodeGenerator implements org.opendaylight.controll
                 try {
                     return getRpcMetadata(utils.asCtClass(serviceType));
                 } catch (ClassNotFoundException | NotFoundException e) {
-                    throw new IllegalStateException(String.format("Failed to load metadata for class {}", serviceType), e);
+                    throw new IllegalStateException(String.format("Failed to load metadata for class %s", serviceType), e);
                 }
             }
         });
index f03d07eb99c0fc2ae212cc358402ba67c17a4a74..2a6de4af0a0d2ed5eb6f1887ad7e1a26aa166f4c 100644 (file)
@@ -8,11 +8,16 @@
 
 package org.opendaylight.controller.sal.binding.impl.connect.dom;
 
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Function;
+import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableList;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
 import java.lang.ref.WeakReference;
 import java.lang.reflect.Method;
 import java.util.Collections;
 import java.util.concurrent.Future;
-
 import org.opendaylight.controller.sal.core.api.RpcProvisionRegistry;
 import org.opendaylight.yangtools.yang.binding.DataContainer;
 import org.opendaylight.yangtools.yang.binding.DataObject;
@@ -26,12 +31,6 @@ import org.opendaylight.yangtools.yang.data.api.Node;
 import org.opendaylight.yangtools.yang.data.impl.ImmutableCompositeNode;
 import org.opendaylight.yangtools.yang.data.impl.codec.BindingIndependentMappingService;
 
-import com.google.common.base.Function;
-import com.google.common.base.Optional;
-import com.google.common.collect.ImmutableList;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
-
 /*
  * RPC's can have both input, output, one or the other, or neither.
  *
@@ -44,6 +43,20 @@ import com.google.common.util.concurrent.ListenableFuture;
  *
  */
 public class RpcInvocationStrategy {
+    private final Function<RpcResult<CompositeNode>, RpcResult<?>> transformationFunction = new Function<RpcResult<CompositeNode>, RpcResult<?>>() {
+        @SuppressWarnings("rawtypes")
+        @Override
+        public RpcResult<?> apply(final RpcResult<CompositeNode> result) {
+            final Object output;
+            if (getOutputClass() != null && result.getResult() != null) {
+                output = mappingService.dataObjectFromDataDom(getOutputClass().get(), result.getResult());
+            } else {
+                output = null;
+            }
+
+            return RpcResultBuilder.from( (RpcResult)result ).withResult( output ).build();
+        }
+    };
 
     private final BindingIndependentMappingService mappingService;
     private final RpcProvisionRegistry biRpcRegistry;
@@ -61,26 +74,24 @@ public class RpcInvocationStrategy {
                                  final Method targetMethod,
                                  final BindingIndependentMappingService mappingService,
                                  final RpcProvisionRegistry biRpcRegistry ) {
-
+        this.mappingService = mappingService;
+        this.biRpcRegistry = biRpcRegistry;
         this.targetMethod = targetMethod;
         this.rpc = rpc;
 
-        Optional<Class<?>> outputClassOption = BindingReflections.resolveRpcOutputClass(targetMethod);
-        Optional<Class<? extends DataContainer>> inputClassOption = BindingReflections.resolveRpcInputClass(targetMethod);
-
-        if ( outputClassOption != null && outputClassOption.isPresent() ) {
-            this.outputClass = new WeakReference(outputClassOption.get() ) ;
+        final Optional<Class<?>> outputClassOption = BindingReflections.resolveRpcOutputClass(targetMethod);
+        if (outputClassOption.isPresent()) {
+            this.outputClass = new WeakReference(outputClassOption.get());
         } else {
-            this.outputClass = null ;
+            this.outputClass = null;
         }
-        if ( inputClassOption != null && inputClassOption.isPresent() ) {
-            this.inputClass = new WeakReference(inputClassOption.get() ) ;
+
+        final Optional<Class<? extends DataContainer>> inputClassOption = BindingReflections.resolveRpcInputClass(targetMethod);
+        if (inputClassOption.isPresent() ) {
+            this.inputClass = new WeakReference(inputClassOption.get());
         } else {
-            this.inputClass = null ;
+            this.inputClass = null;
         }
-
-        this.mappingService = mappingService;
-        this.biRpcRegistry = biRpcRegistry;
     }
 
     @SuppressWarnings({ "unchecked" })
@@ -98,25 +109,6 @@ public class RpcInvocationStrategy {
             inputXml = ImmutableCompositeNode.create( rpc, Collections.<Node<?>>emptyList() );
         }
 
-        Function<RpcResult<CompositeNode>, RpcResult<?>> transformationFunction =
-                                       new Function<RpcResult<CompositeNode>, RpcResult<?>>() {
-            @SuppressWarnings("rawtypes")
-            @Override
-            public RpcResult<?> apply(RpcResult<CompositeNode> result) {
-
-                Object output = null;
-
-                if( getOutputClass() != null ) {
-                    if (result.getResult() != null) {
-                        output = mappingService.dataObjectFromDataDom(getOutputClass().get(),
-                                                                    result.getResult());
-                    }
-                }
-
-                return RpcResultBuilder.from( (RpcResult)result ).withResult( output ).build();
-            }
-        };
-
         return Futures.transform(biRpcRegistry.invokeRpc(rpc, inputXml), transformationFunction);
     }
 
@@ -153,7 +145,8 @@ public class RpcInvocationStrategy {
     }
 
     @SuppressWarnings("rawtypes")
-    public WeakReference<Class> getOutputClass() {
+    @VisibleForTesting
+    WeakReference<Class> getOutputClass() {
         return outputClass;
     }
 }
index 7f7cdc650ee70c5038a7159fb4d7ea4a81a80679..56ef8ea6fc2042c45d0558eb59e8fe2e22dc2f7c 100644 (file)
 
 package org.opendaylight.controller.cluster.datastore.node.utils;
 
-import org.opendaylight.yangtools.yang.common.QName;
-import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
-
+import com.google.common.base.Splitter;
 import java.util.HashSet;
 import java.util.Set;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
+import org.opendaylight.yangtools.yang.common.QName;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 
 public class AugmentationIdentifierGenerator {
+    private static final Pattern PATTERN = Pattern.compile("AugmentationIdentifier\\Q{\\EchildNames=\\Q[\\E(.*)\\Q]}\\E");
+    private static final Splitter COMMA_SPLITTER = Splitter.on(',').trimResults();
+
     private final String id;
-    private static final Pattern pattern = Pattern.compile("AugmentationIdentifier\\Q{\\EchildNames=\\Q[\\E(.*)\\Q]}\\E");
     private final Matcher matcher;
     private final boolean doesMatch;
 
-    public AugmentationIdentifierGenerator(String id){
+    public AugmentationIdentifierGenerator(String id) {
         this.id = id;
-        matcher = pattern.matcher(this.id);
+        matcher = PATTERN.matcher(this.id);
         doesMatch = matcher.matches();
     }
 
-    public boolean matches(){
+    public boolean matches() {
         return doesMatch;
     }
 
-    public YangInstanceIdentifier.AugmentationIdentifier getPathArgument(){
-        Set<QName> childNames = new HashSet<QName>();
+    public YangInstanceIdentifier.AugmentationIdentifier getPathArgument() {
         final String childQNames = matcher.group(1);
 
-        final String[] splitChildQNames = childQNames.split(",");
-
-        for(String name : splitChildQNames){
-            childNames.add(
-                org.opendaylight.controller.cluster.datastore.node.utils.QNameFactory
-                    .create(name.trim()));
+        final Set<QName> childNames = new HashSet<>();
+        for (String name : COMMA_SPLITTER.split(childQNames)) {
+            childNames.add(QNameFactory.create(name));
         }
 
         return new YangInstanceIdentifier.AugmentationIdentifier(childNames);
index 6cdddfd2716c1cf0e008f47853877174ba61d4b3..930dbceab44b83f98aa54d8a8613ec80d73304f9 100644 (file)
 
 package org.opendaylight.controller.cluster.datastore.node.utils;
 
-import org.opendaylight.yangtools.yang.common.QName;
-import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
-
+import com.google.common.base.Splitter;
 import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Set;
+import org.opendaylight.yangtools.yang.common.QName;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 
 public class PathUtils {
+    private static final Splitter SLASH_SPLITTER = Splitter.on('/').omitEmptyStrings();
 
     /**
      * Given a YangInstanceIdentifier return a serialized version of the same
@@ -27,17 +28,22 @@ public class PathUtils {
      * @param path
      * @return
      */
-    public static String toString(YangInstanceIdentifier path){
-        StringBuilder sb = new StringBuilder();
-        Iterator<YangInstanceIdentifier.PathArgument> iterator =
+    public static String toString(YangInstanceIdentifier path) {
+        final Iterator<YangInstanceIdentifier.PathArgument> it =
             path.getPathArguments().iterator();
+        if (!it.hasNext()) {
+            return "";
+        }
 
-        while(iterator.hasNext()){
-            sb.append(toString(iterator.next()));
-            if(iterator.hasNext()){
-                sb.append("/");
+        final StringBuilder sb = new StringBuilder();
+        for (;;) {
+            sb.append(toString(it.next()));
+            if (!it.hasNext()) {
+                break;
             }
+            sb.append('/');
         }
+
         return sb.toString();
     }
 
@@ -70,13 +76,9 @@ public class PathUtils {
      * @return
      */
     public static YangInstanceIdentifier toYangInstanceIdentifier(String path){
-        String[] segments = path.split("/");
-
         List<YangInstanceIdentifier.PathArgument> pathArguments = new ArrayList<>();
-        for (String segment : segments) {
-            if (!"".equals(segment)) {
-                pathArguments.add(NodeIdentifierFactory.getArgument(segment));
-            }
+        for (String segment : SLASH_SPLITTER.split(path)) {
+            pathArguments.add(NodeIdentifierFactory.getArgument(segment));
         }
         return YangInstanceIdentifier.create(pathArguments);
     }
index 5a8f5228619e9f80a091da123aa734fb59dcd85f..c63266ff08e218fc6180fdf1cae61e990677eace 100644 (file)
@@ -19,11 +19,12 @@ public class QNameFactory {
 
     private static final int MAX_QNAME_CACHE_SIZE = 10000;
 
-    private static LoadingCache<String, QName> cache = CacheBuilder.newBuilder()
+    private static final LoadingCache<String, QName> CACHE = CacheBuilder.newBuilder()
         .maximumSize(MAX_QNAME_CACHE_SIZE)
         .softValues()
         .build(
             new CacheLoader<String, QName>() {
+                @Override
                 public QName load(String key) {
                     return QName.create(key);
                 }
@@ -32,6 +33,6 @@ public class QNameFactory {
 
 
     public static QName create(String name){
-        return cache.getUnchecked(name);
+        return CACHE.getUnchecked(name);
     }
 }
index f562d8b35dd09995ac03d9ce2b3f4436518c8c6f..71946b0a7abe476241e3f1775ffa8e110da1d559 100644 (file)
@@ -9,11 +9,8 @@
 package org.opendaylight.controller.cluster.datastore.node.utils.serialization;
 
 import com.google.protobuf.ByteString;
-import java.math.BigDecimal;
-import java.math.BigInteger;
 import java.util.HashSet;
 import java.util.Set;
-import org.opendaylight.controller.cluster.datastore.node.utils.QNameFactory;
 import org.opendaylight.controller.cluster.datastore.util.InstanceIdentifierUtils;
 import org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
@@ -101,35 +98,7 @@ public class ValueSerializer {
 
 
     private static Object deSerializeBasicTypes(int valueType, String value) {
-        switch(ValueType.values()[valueType]){
-           case SHORT_TYPE: {
-               return Short.valueOf(value);
-           }
-           case BOOL_TYPE: {
-               return Boolean.valueOf(value);
-           }
-           case BYTE_TYPE: {
-               return Byte.valueOf(value);
-           }
-           case INT_TYPE : {
-                return Integer.valueOf(value);
-           }
-           case LONG_TYPE: {
-               return Long.valueOf(value);
-           }
-           case QNAME_TYPE: {
-               return QNameFactory.create(value);
-           }
-           case BIG_INTEGER_TYPE: {
-               return new BigInteger(value);
-           }
-           case BIG_DECIMAL_TYPE: {
-               return new BigDecimal(value);
-           }
-           default: {
-               return value;
-           }
-        }
+        return ValueType.values()[valueType].deserialize(value);
     }
 
 }
index 2007544b7edd96d3498d303e3ad0ffd8469ada36..b9e46a3a570fd159d1c8b9284d2efc2ec95c214b 100644 (file)
 package org.opendaylight.controller.cluster.datastore.node.utils.serialization;
 
 import com.google.common.base.Preconditions;
-import org.opendaylight.yangtools.yang.common.QName;
-import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
-
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableMap.Builder;
 import java.math.BigDecimal;
 import java.math.BigInteger;
-import java.util.HashMap;
 import java.util.Map;
 import java.util.Set;
+import org.opendaylight.controller.cluster.datastore.node.utils.QNameFactory;
+import org.opendaylight.yangtools.yang.common.QName;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 
 public enum ValueType {
-    SHORT_TYPE,
-    BYTE_TYPE,
-    INT_TYPE,
-    LONG_TYPE,
-    BOOL_TYPE,
-    QNAME_TYPE,
-    BITS_TYPE,
-    YANG_IDENTIFIER_TYPE,
-    STRING_TYPE,
-    BIG_INTEGER_TYPE,
-    BIG_DECIMAL_TYPE,
-    BINARY_TYPE;
+    SHORT_TYPE {
+        @Override
+        Object deserialize(final String str) {
+            return Short.valueOf(str);
+        }
+    },
+    BYTE_TYPE {
+        @Override
+        Object deserialize(final String str) {
+            return Byte.valueOf(str);
+        }
+    },
+    INT_TYPE {
+        @Override
+        Object deserialize(final String str) {
+            return Integer.valueOf(str);
+        }
+    },
+    LONG_TYPE {
+        @Override
+        Object deserialize(final String str) {
+            return Long.valueOf(str);
+        }
+    },
+    BOOL_TYPE {
+        @Override
+        Object deserialize(final String str) {
+            return Boolean.valueOf(str);
+        }
+    },
+    QNAME_TYPE {
+        @Override
+        Object deserialize(final String str) {
+            return QNameFactory.create(str);
+        }
+    },
+    BITS_TYPE {
+        @Override
+        Object deserialize(final String str) {
+            throw new UnsupportedOperationException("Should have been caught by caller");
+        }
+    },
+    YANG_IDENTIFIER_TYPE {
+        @Override
+        Object deserialize(final String str) {
+            throw new UnsupportedOperationException("Should have been caught by caller");
+        }
+    },
+    STRING_TYPE {
+        @Override
+        Object deserialize(final String str) {
+            return str;
+        }
+    },
+    BIG_INTEGER_TYPE {
+        @Override
+        Object deserialize(final String str) {
+            return new BigInteger(str);
+        }
+    },
+    BIG_DECIMAL_TYPE {
+        @Override
+        Object deserialize(final String str) {
+            return new BigDecimal(str);
+        }
+    },
+    BINARY_TYPE {
+        @Override
+        Object deserialize(final String str) {
+            throw new UnsupportedOperationException("Should have been caught by caller");
+        }
+    };
 
-    private static Map<Class<?>, ValueType> types = new HashMap<>();
+    private static final Map<Class<?>, ValueType> TYPES;
 
     static {
-        types.put(String.class, STRING_TYPE);
-        types.put(Byte.class, BYTE_TYPE);
-        types.put(Integer.class, INT_TYPE);
-        types.put(Long.class, LONG_TYPE);
-        types.put(Boolean.class, BOOL_TYPE);
-        types.put(QName.class, QNAME_TYPE);
-        types.put(Set.class, BITS_TYPE);
-        types.put(YangInstanceIdentifier.class, YANG_IDENTIFIER_TYPE);
-        types.put(Short.class,SHORT_TYPE);
-        types.put(BigInteger.class, BIG_INTEGER_TYPE);
-        types.put(BigDecimal.class, BIG_DECIMAL_TYPE);
-        types.put(byte[].class, BINARY_TYPE);
+        final Builder<Class<?>, ValueType> b = ImmutableMap.builder();
+
+        b.put(String.class, STRING_TYPE);
+        b.put(Byte.class, BYTE_TYPE);
+        b.put(Integer.class, INT_TYPE);
+        b.put(Long.class, LONG_TYPE);
+        b.put(Boolean.class, BOOL_TYPE);
+        b.put(QName.class, QNAME_TYPE);
+        b.put(YangInstanceIdentifier.class, YANG_IDENTIFIER_TYPE);
+        b.put(Short.class,SHORT_TYPE);
+        b.put(BigInteger.class, BIG_INTEGER_TYPE);
+        b.put(BigDecimal.class, BIG_DECIMAL_TYPE);
+        b.put(byte[].class, BINARY_TYPE);
+
+        TYPES = b.build();
     }
 
-    public static final ValueType getSerializableType(Object node){
+    abstract Object deserialize(String str);
+
+    public static final ValueType getSerializableType(Object node) {
         Preconditions.checkNotNull(node, "node should not be null");
 
-        ValueType type = types.get(node.getClass());
-        if(type != null) {
+        final ValueType type = TYPES.get(node.getClass());
+        if (type != null) {
             return type;
-        } else if(node instanceof Set){
+        }
+        if (node instanceof Set) {
             return BITS_TYPE;
         }
 
index e75a454d394294795c633a1d57d83ccc5661ce98..3a2d2b49b3ac90c49a7855de4f64d3502d89c2e2 100644 (file)
@@ -9,15 +9,16 @@
 package org.opendaylight.controller.cluster.datastore.node.utils.stream;
 
 import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableMap.Builder;
 import java.math.BigDecimal;
 import java.math.BigInteger;
-import java.util.HashMap;
 import java.util.Map;
 import java.util.Set;
 import org.opendaylight.yangtools.yang.common.QName;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 
-public class ValueTypes {
+final class ValueTypes {
     public static final byte SHORT_TYPE = 1;
     public static final byte BYTE_TYPE = 2;
     public static final byte INT_TYPE = 3;
@@ -31,30 +32,38 @@ public class ValueTypes {
     public static final byte BIG_DECIMAL_TYPE = 11;
     public static final byte BINARY_TYPE = 12;
 
-    private static Map<Class<?>, Byte> types = new HashMap<>();
+    private static final Map<Class<?>, Byte> TYPES;
 
     static {
-        types.put(String.class, Byte.valueOf(STRING_TYPE));
-        types.put(Byte.class, Byte.valueOf(BYTE_TYPE));
-        types.put(Integer.class, Byte.valueOf(INT_TYPE));
-        types.put(Long.class, Byte.valueOf(LONG_TYPE));
-        types.put(Boolean.class, Byte.valueOf(BOOL_TYPE));
-        types.put(QName.class, Byte.valueOf(QNAME_TYPE));
-        types.put(Set.class, Byte.valueOf(BITS_TYPE));
-        types.put(YangInstanceIdentifier.class, Byte.valueOf(YANG_IDENTIFIER_TYPE));
-        types.put(Short.class, Byte.valueOf(SHORT_TYPE));
-        types.put(BigInteger.class, Byte.valueOf(BIG_INTEGER_TYPE));
-        types.put(BigDecimal.class, Byte.valueOf(BIG_DECIMAL_TYPE));
-        types.put(byte[].class, Byte.valueOf(BINARY_TYPE));
+        final Builder<Class<?>, Byte> b = ImmutableMap.builder();
+
+        b.put(String.class, Byte.valueOf(STRING_TYPE));
+        b.put(Byte.class, Byte.valueOf(BYTE_TYPE));
+        b.put(Integer.class, Byte.valueOf(INT_TYPE));
+        b.put(Long.class, Byte.valueOf(LONG_TYPE));
+        b.put(Boolean.class, Byte.valueOf(BOOL_TYPE));
+        b.put(QName.class, Byte.valueOf(QNAME_TYPE));
+        b.put(YangInstanceIdentifier.class, Byte.valueOf(YANG_IDENTIFIER_TYPE));
+        b.put(Short.class, Byte.valueOf(SHORT_TYPE));
+        b.put(BigInteger.class, Byte.valueOf(BIG_INTEGER_TYPE));
+        b.put(BigDecimal.class, Byte.valueOf(BIG_DECIMAL_TYPE));
+        b.put(byte[].class, Byte.valueOf(BINARY_TYPE));
+
+        TYPES = b.build();
     }
 
-    public static final byte getSerializableType(Object node){
+    private ValueTypes() {
+        throw new UnsupportedOperationException("Utility class");
+    }
+
+    public static final byte getSerializableType(Object node) {
         Preconditions.checkNotNull(node, "node should not be null");
 
-        Byte type = types.get(node.getClass());
-        if(type != null) {
+        final Byte type = TYPES.get(node.getClass());
+        if (type != null) {
             return type;
-        } else if(node instanceof Set){
+        }
+        if (node instanceof Set) {
             return BITS_TYPE;
         }
 
index 061e1ab448f97ce9ad08541be9b0b1b6d4e73c30..0541e3a48bb1aab6cef1843e429ff700949d7688 100644 (file)
@@ -10,6 +10,12 @@ package org.opendaylight.controller.cluster.datastore;
 
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableList.Builder;
+import com.google.common.collect.ImmutableListMultimap;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.ListMultimap;
 import com.typesafe.config.Config;
 import com.typesafe.config.ConfigFactory;
 import com.typesafe.config.ConfigObject;
@@ -17,7 +23,6 @@ import java.io.File;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
-import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -29,9 +34,9 @@ import org.slf4j.LoggerFactory;
 
 public class ConfigurationImpl implements Configuration {
 
-    private final List<ModuleShard> moduleShards = new ArrayList<>();
+    private final List<ModuleShard> moduleShards;
 
-    private final List<Module> modules = new ArrayList<>();
+    private final List<Module> modules;
 
     private static final Logger
         LOG = LoggerFactory.getLogger(DistributedDataStore.class);
@@ -44,6 +49,10 @@ public class ConfigurationImpl implements Configuration {
     // key = shardName, value = list of replicaNames (replicaNames are the same as memberNames)
     private final Map<String, List<String>> shardReplicaNames = new HashMap<>();
 
+    private final ListMultimap<String, String> moduleNameToShardName;
+    private final Map<String, ShardStrategy> moduleNameToStrategy;
+    private final Map<String, String> namespaceToModuleName;
+    private final Set<String> allShardNames;
 
     public ConfigurationImpl(final String moduleShardsConfigPath,
 
@@ -74,9 +83,51 @@ public class ConfigurationImpl implements Configuration {
             modulesConfig = ConfigFactory.load(modulesConfigPath);
         }
 
-        readModuleShards(moduleShardsConfig);
+        this.moduleShards = readModuleShards(moduleShardsConfig);
+        this.modules = readModules(modulesConfig);
 
-        readModules(modulesConfig);
+        this.allShardNames = createAllShardNames(moduleShards);
+        this.moduleNameToShardName = createModuleNameToShardName(moduleShards);
+        this.moduleNameToStrategy = createModuleNameToStrategy(modules);
+        this.namespaceToModuleName = createNamespaceToModuleName(modules);
+    }
+
+    private static Set<String> createAllShardNames(Iterable<ModuleShard> moduleShards) {
+        final com.google.common.collect.ImmutableSet.Builder<String> b = ImmutableSet.builder();
+        for(ModuleShard ms : moduleShards){
+            for(Shard s : ms.getShards()) {
+                b.add(s.getName());
+            }
+        }
+        return b.build();
+    }
+
+    private static Map<String, ShardStrategy> createModuleNameToStrategy(Iterable<Module> modules) {
+        final com.google.common.collect.ImmutableMap.Builder<String, ShardStrategy> b = ImmutableMap.builder();
+        for (Module m : modules) {
+            b.put(m.getName(), m.getShardStrategy());
+        }
+        return b.build();
+    }
+
+    private static Map<String, String> createNamespaceToModuleName(Iterable<Module> modules) {
+        final com.google.common.collect.ImmutableMap.Builder<String, String> b = ImmutableMap.builder();
+        for (Module m : modules) {
+            b.put(m.getNameSpace(), m.getName());
+        }
+        return b.build();
+    }
+
+    private static ListMultimap<String, String> createModuleNameToShardName(Iterable<ModuleShard> moduleShards) {
+        final com.google.common.collect.ImmutableListMultimap.Builder<String, String> b = ImmutableListMultimap.builder();
+
+        for (ModuleShard m : moduleShards) {
+            for (Shard s : m.getShards()) {
+                b.put(m.getModuleName(), s.getName());
+            }
+        }
+
+        return b.build();
     }
 
     @Override public List<String> getMemberShardNames(final String memberName){
@@ -104,41 +155,21 @@ public class ConfigurationImpl implements Configuration {
 
     }
 
-    @Override public Optional<String> getModuleNameFromNameSpace(final String nameSpace) {
-
+    @Override
+    public Optional<String> getModuleNameFromNameSpace(final String nameSpace) {
         Preconditions.checkNotNull(nameSpace, "nameSpace should not be null");
-
-        for(Module m : modules){
-            if(m.getNameSpace().equals(nameSpace)){
-                return Optional.of(m.getName());
-            }
-        }
-        return Optional.absent();
+        return Optional.fromNullable(namespaceToModuleName.get(nameSpace));
     }
 
-    @Override public Map<String, ShardStrategy> getModuleNameToShardStrategyMap() {
-        Map<String, ShardStrategy> map = new HashMap<>();
-        for(Module m : modules){
-            map.put(m.getName(), m.getShardStrategy());
-        }
-        return map;
+    @Override
+    public Map<String, ShardStrategy> getModuleNameToShardStrategyMap() {
+        return moduleNameToStrategy;
     }
 
-    @Override public List<String> getShardNamesFromModuleName(final String moduleName) {
-
+    @Override
+    public List<String> getShardNamesFromModuleName(final String moduleName) {
         Preconditions.checkNotNull(moduleName, "moduleName should not be null");
-
-        for(ModuleShard m : moduleShards){
-            if(m.getModuleName().equals(moduleName)){
-                List<String> l = new ArrayList<>();
-                for(Shard s : m.getShards()){
-                    l.add(s.getName());
-                }
-                return l;
-            }
-        }
-
-        return Collections.emptyList();
+        return moduleNameToShardName.get(moduleName);
     }
 
     @Override public List<String> getMembersFromShardName(final String shardName) {
@@ -162,33 +193,30 @@ public class ConfigurationImpl implements Configuration {
         return Collections.emptyList();
     }
 
-    @Override public Set<String> getAllShardNames() {
-        Set<String> shardNames = new LinkedHashSet<>();
-        for(ModuleShard ms : moduleShards){
-            for(Shard s : ms.getShards()) {
-                shardNames.add(s.getName());
-            }
-        }
-        return shardNames;
+    @Override
+    public Set<String> getAllShardNames() {
+        return allShardNames;
     }
 
-
-
-    private void readModules(final Config modulesConfig) {
+    private List<Module> readModules(final Config modulesConfig) {
         List<? extends ConfigObject> modulesConfigObjectList =
             modulesConfig.getObjectList("modules");
 
+        final Builder<Module> b = ImmutableList.builder();
         for(ConfigObject o : modulesConfigObjectList){
             ConfigObjectWrapper w = new ConfigObjectWrapper(o);
-            modules.add(new Module(w.stringValue("name"), w.stringValue(
+            b.add(new Module(w.stringValue("name"), w.stringValue(
                 "namespace"), w.stringValue("shard-strategy")));
         }
+
+        return b.build();
     }
 
-    private void readModuleShards(final Config moduleShardsConfig) {
+    private static List<ModuleShard> readModuleShards(final Config moduleShardsConfig) {
         List<? extends ConfigObject> moduleShardsConfigObjectList =
             moduleShardsConfig.getObjectList("module-shards");
 
+        final Builder<ModuleShard> b = ImmutableList.builder();
         for(ConfigObject moduleShardConfigObject : moduleShardsConfigObjectList){
 
             String moduleName = moduleShardConfigObject.get("name").unwrapped().toString();
@@ -204,12 +232,13 @@ public class ConfigurationImpl implements Configuration {
                 shards.add(new Shard(shardName, replicas));
             }
 
-            this.moduleShards.add(new ModuleShard(moduleName, shards));
+            b.add(new ModuleShard(moduleName, shards));
         }
-    }
 
+        return b.build();
+    }
 
-    private class ModuleShard {
+    private static class ModuleShard {
         private final String moduleName;
         private final List<Shard> shards;
 
@@ -227,7 +256,7 @@ public class ConfigurationImpl implements Configuration {
         }
     }
 
-    private class Shard {
+    private static class Shard {
         private final String name;
         private final List<String> replicas;
 
@@ -257,7 +286,7 @@ public class ConfigurationImpl implements Configuration {
             if(ModuleShardStrategy.NAME.equals(shardStrategy)){
                 this.shardStrategy = new ModuleShardStrategy(name, ConfigurationImpl.this);
             } else {
-                this.shardStrategy = new DefaultShardStrategy();
+                this.shardStrategy = DefaultShardStrategy.getInstance();
             }
         }
 
index f34e88fb279c0262516dfb9fa6cdd8672c65003c..9f48ef96cf517e265970b744b87af014dc51cbca 100644 (file)
@@ -25,6 +25,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
@@ -185,6 +186,8 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
     private final String transactionChainId;
     private final SchemaContext schemaContext;
     private boolean inReadyState;
+    private final Semaphore operationLimiter;
+    private final OperationCompleter operationCompleter;
 
     public TransactionProxy(ActorContext actorContext, TransactionType transactionType) {
         this(actorContext, transactionType, "");
@@ -221,6 +224,10 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
             phantomReferenceCache.put(cleanup, cleanup);
         }
 
+        // Note : Currently mailbox-capacity comes from akka.conf and not from the config-subsystem
+        this.operationLimiter = new Semaphore(actorContext.getTransactionOutstandingOperationLimit());
+        this.operationCompleter = new OperationCompleter(operationLimiter);
+
         LOG.debug("Created txn {} of type {} on chain {}", identifier, transactionType, transactionChainId);
     }
 
@@ -257,6 +264,8 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
 
         LOG.debug("Tx {} read {}", identifier, path);
 
+        throttleOperation();
+
         TransactionFutureCallback txFutureCallback = getOrCreateTxFutureCallback(path);
         return txFutureCallback.enqueueReadOperation(new ReadOperation<Optional<NormalizedNode<?, ?>>>() {
             @Override
@@ -275,6 +284,8 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
 
         LOG.debug("Tx {} exists {}", identifier, path);
 
+        throttleOperation();
+
         TransactionFutureCallback txFutureCallback = getOrCreateTxFutureCallback(path);
         return txFutureCallback.enqueueReadOperation(new ReadOperation<Boolean>() {
             @Override
@@ -292,6 +303,25 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
                 "Transaction is sealed - further modifications are not allowed");
     }
 
+    private void throttleOperation() {
+        throttleOperation(1);
+    }
+
+    private void throttleOperation(int acquirePermits) {
+        try {
+            if(!operationLimiter.tryAcquire(acquirePermits, actorContext.getDatastoreContext().getOperationTimeoutInSeconds(), TimeUnit.SECONDS)){
+                LOG.warn("Failed to acquire operation permit for transaction {}", getIdentifier());
+            }
+        } catch (InterruptedException e) {
+            if(LOG.isDebugEnabled()) {
+                LOG.debug("Interrupted when trying to acquire operation permit for transaction " + getIdentifier().toString(), e);
+            } else {
+                LOG.warn("Interrupted when trying to acquire operation permit for transaction {}", getIdentifier());
+            }
+        }
+    }
+
+
     @Override
     public void write(final YangInstanceIdentifier path, final NormalizedNode<?, ?> data) {
 
@@ -299,6 +329,8 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
 
         LOG.debug("Tx {} write {}", identifier, path);
 
+        throttleOperation();
+
         TransactionFutureCallback txFutureCallback = getOrCreateTxFutureCallback(path);
         txFutureCallback.enqueueModifyOperation(new TransactionOperation() {
             @Override
@@ -315,6 +347,8 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
 
         LOG.debug("Tx {} merge {}", identifier, path);
 
+        throttleOperation();
+
         TransactionFutureCallback txFutureCallback = getOrCreateTxFutureCallback(path);
         txFutureCallback.enqueueModifyOperation(new TransactionOperation() {
             @Override
@@ -331,6 +365,8 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
 
         LOG.debug("Tx {} delete {}", identifier, path);
 
+        throttleOperation();
+
         TransactionFutureCallback txFutureCallback = getOrCreateTxFutureCallback(path);
         txFutureCallback.enqueueModifyOperation(new TransactionOperation() {
             @Override
@@ -345,6 +381,8 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
 
         checkModificationState();
 
+        throttleOperation(txFutureCallbackMap.size());
+
         inReadyState = true;
 
         LOG.debug("Tx {} Readying {} transactions for commit", identifier,
@@ -668,7 +706,7 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
                     LOG.debug("Tx {} Creating NoOpTransaction because of error: {}", identifier,
                             failure.getMessage());
 
-                    localTransactionContext = new NoOpTransactionContext(failure, identifier);
+                    localTransactionContext = new NoOpTransactionContext(failure, identifier, operationLimiter);
                 } else if (response.getClass().equals(CreateTransactionReply.SERIALIZABLE_CLASS)) {
                     localTransactionContext = createValidTransactionContext(
                             CreateTransactionReply.fromSerializable(response));
@@ -676,7 +714,7 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
                     IllegalArgumentException exception = new IllegalArgumentException(String.format(
                         "Invalid reply type %s for CreateTransaction", response.getClass()));
 
-                    localTransactionContext = new NoOpTransactionContext(exception, identifier);
+                    localTransactionContext = new NoOpTransactionContext(exception, identifier, operationLimiter);
                 }
 
                 for(TransactionOperation oper: txOperationsOnComplete) {
@@ -713,7 +751,7 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
             boolean isTxActorLocal = actorContext.isPathLocal(transactionPath);
 
             return new TransactionContextImpl(transactionPath, transactionActor, identifier,
-                actorContext, schemaContext, isTxActorLocal, reply.getVersion());
+                actorContext, schemaContext, isTxActorLocal, reply.getVersion(), operationCompleter);
         }
     }
 
@@ -755,35 +793,42 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
         private final Logger LOG = LoggerFactory.getLogger(TransactionContextImpl.class);
 
         private final ActorContext actorContext;
-        private final SchemaContext schemaContext;
         private final String transactionPath;
         private final ActorSelection actor;
         private final boolean isTxActorLocal;
         private final short remoteTransactionVersion;
+        private final OperationCompleter operationCompleter;
+
 
         private TransactionContextImpl(String transactionPath, ActorSelection actor, TransactionIdentifier identifier,
                 ActorContext actorContext, SchemaContext schemaContext,
-                boolean isTxActorLocal, short remoteTransactionVersion) {
+                boolean isTxActorLocal, short remoteTransactionVersion, OperationCompleter operationCompleter) {
             super(identifier);
             this.transactionPath = transactionPath;
             this.actor = actor;
             this.actorContext = actorContext;
-            this.schemaContext = schemaContext;
             this.isTxActorLocal = isTxActorLocal;
             this.remoteTransactionVersion = remoteTransactionVersion;
+            this.operationCompleter = operationCompleter;
         }
 
+        private Future<Object> completeOperation(Future<Object> operationFuture){
+            operationFuture.onComplete(this.operationCompleter, actorContext.getActorSystem().dispatcher());
+            return operationFuture;
+        }
+
+
         private ActorSelection getActor() {
             return actor;
         }
 
         private Future<Object> executeOperationAsync(SerializableMessage msg) {
-            return actorContext.executeOperationAsync(getActor(), isTxActorLocal ? msg : msg.toSerializable());
+            return completeOperation(actorContext.executeOperationAsync(getActor(), isTxActorLocal ? msg : msg.toSerializable()));
         }
 
         private Future<Object> executeOperationAsync(VersionedSerializableMessage msg) {
-            return actorContext.executeOperationAsync(getActor(), isTxActorLocal ? msg :
-                msg.toSerializable(remoteTransactionVersion));
+            return completeOperation(actorContext.executeOperationAsync(getActor(), isTxActorLocal ? msg :
+                    msg.toSerializable(remoteTransactionVersion)));
         }
 
         @Override
@@ -1057,10 +1102,12 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
         private final Logger LOG = LoggerFactory.getLogger(NoOpTransactionContext.class);
 
         private final Throwable failure;
+        private final Semaphore operationLimiter;
 
-        public NoOpTransactionContext(Throwable failure, TransactionIdentifier identifier){
+        public NoOpTransactionContext(Throwable failure, TransactionIdentifier identifier, Semaphore operationLimiter){
             super(identifier);
             this.failure = failure;
+            this.operationLimiter = operationLimiter;
         }
 
         @Override
@@ -1071,28 +1118,33 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
         @Override
         public Future<ActorSelection> readyTransaction() {
             LOG.debug("Tx {} readyTransaction called", identifier);
+            operationLimiter.release();
             return akka.dispatch.Futures.failed(failure);
         }
 
         @Override
         public void deleteData(YangInstanceIdentifier path) {
             LOG.debug("Tx {} deleteData called path = {}", identifier, path);
+            operationLimiter.release();
         }
 
         @Override
         public void mergeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
             LOG.debug("Tx {} mergeData called path = {}", identifier, path);
+            operationLimiter.release();
         }
 
         @Override
         public void writeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
             LOG.debug("Tx {} writeData called path = {}", identifier, path);
+            operationLimiter.release();
         }
 
         @Override
         public CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> readData(
                 YangInstanceIdentifier path) {
             LOG.debug("Tx {} readData called path = {}", identifier, path);
+            operationLimiter.release();
             return Futures.immediateFailedCheckedFuture(new ReadFailedException(
                     "Error reading data for path " + path, failure));
         }
@@ -1101,8 +1153,21 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
         public CheckedFuture<Boolean, ReadFailedException> dataExists(
                 YangInstanceIdentifier path) {
             LOG.debug("Tx {} dataExists called path = {}", identifier, path);
+            operationLimiter.release();
             return Futures.immediateFailedCheckedFuture(new ReadFailedException(
                     "Error checking exists for path " + path, failure));
         }
     }
+
+    private static class OperationCompleter extends OnComplete<Object> {
+        private final Semaphore operationLimiter;
+        OperationCompleter(Semaphore operationLimiter){
+            this.operationLimiter = operationLimiter;
+        }
+
+        @Override
+        public void onComplete(Throwable throwable, Object o){
+            this.operationLimiter.release();
+        }
+    }
 }
index 55c682b86000187e10e2376b3f69e94a87772d73..6a3de4256b0b968345b8c01764f15146923e0f9a 100644 (file)
@@ -16,13 +16,21 @@ import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
  *   The default shard stores data for all modules for which a specific set of shards has not been configured
  * </p>
  */
-public class DefaultShardStrategy implements ShardStrategy{
+public final class DefaultShardStrategy implements ShardStrategy {
+    public static final String NAME = "default";
+    public static final String DEFAULT_SHARD = "default";
+    private static final DefaultShardStrategy INSTANCE = new DefaultShardStrategy();
 
-  public static final String NAME = "default";
-  public static final String DEFAULT_SHARD = "default";
+    private DefaultShardStrategy() {
+        // Hidden to force a singleton instnace
+    }
 
-  @Override
-  public String findShard(YangInstanceIdentifier path) {
-    return DEFAULT_SHARD;
-  }
+    public static DefaultShardStrategy getInstance() {
+        return INSTANCE;
+    }
+
+    @Override
+    public String findShard(YangInstanceIdentifier path) {
+        return DEFAULT_SHARD;
+    }
 }
index fc7ebd94dd481c909ba6e4d65d205e41cbfdebbf..e9ecf7eac30f3e64851e2225ad4a8838d02060e6 100644 (file)
@@ -8,11 +8,10 @@
 
 package org.opendaylight.controller.cluster.datastore.shardstrategy;
 
+import java.util.List;
 import org.opendaylight.controller.cluster.datastore.Configuration;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 
-import java.util.List;
-
 public class ModuleShardStrategy implements ShardStrategy {
 
     public static final String NAME = "module";
@@ -26,10 +25,11 @@ public class ModuleShardStrategy implements ShardStrategy {
         this.configuration = configuration;
     }
 
-    @Override public String findShard(YangInstanceIdentifier path) {
+    @Override
+    public String findShard(YangInstanceIdentifier path) {
         List<String> shardNames =
             configuration.getShardNamesFromModuleName(moduleName);
-        if(shardNames.size() == 0){
+        if (shardNames.isEmpty()) {
             return DefaultShardStrategy.DEFAULT_SHARD;
         }
         return shardNames.get(0);
index 62fb65482b09d366289107f93df14050340688c4..fd786c903e87e4bc44136bf629977316250f79a3 100644 (file)
@@ -36,7 +36,7 @@ public class ShardStrategyFactory {
         String moduleName = getModuleName(path);
         ShardStrategy shardStrategy = moduleNameToStrategyMap.get(moduleName);
         if (shardStrategy == null) {
-            return new DefaultShardStrategy();
+            return DefaultShardStrategy.getInstance();
         }
 
         return shardStrategy;
index f217d05bb21a12e6f92add47da5536c6f6fe12d9..c9fdf389311f73c70ca2e0f16dae8e86b7cc0a05 100644 (file)
@@ -8,6 +8,7 @@
 
 package org.opendaylight.controller.cluster.datastore.utils;
 
+import static akka.pattern.Patterns.ask;
 import akka.actor.ActorPath;
 import akka.actor.ActorRef;
 import akka.actor.ActorSelection;
@@ -21,6 +22,7 @@ import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Strings;
 import java.util.concurrent.TimeUnit;
+import org.opendaylight.controller.cluster.common.actor.CommonConfig;
 import org.opendaylight.controller.cluster.datastore.ClusterWrapper;
 import org.opendaylight.controller.cluster.datastore.Configuration;
 import org.opendaylight.controller.cluster.datastore.DatastoreContext;
@@ -45,8 +47,6 @@ import scala.concurrent.Future;
 import scala.concurrent.duration.Duration;
 import scala.concurrent.duration.FiniteDuration;
 
-import static akka.pattern.Patterns.ask;
-
 /**
  * The ActorContext class contains utility methods which could be used by
  * non-actors (like DistributedDataStore) to work with actors a little more
@@ -84,6 +84,7 @@ public class ActorContext {
     private final FiniteDuration operationDuration;
     private final Timeout operationTimeout;
     private final String selfAddressHostPort;
+    private final int transactionOutstandingOperationLimit;
 
     public ActorContext(ActorSystem actorSystem, ActorRef shardManager,
             ClusterWrapper clusterWrapper, Configuration configuration) {
@@ -110,6 +111,8 @@ public class ActorContext {
         } else {
             selfAddressHostPort = null;
         }
+
+        transactionOutstandingOperationLimit = new CommonConfig(this.getActorSystem().settings().config()).getMailBoxCapacity();
     }
 
     public DatastoreContext getDatastoreContext() {
@@ -431,4 +434,16 @@ public class ActorContext {
 
         return builder.toString();
     }
+
+    /**
+     * Get the maximum number of operations that are to be permitted within a transaction before the transaction
+     * should begin throttling the operations
+     *
+     * Parking reading this configuration here because we need to get to the actor system settings
+     *
+     * @return
+     */
+    public int getTransactionOutstandingOperationLimit(){
+        return transactionOutstandingOperationLimit;
+    }
 }
index ce0547c3883d19a12c01b8d9a9ce355770aead3b..dd37371a4510c622b4ee0fdb3b5ab5cf67b5bf7f 100644 (file)
@@ -11,7 +11,6 @@
 package org.opendaylight.controller.cluster.datastore;
 
 import static org.mockito.Matchers.anyObject;
-import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
@@ -19,19 +18,21 @@ import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
+import org.opendaylight.controller.cluster.datastore.utils.MockActorContext;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransaction;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 
-public class TransactionChainProxyTest {
-    ActorContext actorContext = mock(ActorContext.class);
+public class TransactionChainProxyTest extends AbstractActorTest{
+    ActorContext actorContext = null;
     SchemaContext schemaContext = mock(SchemaContext.class);
 
     @Before
     public void setUp() {
-        doReturn(schemaContext).when(actorContext).getSchemaContext();
+        actorContext = new MockActorContext(getSystem());
+        actorContext.setSchemaContext(schemaContext);
     }
 
     @SuppressWarnings("resource")
index 5e53b29db13f7fff0accf1397dc691a1f071d8a6..79edd19bba3328034ea313baa28333ba398226af 100644 (file)
@@ -10,6 +10,7 @@ import static org.mockito.Matchers.argThat;
 import static org.mockito.Matchers.eq;
 import static org.mockito.Matchers.isA;
 import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType.READ_ONLY;
@@ -30,6 +31,7 @@ import java.io.IOException;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
 import org.junit.AfterClass;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -59,6 +61,7 @@ import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategy
 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
 import org.opendaylight.controller.cluster.datastore.utils.DoNothingActor;
 import org.opendaylight.controller.cluster.datastore.utils.MockConfiguration;
+import org.opendaylight.controller.md.cluster.datastore.model.CarsModel;
 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
 import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages;
@@ -118,7 +121,7 @@ public class TransactionProxyTest {
 
         schemaContext = TestModel.createTestContext();
 
-        DatastoreContext dataStoreContext = DatastoreContext.newBuilder().build();
+        DatastoreContext dataStoreContext = DatastoreContext.newBuilder().operationTimeoutInSeconds(2).build();
 
         doReturn(getSystem()).when(mockActorContext).getActorSystem();
         doReturn(memberName).when(mockActorContext).getCurrentMemberName();
@@ -126,6 +129,7 @@ public class TransactionProxyTest {
         doReturn(mockClusterWrapper).when(mockActorContext).getClusterWrapper();
         doReturn(mockClusterWrapper).when(mockActorContext).getClusterWrapper();
         doReturn(dataStoreContext).when(mockActorContext).getDatastoreContext();
+        doReturn(10).when(mockActorContext).getTransactionOutstandingOperationLimit();
 
         ShardStrategyFactory.setConfiguration(configuration);
     }
@@ -358,6 +362,10 @@ public class TransactionProxyTest {
         return mergeSerializedDataReply(DataStoreVersions.CURRENT_VERSION);
     }
 
+    private Future<Object> incompleteFuture(){
+        return mock(Future.class);
+    }
+
     private Future<MergeDataReply> mergeDataReply() {
         return Futures.successful(new MergeDataReply());
     }
@@ -395,6 +403,10 @@ public class TransactionProxyTest {
                 executeOperationAsync(eq(actorSystem.actorSelection(actorRef.path())),
                         eqCreateTransaction(memberName, type));
 
+        doReturn(false).when(mockActorContext).isPathLocal(actorRef.path().toString());
+
+        doReturn(10).when(mockActorContext).getTransactionOutstandingOperationLimit();
+
         return actorRef;
     }
 
@@ -1222,4 +1234,425 @@ public class TransactionProxyTest {
 
         verifyCohortFutures(proxy, getSystem().actorSelection(shardActorRef.path()));
     }
+
+    private static interface TransactionProxyOperation {
+        void run(TransactionProxy transactionProxy);
+    }
+
+    private void throttleOperation(TransactionProxyOperation operation) {
+        throttleOperation(operation, 1, true);
+    }
+
+    private void throttleOperation(TransactionProxyOperation operation, int outstandingOpsLimit, boolean shardFound){
+        ActorSystem actorSystem = getSystem();
+        ActorRef shardActorRef = actorSystem.actorOf(Props.create(DoNothingActor.class));
+
+        doReturn(outstandingOpsLimit).when(mockActorContext).getTransactionOutstandingOperationLimit();
+
+        doReturn(actorSystem.actorSelection(shardActorRef.path())).
+                when(mockActorContext).actorSelection(shardActorRef.path().toString());
+
+        if(shardFound) {
+            doReturn(Futures.successful(actorSystem.actorSelection(shardActorRef.path()))).
+                    when(mockActorContext).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
+        } else {
+            doReturn(Futures.failed(new Exception("not found")))
+                    .when(mockActorContext).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
+        }
+
+        String actorPath = "akka.tcp://system@127.0.0.1:2550/user/tx-actor";
+        CreateTransactionReply createTransactionReply = CreateTransactionReply.newBuilder()
+                .setTransactionId("txn-1")
+                .setTransactionActorPath(actorPath)
+                .build();
+
+        doReturn(Futures.successful(createTransactionReply)).when(mockActorContext).
+                executeOperationAsync(eq(actorSystem.actorSelection(shardActorRef.path())),
+                        eqCreateTransaction(memberName, READ_WRITE));
+
+        doReturn(true).when(mockActorContext).isPathLocal(actorPath);
+
+        TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_WRITE);
+
+        long start = System.currentTimeMillis();
+
+        operation.run(transactionProxy);
+
+        long end = System.currentTimeMillis();
+
+        Assert.assertTrue(String.format("took less time than expected %s was %s",
+                mockActorContext.getDatastoreContext().getOperationTimeoutInSeconds()*1000,
+                (end-start)), (end - start) > mockActorContext.getDatastoreContext().getOperationTimeoutInSeconds()*1000);
+
+    }
+
+    private void completeOperation(TransactionProxyOperation operation){
+        completeOperation(operation, true);
+    }
+
+    private void completeOperation(TransactionProxyOperation operation, boolean shardFound){
+        ActorSystem actorSystem = getSystem();
+        ActorRef shardActorRef = actorSystem.actorOf(Props.create(DoNothingActor.class));
+
+        doReturn(1).when(mockActorContext).getTransactionOutstandingOperationLimit();
+
+        doReturn(actorSystem.actorSelection(shardActorRef.path())).
+                when(mockActorContext).actorSelection(shardActorRef.path().toString());
+
+        if(shardFound) {
+            doReturn(Futures.successful(actorSystem.actorSelection(shardActorRef.path()))).
+                    when(mockActorContext).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
+        } else {
+            doReturn(Futures.failed(new Exception("not found")))
+                    .when(mockActorContext).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
+        }
+
+        String actorPath = "akka.tcp://system@127.0.0.1:2550/user/tx-actor";
+        CreateTransactionReply createTransactionReply = CreateTransactionReply.newBuilder()
+                .setTransactionId("txn-1")
+                .setTransactionActorPath(actorPath)
+                .build();
+
+        doReturn(Futures.successful(createTransactionReply)).when(mockActorContext).
+                executeOperationAsync(eq(actorSystem.actorSelection(shardActorRef.path())),
+                        eqCreateTransaction(memberName, READ_WRITE));
+
+        doReturn(true).when(mockActorContext).isPathLocal(actorPath);
+
+        TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_WRITE);
+
+        long start = System.currentTimeMillis();
+
+        operation.run(transactionProxy);
+
+        long end = System.currentTimeMillis();
+
+        Assert.assertTrue(String.format("took more time than expected %s was %s",
+                mockActorContext.getDatastoreContext().getOperationTimeoutInSeconds()*1000,
+                (end-start)), (end - start) <= mockActorContext.getDatastoreContext().getOperationTimeoutInSeconds()*1000);
+    }
+
+    public void testWriteThrottling(boolean shardFound){
+
+        throttleOperation(new TransactionProxyOperation() {
+            @Override
+            public void run(TransactionProxy transactionProxy) {
+                NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
+
+                doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
+                        any(ActorSelection.class), eqWriteData(nodeToWrite));
+
+                transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
+
+                transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
+            }
+        }, 1, shardFound);
+    }
+
+    @Test
+    public void testWriteThrottlingWhenShardFound(){
+        throttleOperation(new TransactionProxyOperation() {
+            @Override
+            public void run(TransactionProxy transactionProxy) {
+                NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
+
+                doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
+                        any(ActorSelection.class), eqWriteData(nodeToWrite));
+
+                transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
+
+                transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
+            }
+        });
+
+    }
+
+    @Test
+    public void testWriteThrottlingWhenShardNotFound(){
+        // Confirm that there is no throttling when the Shard is not found
+        completeOperation(new TransactionProxyOperation() {
+            @Override
+            public void run(TransactionProxy transactionProxy) {
+                NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
+
+                doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
+                        any(ActorSelection.class), eqWriteData(nodeToWrite));
+
+                transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
+
+                transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
+            }
+        }, false);
+
+    }
+
+
+    @Test
+    public void testWriteCompletion(){
+        completeOperation(new TransactionProxyOperation() {
+            @Override
+            public void run(TransactionProxy transactionProxy) {
+                NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
+
+                doReturn(writeSerializedDataReply()).when(mockActorContext).executeOperationAsync(
+                        any(ActorSelection.class), eqSerializedWriteData(nodeToWrite));
+
+                transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
+
+                transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
+            }
+        });
+
+    }
+
+    @Test
+    public void testMergeThrottlingWhenShardFound(){
+
+        throttleOperation(new TransactionProxyOperation() {
+            @Override
+            public void run(TransactionProxy transactionProxy) {
+                NormalizedNode<?, ?> nodeToMerge = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
+
+                doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
+                        any(ActorSelection.class), eqMergeData(nodeToMerge));
+
+                transactionProxy.merge(TestModel.TEST_PATH, nodeToMerge);
+
+                transactionProxy.merge(TestModel.TEST_PATH, nodeToMerge);
+            }
+        });
+    }
+
+    @Test
+    public void testMergeThrottlingWhenShardNotFound(){
+
+        completeOperation(new TransactionProxyOperation() {
+            @Override
+            public void run(TransactionProxy transactionProxy) {
+                NormalizedNode<?, ?> nodeToMerge = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
+
+                doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
+                        any(ActorSelection.class), eqMergeData(nodeToMerge));
+
+                transactionProxy.merge(TestModel.TEST_PATH, nodeToMerge);
+
+                transactionProxy.merge(TestModel.TEST_PATH, nodeToMerge);
+            }
+        }, false);
+    }
+
+    @Test
+    public void testMergeCompletion(){
+        completeOperation(new TransactionProxyOperation() {
+            @Override
+            public void run(TransactionProxy transactionProxy) {
+                NormalizedNode<?, ?> nodeToMerge = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
+
+                doReturn(mergeDataReply()).when(mockActorContext).executeOperationAsync(
+                        any(ActorSelection.class), eqMergeData(nodeToMerge));
+
+                transactionProxy.merge(TestModel.TEST_PATH, nodeToMerge);
+
+                transactionProxy.merge(TestModel.TEST_PATH, nodeToMerge);
+            }
+        });
+
+    }
+
+    @Test
+    public void testDeleteThrottlingWhenShardFound(){
+
+        throttleOperation(new TransactionProxyOperation() {
+            @Override
+            public void run(TransactionProxy transactionProxy) {
+                doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
+                        any(ActorSelection.class), eqDeleteData());
+
+                transactionProxy.delete(TestModel.TEST_PATH);
+
+                transactionProxy.delete(TestModel.TEST_PATH);
+            }
+        });
+    }
+
+
+    @Test
+    public void testDeleteThrottlingWhenShardNotFound(){
+
+        completeOperation(new TransactionProxyOperation() {
+            @Override
+            public void run(TransactionProxy transactionProxy) {
+                doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
+                        any(ActorSelection.class), eqDeleteData());
+
+                transactionProxy.delete(TestModel.TEST_PATH);
+
+                transactionProxy.delete(TestModel.TEST_PATH);
+            }
+        }, false);
+    }
+
+    @Test
+    public void testDeleteCompletion(){
+        completeOperation(new TransactionProxyOperation() {
+            @Override
+            public void run(TransactionProxy transactionProxy) {
+                doReturn(deleteDataReply()).when(mockActorContext).executeOperationAsync(
+                        any(ActorSelection.class), eqDeleteData());
+
+                transactionProxy.delete(TestModel.TEST_PATH);
+
+                transactionProxy.delete(TestModel.TEST_PATH);
+            }
+        });
+
+    }
+
+    @Test
+    public void testReadThrottlingWhenShardFound(){
+
+        throttleOperation(new TransactionProxyOperation() {
+            @Override
+            public void run(TransactionProxy transactionProxy) {
+                doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
+                        any(ActorSelection.class), eqReadData());
+
+                transactionProxy.read(TestModel.TEST_PATH);
+
+                transactionProxy.read(TestModel.TEST_PATH);
+            }
+        });
+    }
+
+    @Test
+    public void testReadThrottlingWhenShardNotFound(){
+
+        completeOperation(new TransactionProxyOperation() {
+            @Override
+            public void run(TransactionProxy transactionProxy) {
+                doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
+                        any(ActorSelection.class), eqReadData());
+
+                transactionProxy.read(TestModel.TEST_PATH);
+
+                transactionProxy.read(TestModel.TEST_PATH);
+            }
+        }, false);
+    }
+
+
+    @Test
+    public void testReadCompletion(){
+        completeOperation(new TransactionProxyOperation() {
+            @Override
+            public void run(TransactionProxy transactionProxy) {
+                NormalizedNode<?, ?> nodeToRead = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
+
+                doReturn(readDataReply(nodeToRead)).when(mockActorContext).executeOperationAsync(
+                        any(ActorSelection.class), eqReadData());
+
+                transactionProxy.read(TestModel.TEST_PATH);
+
+                transactionProxy.read(TestModel.TEST_PATH);
+            }
+        });
+
+    }
+
+    @Test
+    public void testExistsThrottlingWhenShardFound(){
+
+        throttleOperation(new TransactionProxyOperation() {
+            @Override
+            public void run(TransactionProxy transactionProxy) {
+                doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
+                        any(ActorSelection.class), eqDataExists());
+
+                transactionProxy.exists(TestModel.TEST_PATH);
+
+                transactionProxy.exists(TestModel.TEST_PATH);
+            }
+        });
+    }
+
+    @Test
+    public void testExistsThrottlingWhenShardNotFound(){
+
+        completeOperation(new TransactionProxyOperation() {
+            @Override
+            public void run(TransactionProxy transactionProxy) {
+                doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
+                        any(ActorSelection.class), eqDataExists());
+
+                transactionProxy.exists(TestModel.TEST_PATH);
+
+                transactionProxy.exists(TestModel.TEST_PATH);
+            }
+        }, false);
+    }
+
+
+    @Test
+    public void testExistsCompletion(){
+        completeOperation(new TransactionProxyOperation() {
+            @Override
+            public void run(TransactionProxy transactionProxy) {
+                doReturn(dataExistsReply(true)).when(mockActorContext).executeOperationAsync(
+                        any(ActorSelection.class), eqDataExists());
+
+                transactionProxy.exists(TestModel.TEST_PATH);
+
+                transactionProxy.exists(TestModel.TEST_PATH);
+            }
+        });
+
+    }
+
+    @Test
+    public void testReadyThrottling(){
+
+        throttleOperation(new TransactionProxyOperation() {
+            @Override
+            public void run(TransactionProxy transactionProxy) {
+                NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
+
+                doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
+                        any(ActorSelection.class), eqWriteData(nodeToWrite));
+
+                doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
+                        any(ActorSelection.class), any(ReadyTransaction.class));
+
+                transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
+
+                transactionProxy.ready();
+            }
+        });
+    }
+
+    @Test
+    public void testReadyThrottlingWithTwoTransactionContexts(){
+
+        throttleOperation(new TransactionProxyOperation() {
+            @Override
+            public void run(TransactionProxy transactionProxy) {
+                NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
+                NormalizedNode<?, ?> carsNode = ImmutableNodes.containerNode(CarsModel.BASE_QNAME);
+
+                doReturn(writeDataReply()).when(mockActorContext).executeOperationAsync(
+                        any(ActorSelection.class), eqWriteData(nodeToWrite));
+
+                doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
+                        any(ActorSelection.class), eqWriteData(carsNode));
+
+                doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
+                        any(ActorSelection.class), any(ReadyTransaction.class));
+
+                transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
+
+                transactionProxy.write(TestModel.TEST_PATH, carsNode);
+
+                transactionProxy.ready();
+            }
+        }, 2, true);
+    }
 }
index c065782af5056c8e0f57b973f8a099e761d5a322..5842d566ab2bbca843214288e6450d3850694778 100644 (file)
@@ -5,10 +5,9 @@ import org.junit.Test;
 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
 
 public class DefaultShardStrategyTest {
-
-  @Test
-  public void testFindShard() throws Exception {
-    String shard = new DefaultShardStrategy().findShard(TestModel.TEST_PATH);
-    Assert.assertEquals(DefaultShardStrategy.DEFAULT_SHARD, shard);
-  }
+    @Test
+    public void testFindShard() throws Exception {
+        String shard = DefaultShardStrategy.getInstance().findShard(TestModel.TEST_PATH);
+        Assert.assertEquals(DefaultShardStrategy.DEFAULT_SHARD, shard);
+    }
 }
\ No newline at end of file
index fcb0324bea77e1608dbd8b6d3d7f2077d2c27c4c..e4ab969f5c4351c0e5b3894d3d3115aa6322337a 100644 (file)
@@ -1,8 +1,10 @@
 package org.opendaylight.controller.cluster.datastore.utils;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
 import akka.actor.ActorRef;
 import akka.actor.ActorSelection;
-import akka.actor.ActorSystem;
 import akka.actor.Address;
 import akka.actor.Props;
 import akka.actor.UntypedActor;
@@ -21,10 +23,6 @@ import scala.concurrent.Await;
 import scala.concurrent.Future;
 import scala.concurrent.duration.Duration;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Mockito.mock;
-
 public class ActorContextTest extends AbstractActorTest{
 
     private static class MockShardManager extends UntypedActor {
@@ -224,7 +222,7 @@ public class ActorContextTest extends AbstractActorTest{
     @Test
     public void testResolvePathForRemoteActor() {
         ActorContext actorContext =
-                new ActorContext(mock(ActorSystem.class), mock(ActorRef.class), mock(
+                new ActorContext(getSystem(), mock(ActorRef.class), mock(
                         ClusterWrapper.class),
                         mock(Configuration.class));
 
index e571e3a715a7b0b845fbb1ea52b7fda096da1f32..67fa0960cbcb96cc2f617148aab2c7a7a7bab71d 100644 (file)
@@ -20,6 +20,7 @@ public class TestModel {
 
   public static final QName TEST_QNAME = QName.create("urn:opendaylight:params:xml:ns:yang:controller:md:sal:dom:store:test", "2014-03-13",
           "test");
+
   public static final QName OUTER_LIST_QNAME = QName.create(TEST_QNAME, "outer-list");
   public static final QName INNER_LIST_QNAME = QName.create(TEST_QNAME, "inner-list");
   public static final QName OUTER_CHOICE_QNAME = QName.create(TEST_QNAME, "outer-choice");