Keep DataChange registrations and notifications local 27/9627/2
authorMoiz Raja <moraja@cisco.com>
Sun, 3 Aug 2014 13:36:13 +0000 (06:36 -0700)
committerMoiz Raja <moraja@cisco.com>
Sun, 3 Aug 2014 22:36:45 +0000 (15:36 -0700)
Here is how it works
--------------------
When a consumer registers for a datachange notification we first check if the Shard
to which this registration needs to be sent is local. If it is then we send the
registration to it. If it is not then we simply return a NoOpRegistration to the consumer.
This ensures that a DataChange registration stays local to a cluster member.

Now let's say we have 3 replicas and in all those replicas we do find a local shard for a
given module and we have 3 listener registrations. At this time we do not want all 3 members
to be notified of the change. We only want the node which is the Leader to notify it's listeners
of the change. To restrict that the DataChangeListener will only forward events to the listener
if the Shard sends it a message enabling notifications

Change-Id: I035bbdf34e2509047f4bcb475c443a761b08e68d
Signed-off-by: Moiz Raja <moraja@cisco.com>
19 files changed:
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/ExampleActor.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataChangeListener.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DistributedDataStore.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/NoOpDataChangeListenerRegistration.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/EnableNotification.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/FindLocalShard.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/LocalShardFound.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/LocalShardNotFound.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/ActorContext.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardManagerTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/ActorContextTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/EchoActor.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/MockActorContext.java

index aa100df9d0517dfa014a78054e2319d8b1bd34fe..cbd7ca2d70f5dc090a1e842b75c200cb0c1976b9 100644 (file)
@@ -92,6 +92,10 @@ public class ExampleActor extends RaftActor {
         LOG.debug("Snapshot applied to state :" + ((HashMap) snapshot).size());
     }
 
         LOG.debug("Snapshot applied to state :" + ((HashMap) snapshot).size());
     }
 
+    @Override protected void onStateChanged() {
+
+    }
+
     @Override public void onReceiveRecover(Object message) {
         super.onReceiveRecover(message);
     }
     @Override public void onReceiveRecover(Object message) {
         super.onReceiveRecover(message);
     }
index 70b85b4627dc707b0223f8a7bdae37dfab1441f1..caa0e507c1b0ec408745150863920f720c94a1cb 100644 (file)
@@ -341,6 +341,13 @@ public abstract class RaftActor extends UntypedPersistentActor {
      */
     protected abstract void applySnapshot(Object snapshot);
 
      */
     protected abstract void applySnapshot(Object snapshot);
 
+    /**
+     * This method will be called by the RaftActor when the state of the
+     * RaftActor changes. The derived actor can then use methods like
+     * isLeader or getLeader to do something useful
+     */
+    protected abstract void onStateChanged();
+
     private RaftActorBehavior switchBehavior(RaftState state) {
         if (currentBehavior != null) {
             if (currentBehavior.state() == state) {
     private RaftActorBehavior switchBehavior(RaftState state) {
         if (currentBehavior != null) {
             if (currentBehavior.state() == state) {
@@ -367,6 +374,9 @@ public abstract class RaftActor extends UntypedPersistentActor {
         } else {
             behavior = new Leader(context);
         }
         } else {
             behavior = new Leader(context);
         }
+
+        onStateChanged();
+
         return behavior;
     }
 
         return behavior;
     }
 
index 3af6f56a2c78fe40ddd9cfa60ac5fe7bd60348c9..b435eda7a381560320825d44458580e3c165a403 100644 (file)
@@ -12,6 +12,7 @@ import akka.actor.Props;
 import akka.japi.Creator;
 import org.opendaylight.controller.cluster.datastore.messages.DataChanged;
 import org.opendaylight.controller.cluster.datastore.messages.DataChangedReply;
 import akka.japi.Creator;
 import org.opendaylight.controller.cluster.datastore.messages.DataChanged;
 import org.opendaylight.controller.cluster.datastore.messages.DataChangedReply;
+import org.opendaylight.controller.cluster.datastore.messages.EnableNotification;
 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
@@ -22,6 +23,7 @@ public class DataChangeListener extends AbstractUntypedActor {
     private final AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>> listener;
     private final SchemaContext schemaContext;
     private final YangInstanceIdentifier pathId;
     private final AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>> listener;
     private final SchemaContext schemaContext;
     private final YangInstanceIdentifier pathId;
+    private boolean notificationsEnabled = false;
 
     public DataChangeListener(SchemaContext schemaContext,
                               AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>> listener, YangInstanceIdentifier pathId) {
 
     public DataChangeListener(SchemaContext schemaContext,
                               AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>> listener, YangInstanceIdentifier pathId) {
@@ -32,15 +34,30 @@ public class DataChangeListener extends AbstractUntypedActor {
 
     @Override public void handleReceive(Object message) throws Exception {
         if(message.getClass().equals(DataChanged.SERIALIZABLE_CLASS)){
 
     @Override public void handleReceive(Object message) throws Exception {
         if(message.getClass().equals(DataChanged.SERIALIZABLE_CLASS)){
-            DataChanged reply = DataChanged.fromSerialize(schemaContext,message, pathId);
-            AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>>
-                change = reply.getChange();
-            this.listener.onDataChanged(change);
+            dataChanged(message);
+        } else if(message instanceof EnableNotification){
+            enableNotification((EnableNotification) message);
+        }
+    }
 
 
-            if(getSender() != null){
-                getSender().tell(new DataChangedReply().toSerializable(), getSelf());
-            }
+    private void enableNotification(EnableNotification message) {
+        notificationsEnabled = message.isEnabled();
+    }
+
+    public void dataChanged(Object message) {
+
+        // Do nothing if notifications are not enabled
+        if(!notificationsEnabled){
+            return;
+        }
+
+        DataChanged reply = DataChanged.fromSerialize(schemaContext,message, pathId);
+        AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>>
+            change = reply.getChange();
+        this.listener.onDataChanged(change);
 
 
+        if(getSender() != null){
+            getSender().tell(new DataChangedReply().toSerializable(), getSelf());
         }
     }
 
         }
     }
 
index d21ea51c2a8feca7e9665dd1303f3288ec3de834..780f28f358ec327f5ec6ab9cace63f769695e55a 100644 (file)
@@ -86,18 +86,30 @@ public class DistributedDataStore implements DOMStore, SchemaContextListener, Au
 
         String shardName = ShardStrategyFactory.getStrategy(path).findShard(path);
 
 
         String shardName = ShardStrategyFactory.getStrategy(path).findShard(path);
 
-        Object result = actorContext.executeShardOperation(shardName,
+        Object result = actorContext.executeLocalShardOperation(shardName,
             new RegisterChangeListener(path, dataChangeListenerActor.path(),
                 scope).toSerializable(),
             ActorContext.ASK_DURATION
         );
 
             new RegisterChangeListener(path, dataChangeListenerActor.path(),
                 scope).toSerializable(),
             ActorContext.ASK_DURATION
         );
 
-        RegisterChangeListenerReply reply = RegisterChangeListenerReply.fromSerializable(actorContext.getActorSystem(),result);
-        return new DataChangeListenerRegistrationProxy(actorContext.actorSelection(reply.getListenerRegistrationPath()), listener, dataChangeListenerActor);
+        if (result != null) {
+            RegisterChangeListenerReply reply = RegisterChangeListenerReply
+                .fromSerializable(actorContext.getActorSystem(), result);
+            return new DataChangeListenerRegistrationProxy(actorContext
+                .actorSelection(reply.getListenerRegistrationPath()), listener,
+                dataChangeListenerActor);
+        }
+
+        LOG.debug(
+            "No local shard for shardName {} was found so returning a noop registration",
+            shardName);
+        return new NoOpDataChangeListenerRegistration(listener);
     }
 
 
 
     }
 
 
 
+
+
     @Override
     public DOMStoreTransactionChain createTransactionChain() {
         return new TransactionChainProxy(actorContext, executor, schemaContext);
     @Override
     public DOMStoreTransactionChain createTransactionChain() {
         return new TransactionChainProxy(actorContext, executor, schemaContext);
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/NoOpDataChangeListenerRegistration.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/NoOpDataChangeListenerRegistration.java
new file mode 100644 (file)
index 0000000..14af31e
--- /dev/null
@@ -0,0 +1,46 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.datastore;
+
+import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
+import org.opendaylight.yangtools.concepts.ListenerRegistration;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+
+/**
+ * When a consumer registers a data change listener and no local shard is
+ * available to register that listener with then we return an instance of
+ * NoOpDataChangeListenerRegistration
+ *
+ * <p>
+ *
+ * The NoOpDataChangeListenerRegistration as it's name suggests does
+ * nothing when an operation is invoked on it
+ */
+public class NoOpDataChangeListenerRegistration
+    implements ListenerRegistration {
+
+    private final AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>
+        listener;
+
+    public <L extends AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>> NoOpDataChangeListenerRegistration(
+        AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>> listener) {
+
+        this.listener = listener;
+    }
+
+    @Override
+    public AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>> getInstance() {
+        return listener;
+    }
+
+    @Override public void close() {
+
+    }
+}
index 46f09217d04a3119f97b4ae25b66737a22690caa..23e27c9f5ff909b2c9b887aa5f7687b92d7cda10 100644 (file)
@@ -27,6 +27,7 @@ import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionChain;
 import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionChainReply;
 import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionReply;
 import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionChain;
 import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionChainReply;
 import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionReply;
+import org.opendaylight.controller.cluster.datastore.messages.EnableNotification;
 import org.opendaylight.controller.cluster.datastore.messages.ForwardedCommitTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved;
 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener;
 import org.opendaylight.controller.cluster.datastore.messages.ForwardedCommitTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved;
 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener;
@@ -44,7 +45,9 @@ import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 
+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executors;
 import java.util.Map;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executors;
@@ -80,6 +83,8 @@ public class Shard extends RaftActor {
 
     private final ShardStats shardMBean;
 
 
     private final ShardStats shardMBean;
 
+    private final List<ActorSelection> dataChangeListeners = new ArrayList<>();
+
     private Shard(String name, Map<String, String> peerAddresses) {
         super(name, peerAddresses);
 
     private Shard(String name, Map<String, String> peerAddresses) {
         super(name, peerAddresses);
 
@@ -228,6 +233,16 @@ public class Shard extends RaftActor {
             .system().actorSelection(
                 registerChangeListener.getDataChangeListenerPath());
 
             .system().actorSelection(
                 registerChangeListener.getDataChangeListenerPath());
 
+
+        // Notify the listener if notifications should be enabled or not
+        // If this shard is the leader then it will enable notifications else
+        // it will not
+        dataChangeListenerPath.tell(new EnableNotification(isLeader()), getSelf());
+
+        // Now store a reference to the data change listener so it can be notified
+        // at a later point if notifications should be enabled or disabled
+        dataChangeListeners.add(dataChangeListenerPath);
+
         AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>
             listener = new DataChangeListenerProxy(schemaContext,dataChangeListenerPath);
 
         AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>
             listener = new DataChangeListenerProxy(schemaContext,dataChangeListenerPath);
 
@@ -285,6 +300,12 @@ public class Shard extends RaftActor {
         throw new UnsupportedOperationException("applySnapshot");
     }
 
         throw new UnsupportedOperationException("applySnapshot");
     }
 
+    @Override protected void onStateChanged() {
+        for(ActorSelection dataChangeListener : dataChangeListeners){
+            dataChangeListener.tell(new EnableNotification(isLeader()), getSelf());
+        }
+    }
+
     @Override public String persistenceId() {
         return this.name;
     }
     @Override public String persistenceId() {
         return this.name;
     }
index 5fbce4cd98900be65053939ca7a51d4f7a929a98..64c6821120f94f99a389c12700757a7b8c7266f5 100644 (file)
@@ -18,7 +18,10 @@ import akka.cluster.ClusterEvent;
 import akka.japi.Creator;
 import akka.japi.Function;
 import com.google.common.base.Preconditions;
 import akka.japi.Creator;
 import akka.japi.Function;
 import com.google.common.base.Preconditions;
+import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
 import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
 import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
+import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
+import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
 import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved;
 import org.opendaylight.controller.cluster.datastore.messages.PrimaryFound;
 import org.opendaylight.controller.cluster.datastore.messages.PrimaryNotFound;
 import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved;
 import org.opendaylight.controller.cluster.datastore.messages.PrimaryFound;
 import org.opendaylight.controller.cluster.datastore.messages.PrimaryNotFound;
@@ -31,35 +34,27 @@ import java.util.Map;
 
 /**
  * The ShardManager has the following jobs,
 
 /**
  * The ShardManager has the following jobs,
- * <p>
+ * <ul>
  * <li> Create all the local shard replicas that belong on this cluster member
  * <li> Create all the local shard replicas that belong on this cluster member
+ * <li> Find the address of the local shard
  * <li> Find the primary replica for any given shard
  * <li> Find the primary replica for any given shard
- * <li> Engage in shard replica elections which decide which replica should be the primary
- * </p>
- * <p/>
- * <h3>>Creation of Shard replicas</h3
- * <p>
- * When the ShardManager is constructed it reads the cluster configuration to find out which shard replicas
- * belong on this member. It finds out the name of the current cluster member from the Akka Clustering Service.
- * </p>
- * <p/>
- * <h3> Replica Elections </h3>
- * <p/>
- * <p>
- * The Shard Manager uses multiple cues to initiate election.
- * <li> When a member of the cluster dies
- * <li> When a local shard replica dies
- * <li> When a local shard replica comes alive
- * </p>
+ * <li> Monitor the cluster members and store their addresses
+ * <ul>
  */
 public class ShardManager extends AbstractUntypedActor {
 
     // Stores a mapping between a member name and the address of the member
  */
 public class ShardManager extends AbstractUntypedActor {
 
     // Stores a mapping between a member name and the address of the member
+    // Member names look like "member-1", "member-2" etc and are as specified
+    // in configuration
     private final Map<String, Address> memberNameToAddress = new HashMap<>();
 
     private final Map<String, Address> memberNameToAddress = new HashMap<>();
 
+    // Stores a mapping between a shard name and it's corresponding information
+    // Shard names look like inventory, topology etc and are as specified in
+    // configuration
     private final Map<String, ShardInformation> localShards = new HashMap<>();
 
     private final Map<String, ShardInformation> localShards = new HashMap<>();
 
-
+    // The type of a ShardManager reflects the type of the datastore itself
+    // A data store could be of type config/operational
     private final String type;
 
     private final ClusterWrapper cluster;
     private final String type;
 
     private final ClusterWrapper cluster;
@@ -102,7 +97,8 @@ public class ShardManager extends AbstractUntypedActor {
         if (message.getClass().equals(FindPrimary.SERIALIZABLE_CLASS)) {
             findPrimary(
                 FindPrimary.fromSerializable(message));
         if (message.getClass().equals(FindPrimary.SERIALIZABLE_CLASS)) {
             findPrimary(
                 FindPrimary.fromSerializable(message));
-
+        } else if(message instanceof FindLocalShard){
+            findLocalShard((FindLocalShard) message);
         } else if (message instanceof UpdateSchemaContext) {
             updateSchemaContext(message);
         } else if (message instanceof ClusterEvent.MemberUp){
         } else if (message instanceof UpdateSchemaContext) {
             updateSchemaContext(message);
         } else if (message instanceof ClusterEvent.MemberUp){
@@ -117,6 +113,18 @@ public class ShardManager extends AbstractUntypedActor {
 
     }
 
 
     }
 
+    private void findLocalShard(FindLocalShard message) {
+        ShardInformation shardInformation =
+            localShards.get(message.getShardName());
+
+        if(shardInformation != null){
+            getSender().tell(new LocalShardFound(shardInformation.getActor()), getSelf());
+            return;
+        }
+
+        getSender().tell(new LocalShardNotFound(message.getShardName()), getSelf());
+    }
+
     private void ignoreMessage(Object message){
         LOG.debug("Unhandled message : " + message);
     }
     private void ignoreMessage(Object message){
         LOG.debug("Unhandled message : " + message);
     }
@@ -137,6 +145,11 @@ public class ShardManager extends AbstractUntypedActor {
         }
     }
 
         }
     }
 
+    /**
+     * Notifies all the local shards of a change in the schema context
+     *
+     * @param message
+     */
     private void updateSchemaContext(Object message) {
         for(ShardInformation info : localShards.values()){
             info.getActor().tell(message,getSelf());
     private void updateSchemaContext(Object message) {
         for(ShardInformation info : localShards.values()){
             info.getActor().tell(message,getSelf());
@@ -180,10 +193,7 @@ public class ShardManager extends AbstractUntypedActor {
         getSender().tell(new PrimaryNotFound(shardName).toSerializable(), getSelf());
     }
 
         getSender().tell(new PrimaryNotFound(shardName).toSerializable(), getSelf());
     }
 
-    private String
-
-
-    getShardActorPath(String shardName, String memberName) {
+    private String getShardActorPath(String shardName, String memberName) {
         Address address = memberNameToAddress.get(memberName);
         if(address != null) {
             return address.toString() + "/user/shardmanager-" + this.type + "/"
         Address address = memberNameToAddress.get(memberName);
         if(address != null) {
             return address.toString() + "/user/shardmanager-" + this.type + "/"
@@ -193,11 +203,23 @@ public class ShardManager extends AbstractUntypedActor {
         return null;
     }
 
         return null;
     }
 
+    /**
+     * Construct the name of the shard actor given the name of the member on
+     * which the shard resides and the name of the shard
+     *
+     * @param memberName
+     * @param shardName
+     * @return
+     */
     private String getShardActorName(String memberName, String shardName){
         return memberName + "-shard-" + shardName + "-" + this.type;
     }
 
     private String getShardActorName(String memberName, String shardName){
         return memberName + "-shard-" + shardName + "-" + this.type;
     }
 
-    // Create the shards that are local to this member
+    /**
+     * Create shards that are local to the member on which the ShardManager
+     * runs
+     *
+     */
     private void createLocalShards() {
         String memberName = this.cluster.getCurrentMemberName();
         List<String> memberShardNames =
     private void createLocalShards() {
         String memberName = this.cluster.getCurrentMemberName();
         List<String> memberShardNames =
@@ -214,6 +236,12 @@ public class ShardManager extends AbstractUntypedActor {
 
     }
 
 
     }
 
+    /**
+     * Given the name of the shard find the addresses of all it's peers
+     *
+     * @param shardName
+     * @return
+     */
     private Map<String, String> getPeerAddresses(String shardName){
 
         Map<String, String> peerAddresses = new HashMap<>();
     private Map<String, String> getPeerAddresses(String shardName){
 
         Map<String, String> peerAddresses = new HashMap<>();
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/EnableNotification.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/EnableNotification.java
new file mode 100644 (file)
index 0000000..67dab7e
--- /dev/null
@@ -0,0 +1,21 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.datastore.messages;
+
+public class EnableNotification {
+    private final boolean enabled;
+
+    public EnableNotification(boolean enabled) {
+        this.enabled = enabled;
+    }
+
+    public boolean isEnabled() {
+        return enabled;
+    }
+}
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/FindLocalShard.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/FindLocalShard.java
new file mode 100644 (file)
index 0000000..c415db6
--- /dev/null
@@ -0,0 +1,25 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.datastore.messages;
+
+/**
+ * FindLocalShard is a message that should be sent to the {@link org.opendaylight.controller.cluster.datastore.ShardManager}
+ * when we need to find a reference to a LocalShard
+ */
+public class FindLocalShard {
+    private final String shardName;
+
+    public FindLocalShard(String shardName) {
+        this.shardName = shardName;
+    }
+
+    public String getShardName() {
+        return shardName;
+    }
+}
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/LocalShardFound.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/LocalShardFound.java
new file mode 100644 (file)
index 0000000..feea38f
--- /dev/null
@@ -0,0 +1,27 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.datastore.messages;
+
+import akka.actor.ActorRef;
+
+/**
+ * LocalShardFound is a message that is sent by the {@link org.opendaylight.controller.cluster.datastore.ShardManager}
+ * when it finds a shard with the specified name in it's local shard registry
+ */
+public class LocalShardFound {
+    private final ActorRef path;
+
+    public LocalShardFound(ActorRef path) {
+        this.path = path;
+    }
+
+    public ActorRef getPath() {
+        return path;
+    }
+}
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/LocalShardNotFound.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/LocalShardNotFound.java
new file mode 100644 (file)
index 0000000..f6c6634
--- /dev/null
@@ -0,0 +1,29 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.datastore.messages;
+
+/**
+ * LocalShardNotFound is a message that is sent by the {@link org.opendaylight.controller.cluster.datastore.ShardManager}
+ * when it cannot locate a shard in it's local registry with the shardName specified
+ */
+public class LocalShardNotFound {
+    private final String shardName;
+
+    /**
+     *
+     * @param shardName the name of the shard that could not be found
+     */
+    public LocalShardNotFound(String shardName) {
+        this.shardName = shardName;
+    }
+
+    public String getShardName() {
+        return shardName;
+    }
+}
index ac0893da5ad157c230fad2aab7401b27d0054751..4706c66e2594eae1384b465bf5d0b246c72d8223 100644 (file)
@@ -18,7 +18,9 @@ import org.opendaylight.controller.cluster.datastore.ClusterWrapper;
 import org.opendaylight.controller.cluster.datastore.Configuration;
 import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
 import org.opendaylight.controller.cluster.datastore.exceptions.TimeoutException;
 import org.opendaylight.controller.cluster.datastore.Configuration;
 import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
 import org.opendaylight.controller.cluster.datastore.exceptions.TimeoutException;
+import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
 import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
 import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
+import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
 import org.opendaylight.controller.cluster.datastore.messages.PrimaryFound;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 import org.slf4j.Logger;
 import org.opendaylight.controller.cluster.datastore.messages.PrimaryFound;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 import org.slf4j.Logger;
@@ -91,6 +93,29 @@ public class ActorContext {
         return actorSystem.actorSelection(path);
     }
 
         return actorSystem.actorSelection(path);
     }
 
+    /**
+     * Finds a local shard given it's shard name and return it's ActorRef
+     *
+     * @param shardName the name of the local shard that needs to be found
+     * @return a reference to a local shard actor which represents the shard
+     *         specified by the shardName
+     */
+    public ActorRef findLocalShard(String shardName) {
+        Object result = executeLocalOperation(shardManager,
+            new FindLocalShard(shardName), ASK_DURATION);
+
+        if (result instanceof LocalShardFound) {
+            LocalShardFound found = (LocalShardFound) result;
+
+            LOG.debug("Local shard found {}", found.getPath());
+
+            return found.getPath();
+        }
+
+        return null;
+    }
+
+
     public String findPrimaryPath(String shardName) {
         Object result = executeLocalOperation(shardManager,
             new FindPrimary(shardName).toSerializable(), ASK_DURATION);
     public String findPrimaryPath(String shardName) {
         Object result = executeLocalOperation(shardManager,
             new FindPrimary(shardName).toSerializable(), ASK_DURATION);
@@ -170,6 +195,34 @@ public class ActorContext {
         return executeRemoteOperation(primary, message, duration);
     }
 
         return executeRemoteOperation(primary, message, duration);
     }
 
+    /**
+     * Execute an operation on the the local shard only
+     * <p>
+     *     This method first finds the address of the local shard if any. It then
+     *     executes the operation on it.
+     * </p>
+     *
+     * @param shardName the name of the shard on which the operation needs to be executed
+     * @param message the message that needs to be sent to the shard
+     * @param duration the time duration in which this operation should complete
+     * @return the message that was returned by the local actor on which the
+     *         the operation was executed. If a local shard was not found then
+     *         null is returned
+     * @throws org.opendaylight.controller.cluster.datastore.exceptions.TimeoutException
+     *         if the operation does not complete in a specified time duration
+     */
+    public Object executeLocalShardOperation(String shardName, Object message,
+        FiniteDuration duration) {
+        ActorRef local = findLocalShard(shardName);
+
+        if(local != null) {
+            return executeLocalOperation(local, message, duration);
+        }
+
+        return null;
+    }
+
+
     public void shutdown() {
         shardManager.tell(PoisonPill.getInstance(), null);
         actorSystem.shutdown();
     public void shutdown() {
         shardManager.tell(PoisonPill.getInstance(), null);
         actorSystem.shutdown();
index fd610322201f1ffb168640d413f5afe0c8d743d7..c4ec8b45fc2726cfda7911287b0c59b6b82f9edd 100644 (file)
@@ -6,6 +6,7 @@ import akka.testkit.JavaTestKit;
 import org.junit.Test;
 import org.opendaylight.controller.cluster.datastore.messages.DataChanged;
 import org.opendaylight.controller.cluster.datastore.messages.DataChangedReply;
 import org.junit.Test;
 import org.opendaylight.controller.cluster.datastore.messages.DataChanged;
 import org.opendaylight.controller.cluster.datastore.messages.DataChangedReply;
+import org.opendaylight.controller.cluster.datastore.messages.EnableNotification;
 import org.opendaylight.controller.md.cluster.datastore.model.CompositeModel;
 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
 import org.opendaylight.controller.md.cluster.datastore.model.CompositeModel;
 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
@@ -86,24 +87,28 @@ public class DataChangeListenerTest extends AbstractActorTest {
     }
 
     @Test
     }
 
     @Test
-    public void testDataChanged(){
+    public void testDataChangedWhenNotificationsAreEnabled(){
         new JavaTestKit(getSystem()) {{
             final MockDataChangeListener listener = new MockDataChangeListener();
             final Props props = DataChangeListener.props(CompositeModel.createTestContext(),listener,CompositeModel.FAMILY_PATH );
             final ActorRef subject =
         new JavaTestKit(getSystem()) {{
             final MockDataChangeListener listener = new MockDataChangeListener();
             final Props props = DataChangeListener.props(CompositeModel.createTestContext(),listener,CompositeModel.FAMILY_PATH );
             final ActorRef subject =
-                getSystem().actorOf(props, "testDataChanged");
+                getSystem().actorOf(props, "testDataChangedNotificationsEnabled");
 
             new Within(duration("1 seconds")) {
                 protected void run() {
 
 
             new Within(duration("1 seconds")) {
                 protected void run() {
 
+                    // Let the DataChangeListener know that notifications should
+                    // be enabled
+                    subject.tell(new EnableNotification(true), getRef());
+
                     subject.tell(
                         new DataChanged(CompositeModel.createTestContext(),new MockDataChangedEvent()).toSerializable(),
                         getRef());
 
                     subject.tell(
                         new DataChanged(CompositeModel.createTestContext(),new MockDataChangedEvent()).toSerializable(),
                         getRef());
 
-                    final Boolean out = new ExpectMsg<Boolean>("dataChanged") {
+                    final Boolean out = new ExpectMsg<Boolean>(duration("800 millis"), "dataChanged") {
                         // do not put code outside this method, will run afterwards
                         protected Boolean match(Object in) {
                         // do not put code outside this method, will run afterwards
                         protected Boolean match(Object in) {
-                            if (in.getClass().equals(DataChangedReply.SERIALIZABLE_CLASS)) {
+                            if (in != null && in.getClass().equals(DataChangedReply.SERIALIZABLE_CLASS)) {
 
                                 return true;
                             } else {
 
                                 return true;
                             } else {
@@ -115,7 +120,30 @@ public class DataChangeListenerTest extends AbstractActorTest {
                     assertTrue(out);
                     assertTrue(listener.gotIt());
                     assertNotNull(listener.getChange().getCreatedData());
                     assertTrue(out);
                     assertTrue(listener.gotIt());
                     assertNotNull(listener.getChange().getCreatedData());
-                    // Will wait for the rest of the 3 seconds
+
+                    expectNoMsg();
+                }
+
+
+            };
+        }};
+    }
+
+    @Test
+    public void testDataChangedWhenNotificationsAreDisabled(){
+        new JavaTestKit(getSystem()) {{
+            final MockDataChangeListener listener = new MockDataChangeListener();
+            final Props props = DataChangeListener.props(CompositeModel.createTestContext(),listener,CompositeModel.FAMILY_PATH );
+            final ActorRef subject =
+                getSystem().actorOf(props, "testDataChangedNotificationsDisabled");
+
+            new Within(duration("1 seconds")) {
+                protected void run() {
+
+                    subject.tell(
+                        new DataChanged(CompositeModel.createTestContext(),new MockDataChangedEvent()).toSerializable(),
+                        getRef());
+
                     expectNoMsg();
                 }
 
                     expectNoMsg();
                 }
 
index 23a1ed49315ec827baa95c41b51dfe4d23159d4e..03191f70f1ab5979d8bcc6189b2ddece6616c867 100644 (file)
@@ -54,8 +54,8 @@ public class DistributedDataStoreTest extends AbstractActorTest{
     }
 
     @org.junit.Test
     }
 
     @org.junit.Test
-    public void testRegisterChangeListener() throws Exception {
-        mockActorContext.setExecuteShardOperationResponse(new RegisterChangeListenerReply(doNothingActorRef.path()).toSerializable());
+    public void testRegisterChangeListenerWhenShardIsNotLocal() throws Exception {
+
         ListenerRegistration registration =
                 distributedDataStore.registerChangeListener(TestModel.TEST_PATH, new AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>() {
             @Override
         ListenerRegistration registration =
                 distributedDataStore.registerChangeListener(TestModel.TEST_PATH, new AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>() {
             @Override
@@ -64,9 +64,31 @@ public class DistributedDataStoreTest extends AbstractActorTest{
             }
         }, AsyncDataBroker.DataChangeScope.BASE);
 
             }
         }, AsyncDataBroker.DataChangeScope.BASE);
 
+        // Since we do not expect the shard to be local registration will return a NoOpRegistration
+        Assert.assertTrue(registration instanceof NoOpDataChangeListenerRegistration);
+
+        Assert.assertNotNull(registration);
+    }
+
+    @org.junit.Test
+    public void testRegisterChangeListenerWhenShardIsLocal() throws Exception {
+
+        mockActorContext.setExecuteLocalShardOperationResponse(new RegisterChangeListenerReply(doNothingActorRef.path()).toSerializable());
+
+        ListenerRegistration registration =
+            distributedDataStore.registerChangeListener(TestModel.TEST_PATH, new AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>() {
+                @Override
+                public void onDataChanged(AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change) {
+                    throw new UnsupportedOperationException("onDataChanged");
+                }
+            }, AsyncDataBroker.DataChangeScope.BASE);
+
+        Assert.assertTrue(registration instanceof DataChangeListenerRegistrationProxy);
+
         Assert.assertNotNull(registration);
     }
 
         Assert.assertNotNull(registration);
     }
 
+
     @org.junit.Test
     public void testCreateTransactionChain() throws Exception {
         final DOMStoreTransactionChain transactionChain = distributedDataStore.createTransactionChain();
     @org.junit.Test
     public void testCreateTransactionChain() throws Exception {
         final DOMStoreTransactionChain transactionChain = distributedDataStore.createTransactionChain();
index 87d257a3f217bf0bfac0b3c8ade1cbe8d4555b04..268ed3c27383f3254eafec08e74c8f287456456a 100644 (file)
@@ -1,5 +1,6 @@
 package org.opendaylight.controller.cluster.datastore;
 
 package org.opendaylight.controller.cluster.datastore;
 
+import akka.actor.ActorRef;
 import akka.actor.ActorSystem;
 import akka.actor.Props;
 import akka.testkit.JavaTestKit;
 import akka.actor.ActorSystem;
 import akka.actor.Props;
 import akka.testkit.JavaTestKit;
@@ -8,13 +9,19 @@ import junit.framework.Assert;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
+import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
 import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
 import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
+import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
+import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
 import org.opendaylight.controller.cluster.datastore.messages.PrimaryFound;
 import org.opendaylight.controller.cluster.datastore.messages.PrimaryNotFound;
 import org.opendaylight.controller.cluster.datastore.utils.MockClusterWrapper;
 import org.opendaylight.controller.cluster.datastore.utils.MockConfiguration;
 import scala.concurrent.duration.Duration;
 
 import org.opendaylight.controller.cluster.datastore.messages.PrimaryFound;
 import org.opendaylight.controller.cluster.datastore.messages.PrimaryNotFound;
 import org.opendaylight.controller.cluster.datastore.utils.MockClusterWrapper;
 import org.opendaylight.controller.cluster.datastore.utils.MockConfiguration;
 import scala.concurrent.duration.Duration;
 
+import static junit.framework.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
 public class ShardManagerTest {
     private static ActorSystem system;
 
 public class ShardManagerTest {
     private static ActorSystem system;
 
@@ -47,7 +54,6 @@ public class ShardManagerTest {
                     expectMsgEquals(Duration.Zero(),
                         new PrimaryNotFound("inventory").toSerializable());
 
                     expectMsgEquals(Duration.Zero(),
                         new PrimaryNotFound("inventory").toSerializable());
 
-                    // Will wait for the rest of the 3 seconds
                     expectNoMsg();
                 }
             };
                     expectNoMsg();
                 }
             };
@@ -64,7 +70,6 @@ public class ShardManagerTest {
             final TestActorRef<ShardManager> subject =
                 TestActorRef.create(system, props);
 
             final TestActorRef<ShardManager> subject =
                 TestActorRef.create(system, props);
 
-            // the run() method needs to finish within 3 seconds
             new Within(duration("1 seconds")) {
                 protected void run() {
 
             new Within(duration("1 seconds")) {
                 protected void run() {
 
@@ -78,6 +83,75 @@ public class ShardManagerTest {
         }};
     }
 
         }};
     }
 
+    @Test
+    public void testOnReceiveFindLocalShardForNonExistentShard() throws Exception {
+
+        new JavaTestKit(system) {{
+            final Props props = ShardManager
+                .props("config", new MockClusterWrapper(),
+                    new MockConfiguration());
+            final TestActorRef<ShardManager> subject =
+                TestActorRef.create(system, props);
+
+            new Within(duration("1 seconds")) {
+                protected void run() {
+
+                    subject.tell(new FindLocalShard("inventory"), getRef());
+
+                    final String out = new ExpectMsg<String>(duration("1 seconds"), "find local") {
+                        protected String match(Object in) {
+                            if (in instanceof LocalShardNotFound) {
+                                return ((LocalShardNotFound) in).getShardName();
+                            } else {
+                                throw noMatch();
+                            }
+                        }
+                    }.get(); // this extracts the received message
+
+                    assertEquals("inventory", out);
+
+                    expectNoMsg();
+                }
+            };
+        }};
+    }
+
+    @Test
+    public void testOnReceiveFindLocalShardForExistentShard() throws Exception {
+
+        final MockClusterWrapper mockClusterWrapper = new MockClusterWrapper();
+
+        new JavaTestKit(system) {{
+            final Props props = ShardManager
+                .props("config", mockClusterWrapper,
+                    new MockConfiguration());
+            final TestActorRef<ShardManager> subject =
+                TestActorRef.create(system, props);
+
+            new Within(duration("1 seconds")) {
+                protected void run() {
+
+                    subject.tell(new FindLocalShard(Shard.DEFAULT_NAME), getRef());
+
+                    final ActorRef out = new ExpectMsg<ActorRef>(duration("1 seconds"), "find local") {
+                        protected ActorRef match(Object in) {
+                            if (in instanceof LocalShardFound) {
+                                return ((LocalShardFound) in).getPath();
+                            } else {
+                                throw noMatch();
+                            }
+                        }
+                    }.get(); // this extracts the received message
+
+                    assertTrue(out.path().toString(), out.path().toString().contains("member-1-shard-default-config"));
+
+
+                    expectNoMsg();
+                }
+            };
+        }};
+    }
+
     @Test
     public void testOnReceiveMemberUp() throws Exception {
 
     @Test
     public void testOnReceiveMemberUp() throws Exception {
 
index 7d57ea8284e90dc3f941b7b82385e5db28ed75f4..38920d86ca36b8889276b6f9823381d0cc1a4239 100644 (file)
@@ -7,6 +7,7 @@ import org.junit.Test;
 import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionChain;
 import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionChainReply;
 import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionChain;
 import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionChainReply;
+import org.opendaylight.controller.cluster.datastore.messages.EnableNotification;
 import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved;
 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener;
 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListenerReply;
 import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved;
 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener;
 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListenerReply;
@@ -24,6 +25,7 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 
 import java.util.HashMap;
 import java.util.Map;
 
+import static junit.framework.Assert.assertFalse;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
@@ -92,6 +94,19 @@ public class ShardTest extends AbstractActorTest {
                         getRef().path(), AsyncDataBroker.DataChangeScope.BASE).toSerializable(),
                         getRef());
 
                         getRef().path(), AsyncDataBroker.DataChangeScope.BASE).toSerializable(),
                         getRef());
 
+                    final Boolean notificationEnabled = new ExpectMsg<Boolean>("enable notification") {
+                        // do not put code outside this method, will run afterwards
+                        protected Boolean match(Object in) {
+                            if(in instanceof EnableNotification){
+                                return ((EnableNotification) in).isEnabled();
+                            } else {
+                                throw noMatch();
+                            }
+                        }
+                    }.get(); // this extracts the received message
+
+                    assertFalse(notificationEnabled);
+
                     final String out = new ExpectMsg<String>("match hint") {
                         // do not put code outside this method, will run afterwards
                         protected String match(Object in) {
                     final String out = new ExpectMsg<String>("match hint") {
                         // do not put code outside this method, will run afterwards
                         protected String match(Object in) {
@@ -108,8 +123,6 @@ public class ShardTest extends AbstractActorTest {
 
                     assertTrue(out.matches(
                         "akka:\\/\\/test\\/user\\/testRegisterChangeListener\\/\\$.*"));
 
                     assertTrue(out.matches(
                         "akka:\\/\\/test\\/user\\/testRegisterChangeListener\\/\\$.*"));
-                    // Will wait for the rest of the 3 seconds
-                    expectNoMsg();
                 }
 
 
                 }
 
 
index 3dd0214e9b213c49a4821c303362f13eebe11371..5874eccda40f4f2d08f6b829757206213a743696 100644 (file)
@@ -2,12 +2,20 @@ package org.opendaylight.controller.cluster.datastore.utils;
 
 import akka.actor.ActorRef;
 import akka.actor.ActorSystem;
 
 import akka.actor.ActorRef;
 import akka.actor.ActorSystem;
+import akka.actor.Props;
+import akka.actor.UntypedActor;
+import akka.japi.Creator;
+import akka.testkit.JavaTestKit;
 import org.junit.Test;
 import org.opendaylight.controller.cluster.datastore.AbstractActorTest;
 import org.opendaylight.controller.cluster.datastore.ClusterWrapper;
 import org.opendaylight.controller.cluster.datastore.Configuration;
 import org.junit.Test;
 import org.opendaylight.controller.cluster.datastore.AbstractActorTest;
 import org.opendaylight.controller.cluster.datastore.ClusterWrapper;
 import org.opendaylight.controller.cluster.datastore.Configuration;
+import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
+import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
+import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
 
 import static org.junit.Assert.assertEquals;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
 import static org.mockito.Mockito.mock;
 
 public class ActorContextTest extends AbstractActorTest{
 import static org.mockito.Mockito.mock;
 
 public class ActorContextTest extends AbstractActorTest{
@@ -44,4 +52,145 @@ public class ActorContextTest extends AbstractActorTest{
         System.out.println(actorContext
             .actorFor("akka://system/user/shardmanager/shard/transaction"));
     }
         System.out.println(actorContext
             .actorFor("akka://system/user/shardmanager/shard/transaction"));
     }
+
+
+    private static class MockShardManager extends UntypedActor {
+
+        private final boolean found;
+        private final ActorRef actorRef;
+
+        private MockShardManager(boolean found, ActorRef actorRef){
+
+            this.found = found;
+            this.actorRef = actorRef;
+        }
+
+        @Override public void onReceive(Object message) throws Exception {
+            if(found){
+                getSender().tell(new LocalShardFound(actorRef), getSelf());
+            } else {
+                getSender().tell(new LocalShardNotFound(((FindLocalShard) message).getShardName()), getSelf());
+            }
+        }
+
+        private static Props props(final boolean found, final ActorRef actorRef){
+            return Props.create(new Creator<MockShardManager>() {
+
+                @Override public MockShardManager create()
+                    throws Exception {
+                    return new MockShardManager(found,
+                        actorRef);
+                }
+            });
+        }
+    }
+
+    @Test
+    public void testExecuteLocalShardOperationWithShardFound(){
+        new JavaTestKit(getSystem()) {{
+
+            new Within(duration("1 seconds")) {
+                protected void run() {
+
+                    ActorRef shardActorRef = getSystem().actorOf(Props.create(EchoActor.class));
+
+                    ActorRef shardManagerActorRef = getSystem()
+                        .actorOf(MockShardManager.props(true, shardActorRef));
+
+                    ActorContext actorContext =
+                        new ActorContext(getSystem(), shardManagerActorRef , mock(ClusterWrapper.class),
+                            mock(Configuration.class));
+
+                    Object out = actorContext.executeLocalShardOperation("default", "hello", duration("1 seconds"));
+
+                    assertEquals("hello", out);
+
+
+                    expectNoMsg();
+                }
+            };
+        }};
+
+    }
+
+    @Test
+    public void testExecuteLocalShardOperationWithShardNotFound(){
+        new JavaTestKit(getSystem()) {{
+
+            new Within(duration("1 seconds")) {
+                protected void run() {
+
+                    ActorRef shardManagerActorRef = getSystem()
+                        .actorOf(MockShardManager.props(false, null));
+
+                    ActorContext actorContext =
+                        new ActorContext(getSystem(), shardManagerActorRef , mock(ClusterWrapper.class),
+                            mock(Configuration.class));
+
+                    Object out = actorContext.executeLocalShardOperation("default", "hello", duration("1 seconds"));
+
+                    assertNull(out);
+
+
+                    expectNoMsg();
+                }
+            };
+        }};
+
+    }
+
+
+    @Test
+    public void testFindLocalShardWithShardFound(){
+        new JavaTestKit(getSystem()) {{
+
+            new Within(duration("1 seconds")) {
+                protected void run() {
+
+                    ActorRef shardActorRef = getSystem().actorOf(Props.create(EchoActor.class));
+
+                    ActorRef shardManagerActorRef = getSystem()
+                        .actorOf(MockShardManager.props(true, shardActorRef));
+
+                    ActorContext actorContext =
+                        new ActorContext(getSystem(), shardManagerActorRef , mock(ClusterWrapper.class),
+                            mock(Configuration.class));
+
+                    Object out = actorContext.findLocalShard("default");
+
+                    assertEquals(shardActorRef, out);
+
+
+                    expectNoMsg();
+                }
+            };
+        }};
+
+    }
+
+    @Test
+    public void testFindLocalShardWithShardNotFound(){
+        new JavaTestKit(getSystem()) {{
+
+            new Within(duration("1 seconds")) {
+                protected void run() {
+
+                    ActorRef shardManagerActorRef = getSystem()
+                        .actorOf(MockShardManager.props(false, null));
+
+                    ActorContext actorContext =
+                        new ActorContext(getSystem(), shardManagerActorRef , mock(ClusterWrapper.class),
+                            mock(Configuration.class));
+
+                    Object out = actorContext.findLocalShard("default");
+
+                    assertNull(out);
+
+
+                    expectNoMsg();
+                }
+            };
+        }};
+
+    }
 }
 }
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/EchoActor.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/EchoActor.java
new file mode 100644 (file)
index 0000000..fe88afe
--- /dev/null
@@ -0,0 +1,21 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.datastore.utils;
+
+import akka.actor.UntypedActor;
+
+/**
+ * The EchoActor simply responds back with the same message that it receives
+ */
+public class EchoActor extends UntypedActor{
+
+    @Override public void onReceive(Object message) throws Exception {
+        getSender().tell(message, getSelf());
+    }
+}
index 1d1e6614886e32d5593112375cbefaab0a9c1507..5d3853f311880d791fc177c9a1082f026fb1c5de 100644 (file)
@@ -19,6 +19,7 @@ public class MockActorContext extends ActorContext {
     private Object executeShardOperationResponse;
     private Object executeRemoteOperationResponse;
     private Object executeLocalOperationResponse;
     private Object executeShardOperationResponse;
     private Object executeRemoteOperationResponse;
     private Object executeLocalOperationResponse;
+    private Object executeLocalShardOperationResponse;
 
     public MockActorContext(ActorSystem actorSystem) {
         super(actorSystem, null, new MockClusterWrapper(), new MockConfiguration());
 
     public MockActorContext(ActorSystem actorSystem) {
         super(actorSystem, null, new MockClusterWrapper(), new MockConfiguration());
@@ -56,8 +57,18 @@ public class MockActorContext extends ActorContext {
         this.executeLocalOperationResponse = executeLocalOperationResponse;
     }
 
         this.executeLocalOperationResponse = executeLocalOperationResponse;
     }
 
+    public void setExecuteLocalShardOperationResponse(
+        Object executeLocalShardOperationResponse) {
+        this.executeLocalShardOperationResponse = executeLocalShardOperationResponse;
+    }
+
     @Override public Object executeLocalOperation(ActorRef actor,
         Object message, FiniteDuration duration) {
         return this.executeLocalOperationResponse;
     }
     @Override public Object executeLocalOperation(ActorRef actor,
         Object message, FiniteDuration duration) {
         return this.executeLocalOperationResponse;
     }
+
+    @Override public Object executeLocalShardOperation(String shardName,
+        Object message, FiniteDuration duration) {
+        return this.executeLocalShardOperationResponse;
+    }
 }
 }