Initial implementation of the Shard Actor 89/8089/3
authorMoiz Raja <moraja@cisco.com>
Tue, 17 Jun 2014 20:41:01 +0000 (13:41 -0700)
committerMoiz Raja <moraja@cisco.com>
Tue, 17 Jun 2014 21:18:42 +0000 (14:18 -0700)
Things to note,
- Added a temporary dependency on sal-broker-impl. This is so that I could use the InMemoryDOMDataStore.
  Once InMemoryDOMDataStore is moved to it's own bundle then I will simply switch the dependency
- Shard has been only implemented to the point where it is using an InMemoryDOMDataStore and handling the CreateTransaction and RegisterChangeListener messages
  This commit is intended to give a feel for what kind of coding patterns will be used to implement Shard and related actors and their tests

Change-Id: I86f0d701399805185a0987bb1b97fe1358ce4cd9
Signed-off-by: Moiz Raja <moraja@cisco.com>
opendaylight/md-sal/sal-distributed-datastore/pom.xml
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ListenerRegistration.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionChain.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/CreateTransactionChain.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/CreateTransactionChainReply.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/RegisterChangeListener.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/RegisterChangeListenerReply.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractActorTest.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java [new file with mode: 0644]

index cf28d067ab541da548abb71d39aa54d96ce4e34e..ea686d966ffa63b60e1cf3cb839db2f2eb034f1e 100644 (file)
       <artifactId>sal-binding-config</artifactId>
     </dependency>
 
+    <!--
+      Adding a temporary dependency on the sal-broker-impl so that we can use InMemoryDOMDataStore
+
+      InMemoryDOMDataStore needs to be moved into its own module and be wired up using config subsystem before
+      this bundle can use it
+    -->
+    <dependency>
+      <groupId>org.opendaylight.controller</groupId>
+      <artifactId>sal-broker-impl</artifactId>
+    </dependency>
+
     <dependency>
       <groupId>org.opendaylight.controller</groupId>
       <artifactId>sal-common-api</artifactId>
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ListenerRegistration.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ListenerRegistration.java
new file mode 100644 (file)
index 0000000..922c195
--- /dev/null
@@ -0,0 +1,40 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.datastore;
+
+import akka.actor.Props;
+import akka.actor.UntypedActor;
+import akka.japi.Creator;
+import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
+import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+
+public class ListenerRegistration extends UntypedActor{
+
+  private final org.opendaylight.yangtools.concepts.ListenerRegistration<AsyncDataChangeListener<InstanceIdentifier, NormalizedNode<?, ?>>> registration;
+
+  public ListenerRegistration(org.opendaylight.yangtools.concepts.ListenerRegistration<AsyncDataChangeListener<InstanceIdentifier, NormalizedNode<?, ?>>> registration) {
+    this.registration = registration;
+  }
+
+  @Override
+  public void onReceive(Object message) throws Exception {
+    throw new UnsupportedOperationException("onReceive");
+  }
+
+  public static Props props(final org.opendaylight.yangtools.concepts.ListenerRegistration<AsyncDataChangeListener<InstanceIdentifier, NormalizedNode<?, ?>>> registration){
+    return Props.create(new Creator<ListenerRegistration>(){
+
+      @Override
+      public ListenerRegistration create() throws Exception {
+        return new ListenerRegistration(registration);
+      }
+    });
+  }
+}
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java
new file mode 100644 (file)
index 0000000..cab35dd
--- /dev/null
@@ -0,0 +1,61 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.datastore;
+
+import akka.actor.ActorRef;
+import akka.event.Logging;
+import akka.event.LoggingAdapter;
+import akka.persistence.UntypedProcessor;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionChain;
+import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionChainReply;
+import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener;
+import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListenerReply;
+import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain;
+
+import java.util.concurrent.Executors;
+
+/**
+ * A Shard represents a portion of the logical data tree
+ * <p/>
+ * Our Shard uses InMemoryDataStore as it's internal representation and delegates all requests it
+ */
+public class Shard extends UntypedProcessor {
+
+  ListeningExecutorService storeExecutor = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(2));
+
+  private final InMemoryDOMDataStore store = new InMemoryDOMDataStore("OPER", storeExecutor);
+
+  LoggingAdapter log = Logging.getLogger(getContext().system(), this);
+
+  @Override
+  public void onReceive(Object message) throws Exception {
+    if (message instanceof CreateTransactionChain) {
+      createTransactionChain();
+    } else if(message instanceof RegisterChangeListener){
+      registerChangeListener((RegisterChangeListener) message);
+    }
+  }
+
+  private void registerChangeListener(RegisterChangeListener registerChangeListener) {
+//    org.opendaylight.yangtools.concepts.ListenerRegistration<AsyncDataChangeListener<InstanceIdentifier, NormalizedNode<?, ?>>> registration =
+//            store.registerChangeListener(registerChangeListener.getPath(), registerChangeListener.getListener(), registerChangeListener.getScope());
+    // TODO: Construct a ListenerRegistration actor with the actual registration returned when registering a listener with the datastore
+    ActorRef listenerRegistration = getContext().actorOf(ListenerRegistration.props(null));
+    getSender().tell(new RegisterChangeListenerReply(listenerRegistration.path()), getSelf());
+  }
+
+  private void createTransactionChain() {
+    DOMStoreTransactionChain chain = store.createTransactionChain();
+    ActorRef transactionChain = getContext().actorOf(TransactionChain.props(chain));
+    getSender().tell(new CreateTransactionChainReply(transactionChain.path()), getSelf());
+  }
+}
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionChain.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionChain.java
new file mode 100644 (file)
index 0000000..1cce09f
--- /dev/null
@@ -0,0 +1,38 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.datastore;
+
+import akka.actor.Props;
+import akka.actor.UntypedActor;
+import akka.japi.Creator;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain;
+
+public class TransactionChain extends UntypedActor{
+
+  private final DOMStoreTransactionChain chain;
+
+  public TransactionChain(DOMStoreTransactionChain chain) {
+    this.chain = chain;
+  }
+
+  @Override
+  public void onReceive(Object message) throws Exception {
+    throw new UnsupportedOperationException("onReceive");
+  }
+
+  public static Props props(final DOMStoreTransactionChain chain){
+    return Props.create(new Creator<TransactionChain>(){
+
+      @Override
+      public TransactionChain create() throws Exception {
+        return new TransactionChain(chain);
+      }
+    });
+  }
+}
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/CreateTransactionChain.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/CreateTransactionChain.java
new file mode 100644 (file)
index 0000000..cdad332
--- /dev/null
@@ -0,0 +1,13 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.datastore.messages;
+
+public class CreateTransactionChain {
+
+}
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/CreateTransactionChainReply.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/CreateTransactionChainReply.java
new file mode 100644 (file)
index 0000000..49dd9b6
--- /dev/null
@@ -0,0 +1,23 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.datastore.messages;
+
+import akka.actor.ActorPath;
+
+public class CreateTransactionChainReply {
+  private final ActorPath transactionChainPath;
+
+  public CreateTransactionChainReply(ActorPath transactionChainPath) {
+    this.transactionChainPath = transactionChainPath;
+  }
+
+  public ActorPath getTransactionChainPath() {
+    return transactionChainPath;
+  }
+}
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/RegisterChangeListener.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/RegisterChangeListener.java
new file mode 100644 (file)
index 0000000..0123a70
--- /dev/null
@@ -0,0 +1,39 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.datastore.messages;
+
+import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker;
+import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
+import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+
+public class RegisterChangeListener {
+  private final InstanceIdentifier path;
+  private final AsyncDataChangeListener<InstanceIdentifier, NormalizedNode<?, ?>> listener;
+  private final AsyncDataBroker.DataChangeScope scope;
+
+
+  public RegisterChangeListener(InstanceIdentifier path, AsyncDataChangeListener<InstanceIdentifier, NormalizedNode<?, ?>> listener, AsyncDataBroker.DataChangeScope scope) {
+    this.path = path;
+    this.listener = listener;
+    this.scope = scope;
+  }
+
+  public InstanceIdentifier getPath() {
+    return path;
+  }
+
+  public AsyncDataChangeListener<InstanceIdentifier, NormalizedNode<?, ?>> getListener() {
+    return listener;
+  }
+
+  public AsyncDataBroker.DataChangeScope getScope() {
+    return scope;
+  }
+}
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/RegisterChangeListenerReply.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/RegisterChangeListenerReply.java
new file mode 100644 (file)
index 0000000..ae8bbbd
--- /dev/null
@@ -0,0 +1,23 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.datastore.messages;
+
+import akka.actor.ActorPath;
+
+public class RegisterChangeListenerReply {
+  private final ActorPath listenerRegistrationPath;
+
+  public RegisterChangeListenerReply(ActorPath listenerRegistrationPath) {
+    this.listenerRegistrationPath = listenerRegistrationPath;
+  }
+
+  public ActorPath getListenerRegistrationPath() {
+    return listenerRegistrationPath;
+  }
+}
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractActorTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractActorTest.java
new file mode 100644 (file)
index 0000000..ed928ec
--- /dev/null
@@ -0,0 +1,34 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.datastore;
+
+import akka.actor.ActorSystem;
+import akka.testkit.JavaTestKit;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+
+public class AbstractActorTest {
+  private static ActorSystem system;
+
+  @BeforeClass
+  public static void setUp(){
+    system = ActorSystem.create("test");
+  }
+
+  @AfterClass
+  public static void tearDown(){
+    JavaTestKit.shutdownActorSystem(system);
+    system = null;
+  }
+
+  protected ActorSystem getSystem(){
+    return system;
+  }
+
+}
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java
new file mode 100644 (file)
index 0000000..934c064
--- /dev/null
@@ -0,0 +1,94 @@
+package org.opendaylight.controller.cluster.datastore;
+
+import akka.actor.ActorRef;
+import akka.actor.Props;
+import akka.testkit.JavaTestKit;
+import org.junit.Test;
+import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionChain;
+import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionChainReply;
+import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener;
+import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListenerReply;
+import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker;
+import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
+import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
+import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+
+import static org.junit.Assert.assertTrue;
+
+public class ShardTest extends AbstractActorTest{
+  @Test
+  public void testOnReceiveCreateTransaction() throws Exception {
+    new JavaTestKit(getSystem()) {{
+      final Props props = Props.create(Shard.class);
+      final ActorRef subject = getSystem().actorOf(props, "testCreateTransaction");
+
+      new Within(duration("1 seconds")) {
+        protected void run() {
+
+          subject.tell(new CreateTransactionChain(), getRef());
+
+          final String out = new ExpectMsg<String>("match hint") {
+            // do not put code outside this method, will run afterwards
+            protected String match(Object in) {
+              if (in instanceof CreateTransactionChainReply) {
+                CreateTransactionChainReply reply = (CreateTransactionChainReply) in;
+                return reply.getTransactionChainPath().toString();
+              } else {
+                throw noMatch();
+              }
+            }
+          }.get(); // this extracts the received message
+
+          assertTrue(out.matches("akka:\\/\\/test\\/user\\/testCreateTransaction\\/\\$.*"));
+          // Will wait for the rest of the 3 seconds
+          expectNoMsg();
+        }
+
+
+      };
+    }};
+  }
+
+  @Test
+  public void testOnReceiveRegisterListener() throws Exception {
+    new JavaTestKit(getSystem()) {{
+      final Props props = Props.create(Shard.class);
+      final ActorRef subject = getSystem().actorOf(props, "testRegisterChangeListener");
+
+      new Within(duration("1 seconds")) {
+        protected void run() {
+
+          subject.tell(new RegisterChangeListener(InstanceIdentifier.builder().build(), noOpDataChangeListener() , AsyncDataBroker.DataChangeScope.BASE), getRef());
+
+          final String out = new ExpectMsg<String>("match hint") {
+            // do not put code outside this method, will run afterwards
+            protected String match(Object in) {
+              if (in instanceof RegisterChangeListenerReply) {
+                RegisterChangeListenerReply reply = (RegisterChangeListenerReply) in;
+                return reply.getListenerRegistrationPath().toString();
+              } else {
+                throw noMatch();
+              }
+            }
+          }.get(); // this extracts the received message
+
+          assertTrue(out.matches("akka:\\/\\/test\\/user\\/testRegisterChangeListener\\/\\$.*"));
+          // Will wait for the rest of the 3 seconds
+          expectNoMsg();
+        }
+
+
+      };
+    }};
+  }
+
+  private  AsyncDataChangeListener<InstanceIdentifier, NormalizedNode<?, ?>> noOpDataChangeListener(){
+    return new AsyncDataChangeListener<InstanceIdentifier, NormalizedNode<?, ?>>() {
+      @Override
+      public void onDataChanged(AsyncDataChangeEvent<InstanceIdentifier, NormalizedNode<?, ?>> change) {
+
+      }
+    };
+  }
+}
\ No newline at end of file