Initial implementation of ListenerRegistration actor and all related messages 03/8103/1
authorMoiz Raja <moraja@cisco.com>
Wed, 18 Jun 2014 01:22:35 +0000 (18:22 -0700)
committerMoiz Raja <moraja@cisco.com>
Wed, 18 Jun 2014 01:22:35 +0000 (18:22 -0700)
Change-Id: I4f63532983a2e272b610a61f093242e34867ed70
Signed-off-by: Moiz Raja <moraja@cisco.com>
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ListenerRegistration.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/CloseListenerRegistration.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/CloseListenerRegistrationReply.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractActorTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ListenerRegistrationTest.java [new file with mode: 0644]

index 922c1950b8095b6d622c6118d0bf27306af3e293..fda429f7544a1121a107fe5aff77bd4bde78f113 100644 (file)
@@ -11,6 +11,8 @@ package org.opendaylight.controller.cluster.datastore;
 import akka.actor.Props;
 import akka.actor.UntypedActor;
 import akka.japi.Creator;
+import org.opendaylight.controller.cluster.datastore.messages.CloseListenerRegistration;
+import org.opendaylight.controller.cluster.datastore.messages.CloseListenerRegistrationReply;
 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
 import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
@@ -25,7 +27,9 @@ public class ListenerRegistration extends UntypedActor{
 
   @Override
   public void onReceive(Object message) throws Exception {
-    throw new UnsupportedOperationException("onReceive");
+    if(message instanceof CloseListenerRegistration){
+      closeListenerRegistration((CloseListenerRegistration) message);
+    }
   }
 
   public static Props props(final org.opendaylight.yangtools.concepts.ListenerRegistration<AsyncDataChangeListener<InstanceIdentifier, NormalizedNode<?, ?>>> registration){
@@ -37,4 +41,9 @@ public class ListenerRegistration extends UntypedActor{
       }
     });
   }
+
+  private void closeListenerRegistration(CloseListenerRegistration message){
+    registration.close();
+    getSender().tell(new CloseListenerRegistrationReply(), getSelf());
+  }
 }
index f59e05ae9935f50f58a9481a46f2e8bfc2820baf..8365328669587b5e083c8e53ca819eb63857cd40 100644 (file)
@@ -31,6 +31,7 @@ import java.util.concurrent.Executors;
  * A Shard represents a portion of the logical data tree
  * <p/>
  * Our Shard uses InMemoryDataStore as it's internal representation and delegates all requests it
+ *
  */
 public class Shard extends UntypedProcessor {
 
@@ -47,15 +48,18 @@ public class Shard extends UntypedProcessor {
     } else if(message instanceof RegisterChangeListener){
       registerChangeListener((RegisterChangeListener) message);
     } else if(message instanceof UpdateSchemaContext){
-      store.onGlobalContextUpdated(((UpdateSchemaContext) message).getSchemaContext());
+      updateSchemaContext((UpdateSchemaContext) message);
     }
   }
 
+  private void updateSchemaContext(UpdateSchemaContext message) {
+    store.onGlobalContextUpdated(message.getSchemaContext());
+  }
+
   private void registerChangeListener(RegisterChangeListener registerChangeListener) {
     org.opendaylight.yangtools.concepts.ListenerRegistration<AsyncDataChangeListener<InstanceIdentifier, NormalizedNode<?, ?>>> registration =
             store.registerChangeListener(registerChangeListener.getPath(), registerChangeListener.getListener(), registerChangeListener.getScope());
-    // TODO: Construct a ListenerRegistration actor with the actual registration returned when registering a listener with the datastore
-    ActorRef listenerRegistration = getContext().actorOf(ListenerRegistration.props(null));
+    ActorRef listenerRegistration = getContext().actorOf(ListenerRegistration.props(registration));
     getSender().tell(new RegisterChangeListenerReply(listenerRegistration.path()), getSelf());
   }
 
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/CloseListenerRegistration.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/CloseListenerRegistration.java
new file mode 100644 (file)
index 0000000..d55ad28
--- /dev/null
@@ -0,0 +1,12 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.datastore.messages;
+
+public class CloseListenerRegistration {
+}
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/CloseListenerRegistrationReply.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/CloseListenerRegistrationReply.java
new file mode 100644 (file)
index 0000000..e195e4b
--- /dev/null
@@ -0,0 +1,12 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.datastore.messages;
+
+public class CloseListenerRegistrationReply {
+}
index ed928ec29c1e61b4be2daf1aa11db68c3b4bd512..2fe7b69cc9b349548f9b44ea36f5d9a9adf1bc2c 100644 (file)
@@ -13,7 +13,7 @@ import akka.testkit.JavaTestKit;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 
-public class AbstractActorTest {
+public abstract class AbstractActorTest {
   private static ActorSystem system;
 
   @BeforeClass
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ListenerRegistrationTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ListenerRegistrationTest.java
new file mode 100644 (file)
index 0000000..0f155ef
--- /dev/null
@@ -0,0 +1,72 @@
+package org.opendaylight.controller.cluster.datastore;
+
+import akka.actor.ActorRef;
+import akka.actor.Props;
+import akka.testkit.JavaTestKit;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+import org.junit.Test;
+import org.opendaylight.controller.cluster.datastore.messages.CloseListenerRegistration;
+import org.opendaylight.controller.cluster.datastore.messages.CloseListenerRegistrationReply;
+import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
+import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker;
+import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
+import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
+import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
+import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+
+import static org.junit.Assert.assertEquals;
+
+public class ListenerRegistrationTest extends AbstractActorTest {
+  private static ListeningExecutorService storeExecutor = MoreExecutors.listeningDecorator(MoreExecutors.sameThreadExecutor());
+
+  private static final InMemoryDOMDataStore store = new InMemoryDOMDataStore("OPER", storeExecutor);
+
+  static {
+    store.onGlobalContextUpdated(TestModel.createTestContext());
+  }
+
+
+  @Test
+  public void testOnReceiveCloseListenerRegistration() throws Exception {
+    new JavaTestKit(getSystem()) {{
+      final Props props = ListenerRegistration.props(store.registerChangeListener(TestModel.TEST_PATH, noOpDataChangeListener(), AsyncDataBroker.DataChangeScope.BASE));
+      final ActorRef subject = getSystem().actorOf(props, "testCloseListenerRegistration");
+
+      new Within(duration("1 seconds")) {
+        protected void run() {
+
+          subject.tell(new CloseListenerRegistration(), getRef());
+
+          final String out = new ExpectMsg<String>("match hint") {
+            // do not put code outside this method, will run afterwards
+            protected String match(Object in) {
+              if (in instanceof CloseListenerRegistrationReply) {
+                return "match";
+              } else {
+                throw noMatch();
+              }
+            }
+          }.get(); // this extracts the received message
+
+          assertEquals("match", out);
+
+          expectNoMsg();
+        }
+
+
+      };
+    }};
+  }
+
+  private  AsyncDataChangeListener<InstanceIdentifier, NormalizedNode<?, ?>> noOpDataChangeListener(){
+    return new AsyncDataChangeListener<InstanceIdentifier, NormalizedNode<?, ?>>() {
+      @Override
+      public void onDataChanged(AsyncDataChangeEvent<InstanceIdentifier, NormalizedNode<?, ?>> change) {
+
+      }
+    };
+  }
+
+}
\ No newline at end of file