From e22159a0e53bf93a94908e2c3988ceeba9ed8183 Mon Sep 17 00:00:00 2001
From: Moiz Raja
Date: Thu, 19 Jun 2014 17:45:15 -0700
Subject: [PATCH] Implement finding a primary based on the shard name and do
basic wiring of DistributedDataStore and ShardManager
Change-Id: I98d22e68335a6f901e0584a1a497519e28c7d241
Signed-off-by: Moiz Raja
---
.../datastore/DistributedDataStore.java | 57 ++++++-----
.../controller/cluster/datastore/Shard.java | 35 +++++--
.../cluster/datastore/ShardManager.java | 96 +++++++++++++------
.../datastore/messages/PrimaryFound.java | 35 +++++++
.../DistributedDataStoreProviderModule.java | 52 +++++-----
.../cluster/datastore/AbstractActorTest.java | 4 +-
.../datastore/DistributedDataStoreTest.java | 4 +-
.../cluster/datastore/ShardManagerTest.java | 34 +++++--
.../cluster/datastore/ShardTest.java | 4 +-
9 files changed, 220 insertions(+), 101 deletions(-)
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DistributedDataStore.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DistributedDataStore.java
index 8d5b0c2f4a..c87f1abb21 100644
--- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DistributedDataStore.java
+++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DistributedDataStore.java
@@ -8,6 +8,8 @@
package org.opendaylight.controller.cluster.datastore;
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker;
import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
import org.opendaylight.controller.sal.core.spi.data.DOMStore;
@@ -23,29 +25,34 @@ import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
*
*/
public class DistributedDataStore implements DOMStore {
-
- @Override
- public >> ListenerRegistration registerChangeListener(InstanceIdentifier path, L listener, AsyncDataBroker.DataChangeScope scope) {
- return new ListenerRegistrationProxy();
- }
-
- @Override
- public DOMStoreTransactionChain createTransactionChain() {
- return new TransactionChainProxy();
- }
-
- @Override
- public DOMStoreReadTransaction newReadOnlyTransaction() {
- return new TransactionProxy();
- }
-
- @Override
- public DOMStoreWriteTransaction newWriteOnlyTransaction() {
- return new TransactionProxy();
- }
-
- @Override
- public DOMStoreReadWriteTransaction newReadWriteTransaction() {
- return new TransactionProxy();
- }
+ private final ActorRef shardManager;
+
+ public DistributedDataStore(ActorSystem actorSystem, String type) {
+ shardManager = actorSystem.actorOf(ShardManager.props(type));
+ }
+
+ @Override
+ public >> ListenerRegistration registerChangeListener(InstanceIdentifier path, L listener, AsyncDataBroker.DataChangeScope scope) {
+ return new ListenerRegistrationProxy();
+ }
+
+ @Override
+ public DOMStoreTransactionChain createTransactionChain() {
+ return new TransactionChainProxy();
+ }
+
+ @Override
+ public DOMStoreReadTransaction newReadOnlyTransaction() {
+ return new TransactionProxy();
+ }
+
+ @Override
+ public DOMStoreWriteTransaction newWriteOnlyTransaction() {
+ return new TransactionProxy();
+ }
+
+ @Override
+ public DOMStoreReadWriteTransaction newReadWriteTransaction() {
+ return new TransactionProxy();
+ }
}
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java
index 8365328669..f96cb14510 100644
--- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java
+++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java
@@ -9,8 +9,10 @@
package org.opendaylight.controller.cluster.datastore;
import akka.actor.ActorRef;
+import akka.actor.Props;
import akka.event.Logging;
import akka.event.LoggingAdapter;
+import akka.japi.Creator;
import akka.persistence.UntypedProcessor;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
@@ -28,26 +30,43 @@ import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
import java.util.concurrent.Executors;
/**
- * A Shard represents a portion of the logical data tree
- *
+ * A Shard represents a portion of the logical data tree
+ *
* Our Shard uses InMemoryDataStore as it's internal representation and delegates all requests it
- *
+ *
*/
public class Shard extends UntypedProcessor {
- ListeningExecutorService storeExecutor = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(2));
+ public static final String DEFAULT_NAME = "default";
+
+ private final ListeningExecutorService storeExecutor = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(2));
+
+ private final InMemoryDOMDataStore store;
+
+ private final LoggingAdapter log = Logging.getLogger(getContext().system(), this);
- private final InMemoryDOMDataStore store = new InMemoryDOMDataStore("OPER", storeExecutor);
+ private Shard(String name){
+ store = new InMemoryDOMDataStore(name, storeExecutor);
+ }
+
+ public static Props props(final String name) {
+ return Props.create(new Creator() {
- LoggingAdapter log = Logging.getLogger(getContext().system(), this);
+ @Override
+ public Shard create() throws Exception {
+ return new Shard(name);
+ }
+
+ });
+ }
@Override
public void onReceive(Object message) throws Exception {
if (message instanceof CreateTransactionChain) {
createTransactionChain();
- } else if(message instanceof RegisterChangeListener){
+ } else if (message instanceof RegisterChangeListener) {
registerChangeListener((RegisterChangeListener) message);
- } else if(message instanceof UpdateSchemaContext){
+ } else if (message instanceof UpdateSchemaContext) {
updateSchemaContext((UpdateSchemaContext) message);
}
}
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java
index 63266d6308..8d8527a240 100644
--- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java
+++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java
@@ -8,11 +8,16 @@
package org.opendaylight.controller.cluster.datastore;
+import akka.actor.ActorPath;
+import akka.actor.ActorRef;
import akka.actor.Address;
+import akka.actor.Props;
import akka.actor.UntypedActor;
import akka.event.Logging;
import akka.event.LoggingAdapter;
+import akka.japi.Creator;
import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
+import org.opendaylight.controller.cluster.datastore.messages.PrimaryFound;
import org.opendaylight.controller.cluster.datastore.messages.PrimaryNotFound;
import java.util.HashMap;
@@ -21,41 +26,74 @@ import java.util.Map;
/**
* The ShardManager has the following jobs,
- *
- * - Create all the local shard replicas that belong on this cluster member
- * - Find the primary replica for any given shard
- * - Engage in shard replica elections which decide which replica should be the primary
- *
- * Creation of Shard replicas
- * ==========================
- * When the ShardManager is constructed it reads the cluster configuration to find out which shard replicas
- * belong on this member. It finds out the name of the current cluster member from the Akka Clustering Service.
- *
- * Replica Elections
- * =================
- * The Shard Manager uses multiple cues to initiate election.
- * - When a member of the cluster dies
- * - When a local shard replica dies
- * - When a local shard replica comes alive
+ *
+ *
Create all the local shard replicas that belong on this cluster member
+ * Find the primary replica for any given shard
+ * Engage in shard replica elections which decide which replica should be the primary
+ *
+ *
+ * >Creation of Shard replicas
+ * When the ShardManager is constructed it reads the cluster configuration to find out which shard replicas
+ * belong on this member. It finds out the name of the current cluster member from the Akka Clustering Service.
+ *
+ *
+ * Replica Elections
+ *
+ *
+ * The Shard Manager uses multiple cues to initiate election.
+ *
When a member of the cluster dies
+ * When a local shard replica dies
+ * When a local shard replica comes alive
+ *
*/
public class ShardManager extends UntypedActor {
- // Stores a mapping between a shard name and the address of the current primary
- private final Map shardNameToPrimaryAddress = new HashMap<>();
+ // Stores a mapping between a shard name and the address of the current primary
+ private final Map shardNameToPrimaryAddress = new HashMap<>();
+
+ // Stores a mapping between a member name and the address of the member
+ private final Map memberNameToAddress = new HashMap<>();
+
+ // Stores a mapping between the shard name and all the members on which a replica of that shard are available
+ private final Map> shardNameToMembers = new HashMap<>();
- // Stores a mapping between a member name and the address of the member
- private final Map memberNameToAddress = new HashMap<>();
+ private final LoggingAdapter log = Logging.getLogger(getContext().system(), this);
- // Stores a mapping between the shard name and all the members on which a replica of that shard are available
- private final Map> shardNameToMembers = new HashMap<>();
+ private final ActorPath defaultShardPath;
- LoggingAdapter log = Logging.getLogger(getContext().system(), this);
+ /**
+ *
+ * @param type defines the kind of data that goes into shards created by this shard manager. Examples of type would be
+ * configuration or operational
+ */
+ private ShardManager(String type){
+ ActorRef actor = getContext().actorOf(Shard.props(Shard.DEFAULT_NAME + "-" + type));
+ defaultShardPath = actor.path();
+ }
- @Override
- public void onReceive(Object message) throws Exception {
- if(message instanceof FindPrimary ){
- FindPrimary msg = ((FindPrimary) message);
- getSender().tell(new PrimaryNotFound(msg.getShardName()), getSelf());
- }
+ public static Props props(final String type){
+ return Props.create(new Creator(){
+
+ @Override
+ public ShardManager create() throws Exception {
+ return new ShardManager(type);
+ }
+ });
+ }
+
+ @Override
+ public void onReceive(Object message) throws Exception {
+ if (message instanceof FindPrimary) {
+ FindPrimary msg = ((FindPrimary) message);
+ String shardName = msg.getShardName();
+ if(Shard.DEFAULT_NAME.equals(shardName)){
+ getSender().tell(new PrimaryFound(defaultShardPath.toString()), getSelf());
+ } else {
+ getSender().tell(new PrimaryNotFound(shardName), getSelf());
+ }
}
+ }
+
+
}
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/PrimaryFound.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/PrimaryFound.java
index 1326898b0f..d6aae3786f 100644
--- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/PrimaryFound.java
+++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/PrimaryFound.java
@@ -9,4 +9,39 @@
package org.opendaylight.controller.cluster.datastore.messages;
public class PrimaryFound {
+ private final String primaryPath;
+
+ public PrimaryFound(String primaryPath) {
+ this.primaryPath = primaryPath;
+ }
+
+ public String getPrimaryPath() {
+ return primaryPath;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+
+ PrimaryFound that = (PrimaryFound) o;
+
+ if (!primaryPath.equals(that.primaryPath)) return false;
+
+ return true;
+ }
+
+ @Override
+ public int hashCode() {
+ return primaryPath.hashCode();
+ }
+
+ @Override
+ public String toString() {
+ return "PrimaryFound{" +
+ "primaryPath='" + primaryPath + '\'' +
+ '}';
+ }
+
+
}
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/config/yang/config/distributed_datastore_provider/DistributedDataStoreProviderModule.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/config/yang/config/distributed_datastore_provider/DistributedDataStoreProviderModule.java
index 241bcb0a41..3a78f93d8d 100644
--- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/config/yang/config/distributed_datastore_provider/DistributedDataStoreProviderModule.java
+++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/config/yang/config/distributed_datastore_provider/DistributedDataStoreProviderModule.java
@@ -1,34 +1,36 @@
package org.opendaylight.controller.config.yang.config.distributed_datastore_provider;
+import akka.actor.ActorSystem;
import org.opendaylight.controller.cluster.datastore.DistributedDataStore;
public class DistributedDataStoreProviderModule extends org.opendaylight.controller.config.yang.config.distributed_datastore_provider.AbstractDistributedDataStoreProviderModule {
- public DistributedDataStoreProviderModule(org.opendaylight.controller.config.api.ModuleIdentifier identifier, org.opendaylight.controller.config.api.DependencyResolver dependencyResolver) {
- super(identifier, dependencyResolver);
+ public DistributedDataStoreProviderModule(org.opendaylight.controller.config.api.ModuleIdentifier identifier, org.opendaylight.controller.config.api.DependencyResolver dependencyResolver) {
+ super(identifier, dependencyResolver);
+ }
+
+ public DistributedDataStoreProviderModule(org.opendaylight.controller.config.api.ModuleIdentifier identifier, org.opendaylight.controller.config.api.DependencyResolver dependencyResolver, org.opendaylight.controller.config.yang.config.distributed_datastore_provider.DistributedDataStoreProviderModule oldModule, java.lang.AutoCloseable oldInstance) {
+ super(identifier, dependencyResolver, oldModule, oldInstance);
+ }
+
+ @Override
+ public void customValidation() {
+ // add custom validation form module attributes here.
+ }
+
+ @Override
+ public java.lang.AutoCloseable createInstance() {
+ ActorSystem actorSystem = ActorSystem.create("opendaylight-cluster");
+ final DistributedDataStore configurationStore = new DistributedDataStore(actorSystem, "config");
+ final DistributedDataStore operationalStore = new DistributedDataStore(actorSystem, "operational");
+
+ final class AutoCloseableDistributedDataStore implements AutoCloseable {
+
+ @Override
+ public void close() throws Exception {
+ }
}
- public DistributedDataStoreProviderModule(org.opendaylight.controller.config.api.ModuleIdentifier identifier, org.opendaylight.controller.config.api.DependencyResolver dependencyResolver, org.opendaylight.controller.config.yang.config.distributed_datastore_provider.DistributedDataStoreProviderModule oldModule, java.lang.AutoCloseable oldInstance) {
- super(identifier, dependencyResolver, oldModule, oldInstance);
- }
-
- @Override
- public void customValidation() {
- // add custom validation form module attributes here.
- }
-
- @Override
- public java.lang.AutoCloseable createInstance() {
- new DistributedDataStore();
-
- final class AutoCloseableDistributedDataStore implements AutoCloseable {
-
- @Override
- public void close() throws Exception {
-
- }
- }
-
- return new AutoCloseableDistributedDataStore();
- }
+ return new AutoCloseableDistributedDataStore();
+ }
}
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractActorTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractActorTest.java
index 2fe7b69cc9..45ef32f7ad 100644
--- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractActorTest.java
+++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractActorTest.java
@@ -17,12 +17,12 @@ public abstract class AbstractActorTest {
private static ActorSystem system;
@BeforeClass
- public static void setUp(){
+ public static void setUpClass(){
system = ActorSystem.create("test");
}
@AfterClass
- public static void tearDown(){
+ public static void tearDownClass(){
JavaTestKit.shutdownActorSystem(system);
system = null;
}
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreTest.java
index 6544f33030..45492fd714 100644
--- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreTest.java
+++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreTest.java
@@ -12,13 +12,13 @@ import org.opendaylight.yangtools.concepts.ListenerRegistration;
import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
-public class DistributedDataStoreTest {
+public class DistributedDataStoreTest extends AbstractActorTest{
private DistributedDataStore distributedDataStore;
@org.junit.Before
public void setUp() throws Exception {
- distributedDataStore = new DistributedDataStore();
+ distributedDataStore = new DistributedDataStore(getSystem(), "config");
}
@org.junit.After
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardManagerTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardManagerTest.java
index 9c1ea70fdb..fa436c1605 100644
--- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardManagerTest.java
+++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardManagerTest.java
@@ -8,6 +8,7 @@ import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
+import org.opendaylight.controller.cluster.datastore.messages.PrimaryFound;
import org.opendaylight.controller.cluster.datastore.messages.PrimaryNotFound;
import scala.concurrent.duration.Duration;
@@ -26,17 +27,13 @@ public class ShardManagerTest {
}
@Test
- public void testOnReceiveFindPrimary() throws Exception {
+ public void testOnReceiveFindPrimaryForNonExistentShard() throws Exception {
new JavaTestKit(system) {{
- final Props props = Props.create(ShardManager.class);
- final TestActorRef subject = TestActorRef.create(system, props, "test");
+ final Props props = ShardManager.props("config");
+ final TestActorRef subject = TestActorRef.create(system, props);
- // can also use JavaTestKit âfrom the outsideâ
- final JavaTestKit probe = new JavaTestKit(system);
-
- // the run() method needs to finish within 3 seconds
- new Within(duration("3 seconds")) {
+ new Within(duration("1 seconds")) {
protected void run() {
subject.tell(new FindPrimary("inventory"), getRef());
@@ -49,4 +46,25 @@ public class ShardManagerTest {
};
}};
}
+
+ @Test
+ public void testOnReceiveFindPrimaryForExistentShard() throws Exception {
+
+ new JavaTestKit(system) {{
+ final Props props = ShardManager.props("config");
+ final TestActorRef subject = TestActorRef.create(system, props);
+
+ // the run() method needs to finish within 3 seconds
+ new Within(duration("1 seconds")) {
+ protected void run() {
+
+ subject.tell(new FindPrimary(Shard.DEFAULT_NAME), getRef());
+
+ expectMsgClass(PrimaryFound.class);
+
+ expectNoMsg();
+ }
+ };
+ }};
+ }
}
\ No newline at end of file
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java
index b5a341d95c..7f2a836b6f 100644
--- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java
+++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java
@@ -22,7 +22,7 @@ public class ShardTest extends AbstractActorTest{
@Test
public void testOnReceiveCreateTransactionChain() throws Exception {
new JavaTestKit(getSystem()) {{
- final Props props = Props.create(Shard.class);
+ final Props props = Shard.props("config");
final ActorRef subject = getSystem().actorOf(props, "testCreateTransactionChain");
new Within(duration("1 seconds")) {
@@ -55,7 +55,7 @@ public class ShardTest extends AbstractActorTest{
@Test
public void testOnReceiveRegisterListener() throws Exception {
new JavaTestKit(getSystem()) {{
- final Props props = Props.create(Shard.class);
+ final Props props = Shard.props("config");
final ActorRef subject = getSystem().actorOf(props, "testRegisterChangeListener");
new Within(duration("1 seconds")) {
--
2.36.6