Merge "Implement finding a primary based on the shard name and do basic wiring of...
authorEd Warnicke <eaw@cisco.com>
Mon, 30 Jun 2014 16:36:12 +0000 (16:36 +0000)
committerGerrit Code Review <gerrit@opendaylight.org>
Mon, 30 Jun 2014 16:36:12 +0000 (16:36 +0000)
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DistributedDataStore.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/PrimaryFound.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/config/yang/config/distributed_datastore_provider/DistributedDataStoreProviderModule.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractActorTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardManagerTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java

index 8d5b0c2..c87f1ab 100644 (file)
@@ -8,6 +8,8 @@
 
 package org.opendaylight.controller.cluster.datastore;
 
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker;
 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
 import org.opendaylight.controller.sal.core.spi.data.DOMStore;
@@ -23,29 +25,34 @@ import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
  *
  */
 public class DistributedDataStore implements DOMStore {
-
-    @Override
-    public <L extends AsyncDataChangeListener<InstanceIdentifier, NormalizedNode<?, ?>>> ListenerRegistration<L> registerChangeListener(InstanceIdentifier path, L listener, AsyncDataBroker.DataChangeScope scope) {
-        return new ListenerRegistrationProxy();
-    }
-
-    @Override
-    public DOMStoreTransactionChain createTransactionChain() {
-        return new TransactionChainProxy();
-    }
-
-    @Override
-    public DOMStoreReadTransaction newReadOnlyTransaction() {
-        return new TransactionProxy();
-    }
-
-    @Override
-    public DOMStoreWriteTransaction newWriteOnlyTransaction() {
-        return new TransactionProxy();
-    }
-
-    @Override
-    public DOMStoreReadWriteTransaction newReadWriteTransaction() {
-        return new TransactionProxy();
-    }
+  private final ActorRef shardManager;
+
+  public DistributedDataStore(ActorSystem actorSystem, String type) {
+    shardManager = actorSystem.actorOf(ShardManager.props(type));
+  }
+
+  @Override
+  public <L extends AsyncDataChangeListener<InstanceIdentifier, NormalizedNode<?, ?>>> ListenerRegistration<L> registerChangeListener(InstanceIdentifier path, L listener, AsyncDataBroker.DataChangeScope scope) {
+    return new ListenerRegistrationProxy();
+  }
+
+  @Override
+  public DOMStoreTransactionChain createTransactionChain() {
+    return new TransactionChainProxy();
+  }
+
+  @Override
+  public DOMStoreReadTransaction newReadOnlyTransaction() {
+    return new TransactionProxy();
+  }
+
+  @Override
+  public DOMStoreWriteTransaction newWriteOnlyTransaction() {
+    return new TransactionProxy();
+  }
+
+  @Override
+  public DOMStoreReadWriteTransaction newReadWriteTransaction() {
+    return new TransactionProxy();
+  }
 }
index 8365328..f96cb14 100644 (file)
@@ -9,8 +9,10 @@
 package org.opendaylight.controller.cluster.datastore;
 
 import akka.actor.ActorRef;
+import akka.actor.Props;
 import akka.event.Logging;
 import akka.event.LoggingAdapter;
+import akka.japi.Creator;
 import akka.persistence.UntypedProcessor;
 import com.google.common.util.concurrent.ListeningExecutorService;
 import com.google.common.util.concurrent.MoreExecutors;
@@ -28,26 +30,43 @@ import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 import java.util.concurrent.Executors;
 
 /**
- * A Shard represents a portion of the logical data tree
- * <p/>
+ * A Shard represents a portion of the logical data tree <br/>
+ * <p>
  * Our Shard uses InMemoryDataStore as it's internal representation and delegates all requests it
- *
+ * </p>
  */
 public class Shard extends UntypedProcessor {
 
-  ListeningExecutorService storeExecutor = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(2));
+  public static final String DEFAULT_NAME = "default";
+
+  private final ListeningExecutorService storeExecutor = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(2));
+
+  private final InMemoryDOMDataStore store;
+
+  private final LoggingAdapter log = Logging.getLogger(getContext().system(), this);
 
-  private final InMemoryDOMDataStore store = new InMemoryDOMDataStore("OPER", storeExecutor);
+  private Shard(String name){
+    store = new InMemoryDOMDataStore(name, storeExecutor);
+  }
+
+  public static Props props(final String name) {
+    return Props.create(new Creator<Shard>() {
 
-  LoggingAdapter log = Logging.getLogger(getContext().system(), this);
+      @Override
+      public Shard create() throws Exception {
+        return new Shard(name);
+      }
+
+    });
+  }
 
   @Override
   public void onReceive(Object message) throws Exception {
     if (message instanceof CreateTransactionChain) {
       createTransactionChain();
-    } else if(message instanceof RegisterChangeListener){
+    } else if (message instanceof RegisterChangeListener) {
       registerChangeListener((RegisterChangeListener) message);
-    } else if(message instanceof UpdateSchemaContext){
+    } else if (message instanceof UpdateSchemaContext) {
       updateSchemaContext((UpdateSchemaContext) message);
     }
   }
index 63266d6..8d8527a 100644 (file)
@@ -8,11 +8,16 @@
 
 package org.opendaylight.controller.cluster.datastore;
 
+import akka.actor.ActorPath;
+import akka.actor.ActorRef;
 import akka.actor.Address;
+import akka.actor.Props;
 import akka.actor.UntypedActor;
 import akka.event.Logging;
 import akka.event.LoggingAdapter;
+import akka.japi.Creator;
 import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
+import org.opendaylight.controller.cluster.datastore.messages.PrimaryFound;
 import org.opendaylight.controller.cluster.datastore.messages.PrimaryNotFound;
 
 import java.util.HashMap;
@@ -21,41 +26,74 @@ import java.util.Map;
 
 /**
  * The ShardManager has the following jobs,
- *
- *  - Create all the local shard replicas that belong on this cluster member
- *  - Find the primary replica for any given shard
- *  - Engage in shard replica elections which decide which replica should be the primary
- *
- * Creation of Shard replicas
- * ==========================
- *  When the ShardManager is constructed it reads the cluster configuration to find out which shard replicas
- *  belong on this member. It finds out the name of the current cluster member from the Akka Clustering Service.
- *
- * Replica Elections
- * =================
- *  The Shard Manager uses multiple cues to initiate election.
- *      - When a member of the cluster dies
- *      - When a local shard replica dies
- *      - When a local shard replica comes alive
+ * <p>
+ * <li> Create all the local shard replicas that belong on this cluster member
+ * <li> Find the primary replica for any given shard
+ * <li> Engage in shard replica elections which decide which replica should be the primary
+ * </p>
+ * <p/>
+ * <h3>>Creation of Shard replicas</h3
+ * <p>
+ * When the ShardManager is constructed it reads the cluster configuration to find out which shard replicas
+ * belong on this member. It finds out the name of the current cluster member from the Akka Clustering Service.
+ * </p>
+ * <p/>
+ * <h3> Replica Elections </h3>
+ * <p/>
+ * <p>
+ * The Shard Manager uses multiple cues to initiate election.
+ * <li> When a member of the cluster dies
+ * <li> When a local shard replica dies
+ * <li> When a local shard replica comes alive
+ * </p>
  */
 public class ShardManager extends UntypedActor {
 
-    // Stores a mapping between a shard name and the address of the current primary
-    private final Map<String, Address> shardNameToPrimaryAddress = new HashMap<>();
+  // Stores a mapping between a shard name and the address of the current primary
+  private final Map<String, Address> shardNameToPrimaryAddress = new HashMap<>();
+
+  // Stores a mapping between a member name and the address of the member
+  private final Map<String, Address> memberNameToAddress = new HashMap<>();
+
+  // Stores a mapping between the shard name and all the members on which a replica of that shard are available
+  private final Map<String, List<String>> shardNameToMembers = new HashMap<>();
 
-    // Stores a mapping between a member name and the address of the member
-    private final Map<String, Address> memberNameToAddress = new HashMap<>();
+  private final LoggingAdapter log = Logging.getLogger(getContext().system(), this);
 
-    // Stores a mapping between the shard name and all the members on which a replica of that shard are available
-    private final Map<String, List<String>> shardNameToMembers = new HashMap<>();
+  private final ActorPath defaultShardPath;
 
-    LoggingAdapter log = Logging.getLogger(getContext().system(), this);
+  /**
+   *
+   * @param type defines the kind of data that goes into shards created by this shard manager. Examples of type would be
+   *             configuration or operational
+   */
+  private ShardManager(String type){
+    ActorRef actor = getContext().actorOf(Shard.props(Shard.DEFAULT_NAME + "-" + type));
+    defaultShardPath = actor.path();
+  }
 
-    @Override
-    public void onReceive(Object message) throws Exception {
-        if(message instanceof FindPrimary ){
-            FindPrimary msg = ((FindPrimary) message);
-            getSender().tell(new PrimaryNotFound(msg.getShardName()), getSelf());
-        }
+  public static Props props(final String type){
+    return Props.create(new Creator<ShardManager>(){
+
+      @Override
+      public ShardManager create() throws Exception {
+        return new ShardManager(type);
+      }
+    });
+  }
+
+  @Override
+  public void onReceive(Object message) throws Exception {
+    if (message instanceof FindPrimary) {
+      FindPrimary msg = ((FindPrimary) message);
+      String shardName = msg.getShardName();
+      if(Shard.DEFAULT_NAME.equals(shardName)){
+        getSender().tell(new PrimaryFound(defaultShardPath.toString()), getSelf());
+      } else {
+        getSender().tell(new PrimaryNotFound(shardName), getSelf());
+      }
     }
+  }
+
+
 }
index 1326898..d6aae37 100644 (file)
@@ -9,4 +9,39 @@
 package org.opendaylight.controller.cluster.datastore.messages;
 
 public class PrimaryFound {
+  private final String primaryPath;
+
+  public PrimaryFound(String primaryPath) {
+    this.primaryPath = primaryPath;
+  }
+
+  public String getPrimaryPath() {
+    return primaryPath;
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) return true;
+    if (o == null || getClass() != o.getClass()) return false;
+
+    PrimaryFound that = (PrimaryFound) o;
+
+    if (!primaryPath.equals(that.primaryPath)) return false;
+
+    return true;
+  }
+
+  @Override
+  public int hashCode() {
+    return primaryPath.hashCode();
+  }
+
+  @Override
+  public String toString() {
+    return "PrimaryFound{" +
+            "primaryPath='" + primaryPath + '\'' +
+            '}';
+  }
+
+
 }
index 241bcb0..3a78f93 100644 (file)
@@ -1,34 +1,36 @@
 package org.opendaylight.controller.config.yang.config.distributed_datastore_provider;
 
+import akka.actor.ActorSystem;
 import org.opendaylight.controller.cluster.datastore.DistributedDataStore;
 
 public class DistributedDataStoreProviderModule extends org.opendaylight.controller.config.yang.config.distributed_datastore_provider.AbstractDistributedDataStoreProviderModule {
-    public DistributedDataStoreProviderModule(org.opendaylight.controller.config.api.ModuleIdentifier identifier, org.opendaylight.controller.config.api.DependencyResolver dependencyResolver) {
-        super(identifier, dependencyResolver);
+  public DistributedDataStoreProviderModule(org.opendaylight.controller.config.api.ModuleIdentifier identifier, org.opendaylight.controller.config.api.DependencyResolver dependencyResolver) {
+    super(identifier, dependencyResolver);
+  }
+
+  public DistributedDataStoreProviderModule(org.opendaylight.controller.config.api.ModuleIdentifier identifier, org.opendaylight.controller.config.api.DependencyResolver dependencyResolver, org.opendaylight.controller.config.yang.config.distributed_datastore_provider.DistributedDataStoreProviderModule oldModule, java.lang.AutoCloseable oldInstance) {
+    super(identifier, dependencyResolver, oldModule, oldInstance);
+  }
+
+  @Override
+  public void customValidation() {
+    // add custom validation form module attributes here.
+  }
+
+  @Override
+  public java.lang.AutoCloseable createInstance() {
+    ActorSystem actorSystem = ActorSystem.create("opendaylight-cluster");
+    final DistributedDataStore configurationStore = new DistributedDataStore(actorSystem, "config");
+    final DistributedDataStore operationalStore = new DistributedDataStore(actorSystem, "operational");
+
+    final class AutoCloseableDistributedDataStore implements AutoCloseable {
+
+      @Override
+      public void close() throws Exception {
+      }
     }
 
-    public DistributedDataStoreProviderModule(org.opendaylight.controller.config.api.ModuleIdentifier identifier, org.opendaylight.controller.config.api.DependencyResolver dependencyResolver, org.opendaylight.controller.config.yang.config.distributed_datastore_provider.DistributedDataStoreProviderModule oldModule, java.lang.AutoCloseable oldInstance) {
-        super(identifier, dependencyResolver, oldModule, oldInstance);
-    }
-
-    @Override
-    public void customValidation() {
-        // add custom validation form module attributes here.
-    }
-
-    @Override
-    public java.lang.AutoCloseable createInstance() {
-        new DistributedDataStore();
-
-        final class AutoCloseableDistributedDataStore implements AutoCloseable {
-
-            @Override
-            public void close() throws Exception {
-
-            }
-        }
-
-        return new AutoCloseableDistributedDataStore();
-    }
+    return new AutoCloseableDistributedDataStore();
+  }
 
 }
index 2fe7b69..45ef32f 100644 (file)
@@ -17,12 +17,12 @@ public abstract class AbstractActorTest {
   private static ActorSystem system;
 
   @BeforeClass
-  public static void setUp(){
+  public static void setUpClass(){
     system = ActorSystem.create("test");
   }
 
   @AfterClass
-  public static void tearDown(){
+  public static void tearDownClass(){
     JavaTestKit.shutdownActorSystem(system);
     system = null;
   }
index 6544f33..45492fd 100644 (file)
@@ -12,13 +12,13 @@ import org.opendaylight.yangtools.concepts.ListenerRegistration;
 import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 
-public class DistributedDataStoreTest {
+public class DistributedDataStoreTest extends AbstractActorTest{
 
     private DistributedDataStore distributedDataStore;
 
     @org.junit.Before
     public void setUp() throws Exception {
-        distributedDataStore = new DistributedDataStore();
+        distributedDataStore = new DistributedDataStore(getSystem(), "config");
     }
 
     @org.junit.After
index 9c1ea70..fa436c1 100644 (file)
@@ -8,6 +8,7 @@ import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
 import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
+import org.opendaylight.controller.cluster.datastore.messages.PrimaryFound;
 import org.opendaylight.controller.cluster.datastore.messages.PrimaryNotFound;
 import scala.concurrent.duration.Duration;
 
@@ -26,17 +27,13 @@ public class ShardManagerTest {
     }
 
     @Test
-    public void testOnReceiveFindPrimary() throws Exception {
+    public void testOnReceiveFindPrimaryForNonExistentShard() throws Exception {
 
         new JavaTestKit(system) {{
-            final Props props = Props.create(ShardManager.class);
-            final TestActorRef<ShardManager> subject = TestActorRef.create(system, props, "test");
+            final Props props = ShardManager.props("config");
+            final TestActorRef<ShardManager> subject = TestActorRef.create(system, props);
 
-            // can also use JavaTestKit “from the outside”
-            final JavaTestKit probe = new JavaTestKit(system);
-
-            // the run() method needs to finish within 3 seconds
-            new Within(duration("3 seconds")) {
+            new Within(duration("1 seconds")) {
                 protected void run() {
 
                     subject.tell(new FindPrimary("inventory"), getRef());
@@ -49,4 +46,25 @@ public class ShardManagerTest {
             };
         }};
     }
+
+  @Test
+  public void testOnReceiveFindPrimaryForExistentShard() throws Exception {
+
+    new JavaTestKit(system) {{
+      final Props props = ShardManager.props("config");
+      final TestActorRef<ShardManager> subject = TestActorRef.create(system, props);
+
+      // the run() method needs to finish within 3 seconds
+      new Within(duration("1 seconds")) {
+        protected void run() {
+
+          subject.tell(new FindPrimary(Shard.DEFAULT_NAME), getRef());
+
+          expectMsgClass(PrimaryFound.class);
+
+          expectNoMsg();
+        }
+      };
+    }};
+  }
 }
\ No newline at end of file
index b5a341d..7f2a836 100644 (file)
@@ -22,7 +22,7 @@ public class ShardTest extends AbstractActorTest{
   @Test
   public void testOnReceiveCreateTransactionChain() throws Exception {
     new JavaTestKit(getSystem()) {{
-      final Props props = Props.create(Shard.class);
+      final Props props = Shard.props("config");
       final ActorRef subject = getSystem().actorOf(props, "testCreateTransactionChain");
 
       new Within(duration("1 seconds")) {
@@ -55,7 +55,7 @@ public class ShardTest extends AbstractActorTest{
   @Test
   public void testOnReceiveRegisterListener() throws Exception {
     new JavaTestKit(getSystem()) {{
-      final Props props = Props.create(Shard.class);
+      final Props props = Shard.props("config");
       final ActorRef subject = getSystem().actorOf(props, "testRegisterChangeListener");
 
       new Within(duration("1 seconds")) {

©2013 OpenDaylight, A Linux Foundation Collaborative Project. All Rights Reserved.
OpenDaylight is a registered trademark of The OpenDaylight Project, Inc.
Linux Foundation and OpenDaylight are registered trademarks of the Linux Foundation.
Linux is a registered trademark of Linus Torvalds.