Merge changes I0df1f18f,I4f635329
authorEd Warnicke <eaw@cisco.com>
Mon, 23 Jun 2014 11:24:02 +0000 (11:24 +0000)
committerGerrit Code Review <gerrit@opendaylight.org>
Mon, 23 Jun 2014 11:24:02 +0000 (11:24 +0000)
* changes:
  Introduce ShardStrategy and related code
  Initial implementation of ListenerRegistration actor and all related messages

opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ListenerRegistration.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/CloseListenerRegistration.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/CloseListenerRegistrationReply.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/shardstrategy/DefaultShardStrategy.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/shardstrategy/ShardStrategy.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/shardstrategy/ShardStrategyFactory.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractActorTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ListenerRegistrationTest.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/shardstrategy/DefaultShardStrategyTest.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/shardstrategy/ShardStrategyFactoryTest.java [new file with mode: 0644]

index 922c1950b8095b6d622c6118d0bf27306af3e293..fda429f7544a1121a107fe5aff77bd4bde78f113 100644 (file)
@@ -11,6 +11,8 @@ package org.opendaylight.controller.cluster.datastore;
 import akka.actor.Props;
 import akka.actor.UntypedActor;
 import akka.japi.Creator;
+import org.opendaylight.controller.cluster.datastore.messages.CloseListenerRegistration;
+import org.opendaylight.controller.cluster.datastore.messages.CloseListenerRegistrationReply;
 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
 import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
@@ -25,7 +27,9 @@ public class ListenerRegistration extends UntypedActor{
 
   @Override
   public void onReceive(Object message) throws Exception {
-    throw new UnsupportedOperationException("onReceive");
+    if(message instanceof CloseListenerRegistration){
+      closeListenerRegistration((CloseListenerRegistration) message);
+    }
   }
 
   public static Props props(final org.opendaylight.yangtools.concepts.ListenerRegistration<AsyncDataChangeListener<InstanceIdentifier, NormalizedNode<?, ?>>> registration){
@@ -37,4 +41,9 @@ public class ListenerRegistration extends UntypedActor{
       }
     });
   }
+
+  private void closeListenerRegistration(CloseListenerRegistration message){
+    registration.close();
+    getSender().tell(new CloseListenerRegistrationReply(), getSelf());
+  }
 }
index f59e05ae9935f50f58a9481a46f2e8bfc2820baf..8365328669587b5e083c8e53ca819eb63857cd40 100644 (file)
@@ -31,6 +31,7 @@ import java.util.concurrent.Executors;
  * A Shard represents a portion of the logical data tree
  * <p/>
  * Our Shard uses InMemoryDataStore as it's internal representation and delegates all requests it
+ *
  */
 public class Shard extends UntypedProcessor {
 
@@ -47,15 +48,18 @@ public class Shard extends UntypedProcessor {
     } else if(message instanceof RegisterChangeListener){
       registerChangeListener((RegisterChangeListener) message);
     } else if(message instanceof UpdateSchemaContext){
-      store.onGlobalContextUpdated(((UpdateSchemaContext) message).getSchemaContext());
+      updateSchemaContext((UpdateSchemaContext) message);
     }
   }
 
+  private void updateSchemaContext(UpdateSchemaContext message) {
+    store.onGlobalContextUpdated(message.getSchemaContext());
+  }
+
   private void registerChangeListener(RegisterChangeListener registerChangeListener) {
     org.opendaylight.yangtools.concepts.ListenerRegistration<AsyncDataChangeListener<InstanceIdentifier, NormalizedNode<?, ?>>> registration =
             store.registerChangeListener(registerChangeListener.getPath(), registerChangeListener.getListener(), registerChangeListener.getScope());
-    // TODO: Construct a ListenerRegistration actor with the actual registration returned when registering a listener with the datastore
-    ActorRef listenerRegistration = getContext().actorOf(ListenerRegistration.props(null));
+    ActorRef listenerRegistration = getContext().actorOf(ListenerRegistration.props(registration));
     getSender().tell(new RegisterChangeListenerReply(listenerRegistration.path()), getSelf());
   }
 
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/CloseListenerRegistration.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/CloseListenerRegistration.java
new file mode 100644 (file)
index 0000000..d55ad28
--- /dev/null
@@ -0,0 +1,12 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.datastore.messages;
+
+public class CloseListenerRegistration {
+}
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/CloseListenerRegistrationReply.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/CloseListenerRegistrationReply.java
new file mode 100644 (file)
index 0000000..e195e4b
--- /dev/null
@@ -0,0 +1,12 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.datastore.messages;
+
+public class CloseListenerRegistrationReply {
+}
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/shardstrategy/DefaultShardStrategy.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/shardstrategy/DefaultShardStrategy.java
new file mode 100644 (file)
index 0000000..a8ab5c4
--- /dev/null
@@ -0,0 +1,28 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.datastore.shardstrategy;
+
+import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
+
+/**
+ * The DefaultShardStrategy basically puts all data into the default Shard
+ * <p>
+ *   The default shard stores data for all modules for which a specific set of shards has not been configured
+ * </p>
+ */
+public class DefaultShardStrategy implements ShardStrategy{
+
+  public static final String NAME = "default";
+  public static final String DEFAULT_SHARD = "default";
+
+  @Override
+  public String findShard(InstanceIdentifier path) {
+    return DEFAULT_SHARD;
+  }
+}
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/shardstrategy/ShardStrategy.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/shardstrategy/ShardStrategy.java
new file mode 100644 (file)
index 0000000..f75eb2d
--- /dev/null
@@ -0,0 +1,24 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.datastore.shardstrategy;
+
+import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
+
+/**
+ * The role of ShardStrategy is to figure out which Shards a given piece of data belongs to
+ */
+public interface ShardStrategy {
+  /**
+   * Find the name of the shard in which the data pointed to by the specified path belongs in
+   *
+   * @param path The location of the data in the logical tree
+   * @return
+   */
+  String findShard(InstanceIdentifier path);
+}
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/shardstrategy/ShardStrategyFactory.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/shardstrategy/ShardStrategyFactory.java
new file mode 100644 (file)
index 0000000..2105379
--- /dev/null
@@ -0,0 +1,48 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.datastore.shardstrategy;
+
+import com.google.common.base.Preconditions;
+import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+public class ShardStrategyFactory {
+  private static final Map<String, ShardStrategy> moduleNameToStrategyMap = new ConcurrentHashMap();
+
+  private static final String UNKNOWN_MODULE_NAME = "unknown";
+
+  public static ShardStrategy getStrategy(InstanceIdentifier path){
+    Preconditions.checkNotNull(path, "path should not be null");
+
+    String moduleName = getModuleName(path);
+    ShardStrategy shardStrategy = moduleNameToStrategyMap.get(moduleName);
+    if(shardStrategy == null){
+      return new DefaultShardStrategy();
+    }
+
+    return shardStrategy;
+  }
+
+
+  private static String getModuleName(InstanceIdentifier path){
+    return UNKNOWN_MODULE_NAME;
+  }
+
+  /**
+   * This is to be used in the future to register a custom shard strategy
+   *
+   * @param moduleName
+   * @param shardStrategy
+   */
+  public static void registerShardStrategy(String moduleName, ShardStrategy shardStrategy){
+    throw new UnsupportedOperationException("registering a custom shard strategy not supported yet");
+  }
+}
index ed928ec29c1e61b4be2daf1aa11db68c3b4bd512..2fe7b69cc9b349548f9b44ea36f5d9a9adf1bc2c 100644 (file)
@@ -13,7 +13,7 @@ import akka.testkit.JavaTestKit;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 
-public class AbstractActorTest {
+public abstract class AbstractActorTest {
   private static ActorSystem system;
 
   @BeforeClass
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ListenerRegistrationTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ListenerRegistrationTest.java
new file mode 100644 (file)
index 0000000..0f155ef
--- /dev/null
@@ -0,0 +1,72 @@
+package org.opendaylight.controller.cluster.datastore;
+
+import akka.actor.ActorRef;
+import akka.actor.Props;
+import akka.testkit.JavaTestKit;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+import org.junit.Test;
+import org.opendaylight.controller.cluster.datastore.messages.CloseListenerRegistration;
+import org.opendaylight.controller.cluster.datastore.messages.CloseListenerRegistrationReply;
+import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
+import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker;
+import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
+import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
+import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
+import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+
+import static org.junit.Assert.assertEquals;
+
+public class ListenerRegistrationTest extends AbstractActorTest {
+  private static ListeningExecutorService storeExecutor = MoreExecutors.listeningDecorator(MoreExecutors.sameThreadExecutor());
+
+  private static final InMemoryDOMDataStore store = new InMemoryDOMDataStore("OPER", storeExecutor);
+
+  static {
+    store.onGlobalContextUpdated(TestModel.createTestContext());
+  }
+
+
+  @Test
+  public void testOnReceiveCloseListenerRegistration() throws Exception {
+    new JavaTestKit(getSystem()) {{
+      final Props props = ListenerRegistration.props(store.registerChangeListener(TestModel.TEST_PATH, noOpDataChangeListener(), AsyncDataBroker.DataChangeScope.BASE));
+      final ActorRef subject = getSystem().actorOf(props, "testCloseListenerRegistration");
+
+      new Within(duration("1 seconds")) {
+        protected void run() {
+
+          subject.tell(new CloseListenerRegistration(), getRef());
+
+          final String out = new ExpectMsg<String>("match hint") {
+            // do not put code outside this method, will run afterwards
+            protected String match(Object in) {
+              if (in instanceof CloseListenerRegistrationReply) {
+                return "match";
+              } else {
+                throw noMatch();
+              }
+            }
+          }.get(); // this extracts the received message
+
+          assertEquals("match", out);
+
+          expectNoMsg();
+        }
+
+
+      };
+    }};
+  }
+
+  private  AsyncDataChangeListener<InstanceIdentifier, NormalizedNode<?, ?>> noOpDataChangeListener(){
+    return new AsyncDataChangeListener<InstanceIdentifier, NormalizedNode<?, ?>>() {
+      @Override
+      public void onDataChanged(AsyncDataChangeEvent<InstanceIdentifier, NormalizedNode<?, ?>> change) {
+
+      }
+    };
+  }
+
+}
\ No newline at end of file
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/shardstrategy/DefaultShardStrategyTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/shardstrategy/DefaultShardStrategyTest.java
new file mode 100644 (file)
index 0000000..d3ba9b1
--- /dev/null
@@ -0,0 +1,14 @@
+package org.opendaylight.controller.cluster.datastore.shardstrategy;
+
+import junit.framework.Assert;
+import org.junit.Test;
+import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
+
+public class DefaultShardStrategyTest {
+
+  @Test
+  public void testFindShard() throws Exception {
+    String shard = new DefaultShardStrategy().findShard(TestModel.TEST_PATH);
+    Assert.assertEquals(DefaultShardStrategy.DEFAULT_SHARD, shard);
+  }
+}
\ No newline at end of file
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/shardstrategy/ShardStrategyFactoryTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/shardstrategy/ShardStrategyFactoryTest.java
new file mode 100644 (file)
index 0000000..2cff981
--- /dev/null
@@ -0,0 +1,29 @@
+package org.opendaylight.controller.cluster.datastore.shardstrategy;
+
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
+
+import static junit.framework.Assert.assertNotNull;
+
+public class ShardStrategyFactoryTest {
+
+  @Rule
+  public ExpectedException expectedEx = ExpectedException.none();
+
+  @Test
+  public void testGetStrategy(){
+    ShardStrategy strategy = ShardStrategyFactory.getStrategy(TestModel.TEST_PATH);
+    assertNotNull(strategy);
+  }
+
+  @Test
+  public void testGetStrategyNullPointerExceptionWhenPathIsNull(){
+    expectedEx.expect(NullPointerException.class);
+    expectedEx.expectMessage("path should not be null");
+
+    ShardStrategyFactory.getStrategy(null);
+  }
+
+}
\ No newline at end of file