Complete implementation of DataChangeListenerRegistration and related classes 17/8317/5
authorMoiz Raja <moraja@cisco.com>
Wed, 25 Jun 2014 00:58:34 +0000 (17:58 -0700)
committerMoiz Raja <moraja@cisco.com>
Tue, 1 Jul 2014 17:49:30 +0000 (10:49 -0700)
Change-Id: If1bb663145fd6dbbb2123d014c1ea91f65f8e028
Signed-off-by: Moiz Raja <moraja@cisco.com>
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerRegistration.java [moved from opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ListenerRegistration.java with 60% similarity]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerRegistrationProxy.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DistributedDataStore.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ListenerRegistrationProxy.java [deleted file]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/CloseDataChangeListenerRegistration.java [moved from opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/CloseListenerRegistration.java with 88% similarity]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/CloseDataChangeListenerRegistrationReply.java [moved from opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/CloseListenerRegistrationReply.java with 87% similarity]
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerRegistrationProxyTest.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerRegistrationTest.java [moved from opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ListenerRegistrationTest.java with 82% similarity]

@@ -11,39 +11,40 @@ package org.opendaylight.controller.cluster.datastore;
 import akka.actor.Props;
 import akka.actor.UntypedActor;
 import akka.japi.Creator;
 import akka.actor.Props;
 import akka.actor.UntypedActor;
 import akka.japi.Creator;
-import org.opendaylight.controller.cluster.datastore.messages.CloseListenerRegistration;
-import org.opendaylight.controller.cluster.datastore.messages.CloseListenerRegistrationReply;
+import org.opendaylight.controller.cluster.datastore.messages.CloseDataChangeListenerRegistration;
+import org.opendaylight.controller.cluster.datastore.messages.CloseDataChangeListenerRegistrationReply;
 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
 import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 
 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
 import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 
-public class ListenerRegistration extends UntypedActor{
+public class DataChangeListenerRegistration extends UntypedActor{
 
   private final org.opendaylight.yangtools.concepts.ListenerRegistration<AsyncDataChangeListener<InstanceIdentifier, NormalizedNode<?, ?>>> registration;
 
 
   private final org.opendaylight.yangtools.concepts.ListenerRegistration<AsyncDataChangeListener<InstanceIdentifier, NormalizedNode<?, ?>>> registration;
 
-  public ListenerRegistration(org.opendaylight.yangtools.concepts.ListenerRegistration<AsyncDataChangeListener<InstanceIdentifier, NormalizedNode<?, ?>>> registration) {
+  public DataChangeListenerRegistration(
+      org.opendaylight.yangtools.concepts.ListenerRegistration<AsyncDataChangeListener<InstanceIdentifier, NormalizedNode<?, ?>>> registration) {
     this.registration = registration;
   }
 
   @Override
   public void onReceive(Object message) throws Exception {
     this.registration = registration;
   }
 
   @Override
   public void onReceive(Object message) throws Exception {
-    if(message instanceof CloseListenerRegistration){
-      closeListenerRegistration((CloseListenerRegistration) message);
+    if(message instanceof CloseDataChangeListenerRegistration){
+      closeListenerRegistration((CloseDataChangeListenerRegistration) message);
     }
   }
 
   public static Props props(final org.opendaylight.yangtools.concepts.ListenerRegistration<AsyncDataChangeListener<InstanceIdentifier, NormalizedNode<?, ?>>> registration){
     }
   }
 
   public static Props props(final org.opendaylight.yangtools.concepts.ListenerRegistration<AsyncDataChangeListener<InstanceIdentifier, NormalizedNode<?, ?>>> registration){
-    return Props.create(new Creator<ListenerRegistration>(){
+    return Props.create(new Creator<DataChangeListenerRegistration>(){
 
       @Override
 
       @Override
-      public ListenerRegistration create() throws Exception {
-        return new ListenerRegistration(registration);
+      public DataChangeListenerRegistration create() throws Exception {
+        return new DataChangeListenerRegistration(registration);
       }
     });
   }
 
       }
     });
   }
 
-  private void closeListenerRegistration(CloseListenerRegistration message){
+  private void closeListenerRegistration(CloseDataChangeListenerRegistration message){
     registration.close();
     registration.close();
-    getSender().tell(new CloseListenerRegistrationReply(), getSelf());
+    getSender().tell(new CloseDataChangeListenerRegistrationReply(), getSelf());
   }
 }
   }
 }
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerRegistrationProxy.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerRegistrationProxy.java
new file mode 100644 (file)
index 0000000..89cc969
--- /dev/null
@@ -0,0 +1,46 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.datastore;
+
+import akka.actor.ActorSelection;
+import org.opendaylight.controller.cluster.datastore.messages.CloseDataChangeListenerRegistration;
+import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
+import org.opendaylight.yangtools.concepts.ListenerRegistration;
+import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+
+/**
+ * ListenerRegistrationProxy acts as a proxy for a ListenerRegistration that was done on a remote shard
+ * <p>
+ * Registering a DataChangeListener on the Data Store creates a new instance of the ListenerRegistrationProxy
+ * The ListenerRegistrationProxy talks to a remote ListenerRegistration actor.
+ * </p>
+ */
+public class DataChangeListenerRegistrationProxy implements ListenerRegistration {
+    private final ActorSelection listenerRegistrationActor;
+    private final AsyncDataChangeListener listener;
+
+    public <L extends AsyncDataChangeListener<InstanceIdentifier, NormalizedNode<?, ?>>>
+    DataChangeListenerRegistrationProxy(
+        ActorSelection listenerRegistrationActor,
+        L listener) {
+        this.listenerRegistrationActor = listenerRegistrationActor;
+        this.listener = listener;
+    }
+
+    @Override
+    public Object getInstance() {
+        return listener;
+    }
+
+    @Override
+    public void close() {
+        listenerRegistrationActor.tell(new CloseDataChangeListenerRegistration(), null);
+    }
+}
index f64c6f1a8669888726f30bfe4099aa628365ccbb..4964b92ab75a0d3d7f85c5404c22e0df7c80fbff 100644 (file)
@@ -56,15 +56,17 @@ public class DistributedDataStore implements DOMStore, SchemaContextListener {
         InstanceIdentifier path, L listener,
         AsyncDataBroker.DataChangeScope scope) {
 
         InstanceIdentifier path, L listener,
         AsyncDataBroker.DataChangeScope scope) {
 
-        ActorRef dataChangeListenerActor = actorContext.getActorSystem().actorOf(DataChangeListener.props());
+        ActorRef dataChangeListenerActor = actorContext.getActorSystem().actorOf(
+            DataChangeListener.props());
 
         Object result = actorContext.executeShardOperation(Shard.DEFAULT_NAME,
             new RegisterChangeListener(path, dataChangeListenerActor.path(),
                 AsyncDataBroker.DataChangeScope.BASE),
 
         Object result = actorContext.executeShardOperation(Shard.DEFAULT_NAME,
             new RegisterChangeListener(path, dataChangeListenerActor.path(),
                 AsyncDataBroker.DataChangeScope.BASE),
-            ActorContext.ASK_DURATION);
+            ActorContext.ASK_DURATION
+        );
 
         RegisterChangeListenerReply reply = (RegisterChangeListenerReply) result;
 
         RegisterChangeListenerReply reply = (RegisterChangeListenerReply) result;
-        return new ListenerRegistrationProxy(reply.getListenerRegistrationPath());
+        return new DataChangeListenerRegistrationProxy(actorContext.actorSelection(reply.getListenerRegistrationPath()), listener);
     }
 
 
     }
 
 
@@ -90,6 +92,7 @@ public class DistributedDataStore implements DOMStore, SchemaContextListener {
     }
 
     @Override public void onGlobalContextUpdated(SchemaContext schemaContext) {
     }
 
     @Override public void onGlobalContextUpdated(SchemaContext schemaContext) {
-        actorContext.getShardManager().tell(new UpdateSchemaContext(schemaContext), null);
+        actorContext.getShardManager().tell(
+            new UpdateSchemaContext(schemaContext), null);
     }
 }
     }
 }
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ListenerRegistrationProxy.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ListenerRegistrationProxy.java
deleted file mode 100644 (file)
index a548a88..0000000
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-
-package org.opendaylight.controller.cluster.datastore;
-
-import akka.actor.ActorPath;
-import org.opendaylight.yangtools.concepts.ListenerRegistration;
-
-/**
- * ListenerRegistrationProxy acts as a proxy for a ListenerRegistration that was done on a remote shard
- *
- * Registering a DataChangeListener on the Data Store creates a new instance of the ListenerRegistrationProxy
- * The ListenerRegistrationProxy talks to a remote ListenerRegistration actor.
- */
-public class ListenerRegistrationProxy implements ListenerRegistration {
-    private final ActorPath listenerRegistrationPath;
-
-    public ListenerRegistrationProxy(ActorPath listenerRegistrationPath) {
-
-        this.listenerRegistrationPath = listenerRegistrationPath;
-    }
-
-    @Override
-    public Object getInstance() {
-        throw new UnsupportedOperationException("getInstance");
-    }
-
-    @Override
-    public void close() {
-        throw new UnsupportedOperationException("close");
-    }
-}
index 2a1e70b4ce639d71d1cd48df854f93afbd3f256a..b4ad089027c6ea25ae9cada361fc5bf36eb704aa 100644 (file)
@@ -141,7 +141,8 @@ public class Shard extends UntypedProcessor {
             store.registerChangeListener(registerChangeListener.getPath(),
                 listener, registerChangeListener.getScope());
         ActorRef listenerRegistration =
             store.registerChangeListener(registerChangeListener.getPath(),
                 listener, registerChangeListener.getScope());
         ActorRef listenerRegistration =
-            getContext().actorOf(ListenerRegistration.props(registration));
+            getContext().actorOf(
+                DataChangeListenerRegistration.props(registration));
         getSender()
             .tell(new RegisterChangeListenerReply(listenerRegistration.path()),
                 getSelf());
         getSender()
             .tell(new RegisterChangeListenerReply(listenerRegistration.path()),
                 getSelf());
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerRegistrationProxyTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerRegistrationProxyTest.java
new file mode 100644 (file)
index 0000000..be7be17
--- /dev/null
@@ -0,0 +1,74 @@
+package org.opendaylight.controller.cluster.datastore;
+
+import akka.actor.ActorRef;
+import akka.actor.Props;
+import junit.framework.Assert;
+import org.junit.Test;
+import org.opendaylight.controller.cluster.datastore.messages.CloseDataChangeListenerRegistration;
+import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
+import org.opendaylight.controller.cluster.datastore.utils.DoNothingActor;
+import org.opendaylight.controller.cluster.datastore.utils.MessageCollectorActor;
+import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
+import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
+import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+
+import java.util.List;
+
+public class DataChangeListenerRegistrationProxyTest extends AbstractActorTest{
+
+    private static class MockDataChangeListener implements
+        AsyncDataChangeListener<InstanceIdentifier, NormalizedNode<?, ?>> {
+
+        @Override public void onDataChanged(
+            AsyncDataChangeEvent<InstanceIdentifier, NormalizedNode<?, ?>> change) {
+            throw new UnsupportedOperationException("onDataChanged");
+        }
+    }
+
+    @Test
+    public void testGetInstance() throws Exception {
+        final Props props = Props.create(MessageCollectorActor.class);
+        final ActorRef actorRef = getSystem().actorOf(props);
+
+        MockDataChangeListener listener =
+            new MockDataChangeListener();
+        DataChangeListenerRegistrationProxy proxy =
+            new DataChangeListenerRegistrationProxy(
+                getSystem().actorSelection(actorRef.path()),
+                listener);
+
+        Assert.assertEquals(listener, proxy.getInstance());
+
+    }
+
+    @Test
+    public void testClose() throws Exception {
+        final Props props = Props.create(MessageCollectorActor.class);
+        final ActorRef actorRef = getSystem().actorOf(props);
+
+        DataChangeListenerRegistrationProxy proxy =
+            new DataChangeListenerRegistrationProxy(
+                getSystem().actorSelection(actorRef.path()),
+                new MockDataChangeListener());
+
+        proxy.close();
+
+        //Check if it was received by the remote actor
+        ActorContext
+            testContext = new ActorContext(getSystem(), getSystem().actorOf(Props.create(DoNothingActor.class)));
+        Object messages = testContext
+            .executeLocalOperation(actorRef, "messages",
+                ActorContext.ASK_DURATION);
+
+        Assert.assertNotNull(messages);
+
+        Assert.assertTrue(messages instanceof List);
+
+        List<Object> listMessages = (List<Object>) messages;
+
+        Assert.assertEquals(1, listMessages.size());
+
+        Assert.assertTrue(listMessages.get(0) instanceof CloseDataChangeListenerRegistration);
+    }
+}
@@ -6,8 +6,8 @@ import akka.testkit.JavaTestKit;
 import com.google.common.util.concurrent.ListeningExecutorService;
 import com.google.common.util.concurrent.MoreExecutors;
 import org.junit.Test;
 import com.google.common.util.concurrent.ListeningExecutorService;
 import com.google.common.util.concurrent.MoreExecutors;
 import org.junit.Test;
-import org.opendaylight.controller.cluster.datastore.messages.CloseListenerRegistration;
-import org.opendaylight.controller.cluster.datastore.messages.CloseListenerRegistrationReply;
+import org.opendaylight.controller.cluster.datastore.messages.CloseDataChangeListenerRegistration;
+import org.opendaylight.controller.cluster.datastore.messages.CloseDataChangeListenerRegistrationReply;
 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker;
 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker;
 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
@@ -18,7 +18,7 @@ import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 
 import static org.junit.Assert.assertEquals;
 
 
 import static org.junit.Assert.assertEquals;
 
-public class ListenerRegistrationTest extends AbstractActorTest {
+public class DataChangeListenerRegistrationTest extends AbstractActorTest {
   private static ListeningExecutorService storeExecutor = MoreExecutors.listeningDecorator(MoreExecutors.sameThreadExecutor());
 
   private static final InMemoryDOMDataStore store = new InMemoryDOMDataStore("OPER", storeExecutor);
   private static ListeningExecutorService storeExecutor = MoreExecutors.listeningDecorator(MoreExecutors.sameThreadExecutor());
 
   private static final InMemoryDOMDataStore store = new InMemoryDOMDataStore("OPER", storeExecutor);
@@ -31,18 +31,20 @@ public class ListenerRegistrationTest extends AbstractActorTest {
   @Test
   public void testOnReceiveCloseListenerRegistration() throws Exception {
     new JavaTestKit(getSystem()) {{
   @Test
   public void testOnReceiveCloseListenerRegistration() throws Exception {
     new JavaTestKit(getSystem()) {{
-      final Props props = ListenerRegistration.props(store.registerChangeListener(TestModel.TEST_PATH, noOpDataChangeListener(), AsyncDataBroker.DataChangeScope.BASE));
+      final Props props = DataChangeListenerRegistration.props(store
+          .registerChangeListener(TestModel.TEST_PATH, noOpDataChangeListener(),
+              AsyncDataBroker.DataChangeScope.BASE));
       final ActorRef subject = getSystem().actorOf(props, "testCloseListenerRegistration");
 
       new Within(duration("1 seconds")) {
         protected void run() {
 
       final ActorRef subject = getSystem().actorOf(props, "testCloseListenerRegistration");
 
       new Within(duration("1 seconds")) {
         protected void run() {
 
-          subject.tell(new CloseListenerRegistration(), getRef());
+          subject.tell(new CloseDataChangeListenerRegistration(), getRef());
 
           final String out = new ExpectMsg<String>("match hint") {
             // do not put code outside this method, will run afterwards
             protected String match(Object in) {
 
           final String out = new ExpectMsg<String>("match hint") {
             // do not put code outside this method, will run afterwards
             protected String match(Object in) {
-              if (in instanceof CloseListenerRegistrationReply) {
+              if (in instanceof CloseDataChangeListenerRegistrationReply) {
                 return "match";
               } else {
                 throw noMatch();
                 return "match";
               } else {
                 throw noMatch();
@@ -69,4 +71,4 @@ public class ListenerRegistrationTest extends AbstractActorTest {
     };
   }
 
     };
   }
 
-}
\ No newline at end of file
+}