From: Ed Warnicke Date: Mon, 30 Jun 2014 16:36:12 +0000 (+0000) Subject: Merge "Implement finding a primary based on the shard name and do basic wiring of... X-Git-Tag: release/helium~581 X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=commitdiff_plain;h=44a86821d69cd804b6b23b437e0b27136eaac2b5;hp=38b193d477d2f3051185d230537810ac98498fdf Merge "Implement finding a primary based on the shard name and do basic wiring of DistributedDataStore and ShardManager" --- diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DistributedDataStore.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DistributedDataStore.java index 8d5b0c2f4a..c87f1abb21 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DistributedDataStore.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DistributedDataStore.java @@ -8,6 +8,8 @@ package org.opendaylight.controller.cluster.datastore; +import akka.actor.ActorRef; +import akka.actor.ActorSystem; import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker; import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener; import org.opendaylight.controller.sal.core.spi.data.DOMStore; @@ -23,29 +25,34 @@ import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; * */ public class DistributedDataStore implements DOMStore { - - @Override - public >> ListenerRegistration registerChangeListener(InstanceIdentifier path, L listener, AsyncDataBroker.DataChangeScope scope) { - return new ListenerRegistrationProxy(); - } - - @Override - public DOMStoreTransactionChain createTransactionChain() { - return new TransactionChainProxy(); - } - - @Override - public DOMStoreReadTransaction newReadOnlyTransaction() { - return new TransactionProxy(); - } - - @Override - public DOMStoreWriteTransaction newWriteOnlyTransaction() { - return new TransactionProxy(); - } - - @Override - public DOMStoreReadWriteTransaction newReadWriteTransaction() { - return new TransactionProxy(); - } + private final ActorRef shardManager; + + public DistributedDataStore(ActorSystem actorSystem, String type) { + shardManager = actorSystem.actorOf(ShardManager.props(type)); + } + + @Override + public >> ListenerRegistration registerChangeListener(InstanceIdentifier path, L listener, AsyncDataBroker.DataChangeScope scope) { + return new ListenerRegistrationProxy(); + } + + @Override + public DOMStoreTransactionChain createTransactionChain() { + return new TransactionChainProxy(); + } + + @Override + public DOMStoreReadTransaction newReadOnlyTransaction() { + return new TransactionProxy(); + } + + @Override + public DOMStoreWriteTransaction newWriteOnlyTransaction() { + return new TransactionProxy(); + } + + @Override + public DOMStoreReadWriteTransaction newReadWriteTransaction() { + return new TransactionProxy(); + } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java index 8365328669..f96cb14510 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java @@ -9,8 +9,10 @@ package org.opendaylight.controller.cluster.datastore; import akka.actor.ActorRef; +import akka.actor.Props; import akka.event.Logging; import akka.event.LoggingAdapter; +import akka.japi.Creator; import akka.persistence.UntypedProcessor; import com.google.common.util.concurrent.ListeningExecutorService; import com.google.common.util.concurrent.MoreExecutors; @@ -28,26 +30,43 @@ import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; import java.util.concurrent.Executors; /** - * A Shard represents a portion of the logical data tree - *

+ * A Shard represents a portion of the logical data tree
+ *

* Our Shard uses InMemoryDataStore as it's internal representation and delegates all requests it - * + *

*/ public class Shard extends UntypedProcessor { - ListeningExecutorService storeExecutor = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(2)); + public static final String DEFAULT_NAME = "default"; + + private final ListeningExecutorService storeExecutor = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(2)); + + private final InMemoryDOMDataStore store; + + private final LoggingAdapter log = Logging.getLogger(getContext().system(), this); - private final InMemoryDOMDataStore store = new InMemoryDOMDataStore("OPER", storeExecutor); + private Shard(String name){ + store = new InMemoryDOMDataStore(name, storeExecutor); + } + + public static Props props(final String name) { + return Props.create(new Creator() { - LoggingAdapter log = Logging.getLogger(getContext().system(), this); + @Override + public Shard create() throws Exception { + return new Shard(name); + } + + }); + } @Override public void onReceive(Object message) throws Exception { if (message instanceof CreateTransactionChain) { createTransactionChain(); - } else if(message instanceof RegisterChangeListener){ + } else if (message instanceof RegisterChangeListener) { registerChangeListener((RegisterChangeListener) message); - } else if(message instanceof UpdateSchemaContext){ + } else if (message instanceof UpdateSchemaContext) { updateSchemaContext((UpdateSchemaContext) message); } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java index 63266d6308..8d8527a240 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java @@ -8,11 +8,16 @@ package org.opendaylight.controller.cluster.datastore; +import akka.actor.ActorPath; +import akka.actor.ActorRef; import akka.actor.Address; +import akka.actor.Props; import akka.actor.UntypedActor; import akka.event.Logging; import akka.event.LoggingAdapter; +import akka.japi.Creator; import org.opendaylight.controller.cluster.datastore.messages.FindPrimary; +import org.opendaylight.controller.cluster.datastore.messages.PrimaryFound; import org.opendaylight.controller.cluster.datastore.messages.PrimaryNotFound; import java.util.HashMap; @@ -21,41 +26,74 @@ import java.util.Map; /** * The ShardManager has the following jobs, - * - * - Create all the local shard replicas that belong on this cluster member - * - Find the primary replica for any given shard - * - Engage in shard replica elections which decide which replica should be the primary - * - * Creation of Shard replicas - * ========================== - * When the ShardManager is constructed it reads the cluster configuration to find out which shard replicas - * belong on this member. It finds out the name of the current cluster member from the Akka Clustering Service. - * - * Replica Elections - * ================= - * The Shard Manager uses multiple cues to initiate election. - * - When a member of the cluster dies - * - When a local shard replica dies - * - When a local shard replica comes alive + *

+ *

  • Create all the local shard replicas that belong on this cluster member + *
  • Find the primary replica for any given shard + *
  • Engage in shard replica elections which decide which replica should be the primary + *

    + *

    + *

    >Creation of Shard replicas

    + * When the ShardManager is constructed it reads the cluster configuration to find out which shard replicas + * belong on this member. It finds out the name of the current cluster member from the Akka Clustering Service. + *

    + *

    + *

    Replica Elections

    + *

    + *

    + * The Shard Manager uses multiple cues to initiate election. + *

  • When a member of the cluster dies + *
  • When a local shard replica dies + *
  • When a local shard replica comes alive + *

    */ public class ShardManager extends UntypedActor { - // Stores a mapping between a shard name and the address of the current primary - private final Map shardNameToPrimaryAddress = new HashMap<>(); + // Stores a mapping between a shard name and the address of the current primary + private final Map shardNameToPrimaryAddress = new HashMap<>(); + + // Stores a mapping between a member name and the address of the member + private final Map memberNameToAddress = new HashMap<>(); + + // Stores a mapping between the shard name and all the members on which a replica of that shard are available + private final Map> shardNameToMembers = new HashMap<>(); - // Stores a mapping between a member name and the address of the member - private final Map memberNameToAddress = new HashMap<>(); + private final LoggingAdapter log = Logging.getLogger(getContext().system(), this); - // Stores a mapping between the shard name and all the members on which a replica of that shard are available - private final Map> shardNameToMembers = new HashMap<>(); + private final ActorPath defaultShardPath; - LoggingAdapter log = Logging.getLogger(getContext().system(), this); + /** + * + * @param type defines the kind of data that goes into shards created by this shard manager. Examples of type would be + * configuration or operational + */ + private ShardManager(String type){ + ActorRef actor = getContext().actorOf(Shard.props(Shard.DEFAULT_NAME + "-" + type)); + defaultShardPath = actor.path(); + } - @Override - public void onReceive(Object message) throws Exception { - if(message instanceof FindPrimary ){ - FindPrimary msg = ((FindPrimary) message); - getSender().tell(new PrimaryNotFound(msg.getShardName()), getSelf()); - } + public static Props props(final String type){ + return Props.create(new Creator(){ + + @Override + public ShardManager create() throws Exception { + return new ShardManager(type); + } + }); + } + + @Override + public void onReceive(Object message) throws Exception { + if (message instanceof FindPrimary) { + FindPrimary msg = ((FindPrimary) message); + String shardName = msg.getShardName(); + if(Shard.DEFAULT_NAME.equals(shardName)){ + getSender().tell(new PrimaryFound(defaultShardPath.toString()), getSelf()); + } else { + getSender().tell(new PrimaryNotFound(shardName), getSelf()); + } } + } + + } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/PrimaryFound.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/PrimaryFound.java index 1326898b0f..d6aae3786f 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/PrimaryFound.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/PrimaryFound.java @@ -9,4 +9,39 @@ package org.opendaylight.controller.cluster.datastore.messages; public class PrimaryFound { + private final String primaryPath; + + public PrimaryFound(String primaryPath) { + this.primaryPath = primaryPath; + } + + public String getPrimaryPath() { + return primaryPath; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + + PrimaryFound that = (PrimaryFound) o; + + if (!primaryPath.equals(that.primaryPath)) return false; + + return true; + } + + @Override + public int hashCode() { + return primaryPath.hashCode(); + } + + @Override + public String toString() { + return "PrimaryFound{" + + "primaryPath='" + primaryPath + '\'' + + '}'; + } + + } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/config/yang/config/distributed_datastore_provider/DistributedDataStoreProviderModule.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/config/yang/config/distributed_datastore_provider/DistributedDataStoreProviderModule.java index 241bcb0a41..3a78f93d8d 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/config/yang/config/distributed_datastore_provider/DistributedDataStoreProviderModule.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/config/yang/config/distributed_datastore_provider/DistributedDataStoreProviderModule.java @@ -1,34 +1,36 @@ package org.opendaylight.controller.config.yang.config.distributed_datastore_provider; +import akka.actor.ActorSystem; import org.opendaylight.controller.cluster.datastore.DistributedDataStore; public class DistributedDataStoreProviderModule extends org.opendaylight.controller.config.yang.config.distributed_datastore_provider.AbstractDistributedDataStoreProviderModule { - public DistributedDataStoreProviderModule(org.opendaylight.controller.config.api.ModuleIdentifier identifier, org.opendaylight.controller.config.api.DependencyResolver dependencyResolver) { - super(identifier, dependencyResolver); + public DistributedDataStoreProviderModule(org.opendaylight.controller.config.api.ModuleIdentifier identifier, org.opendaylight.controller.config.api.DependencyResolver dependencyResolver) { + super(identifier, dependencyResolver); + } + + public DistributedDataStoreProviderModule(org.opendaylight.controller.config.api.ModuleIdentifier identifier, org.opendaylight.controller.config.api.DependencyResolver dependencyResolver, org.opendaylight.controller.config.yang.config.distributed_datastore_provider.DistributedDataStoreProviderModule oldModule, java.lang.AutoCloseable oldInstance) { + super(identifier, dependencyResolver, oldModule, oldInstance); + } + + @Override + public void customValidation() { + // add custom validation form module attributes here. + } + + @Override + public java.lang.AutoCloseable createInstance() { + ActorSystem actorSystem = ActorSystem.create("opendaylight-cluster"); + final DistributedDataStore configurationStore = new DistributedDataStore(actorSystem, "config"); + final DistributedDataStore operationalStore = new DistributedDataStore(actorSystem, "operational"); + + final class AutoCloseableDistributedDataStore implements AutoCloseable { + + @Override + public void close() throws Exception { + } } - public DistributedDataStoreProviderModule(org.opendaylight.controller.config.api.ModuleIdentifier identifier, org.opendaylight.controller.config.api.DependencyResolver dependencyResolver, org.opendaylight.controller.config.yang.config.distributed_datastore_provider.DistributedDataStoreProviderModule oldModule, java.lang.AutoCloseable oldInstance) { - super(identifier, dependencyResolver, oldModule, oldInstance); - } - - @Override - public void customValidation() { - // add custom validation form module attributes here. - } - - @Override - public java.lang.AutoCloseable createInstance() { - new DistributedDataStore(); - - final class AutoCloseableDistributedDataStore implements AutoCloseable { - - @Override - public void close() throws Exception { - - } - } - - return new AutoCloseableDistributedDataStore(); - } + return new AutoCloseableDistributedDataStore(); + } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractActorTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractActorTest.java index 2fe7b69cc9..45ef32f7ad 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractActorTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractActorTest.java @@ -17,12 +17,12 @@ public abstract class AbstractActorTest { private static ActorSystem system; @BeforeClass - public static void setUp(){ + public static void setUpClass(){ system = ActorSystem.create("test"); } @AfterClass - public static void tearDown(){ + public static void tearDownClass(){ JavaTestKit.shutdownActorSystem(system); system = null; } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreTest.java index 6544f33030..45492fd714 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreTest.java @@ -12,13 +12,13 @@ import org.opendaylight.yangtools.concepts.ListenerRegistration; import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; -public class DistributedDataStoreTest { +public class DistributedDataStoreTest extends AbstractActorTest{ private DistributedDataStore distributedDataStore; @org.junit.Before public void setUp() throws Exception { - distributedDataStore = new DistributedDataStore(); + distributedDataStore = new DistributedDataStore(getSystem(), "config"); } @org.junit.After diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardManagerTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardManagerTest.java index 9c1ea70fdb..fa436c1605 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardManagerTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardManagerTest.java @@ -8,6 +8,7 @@ import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Test; import org.opendaylight.controller.cluster.datastore.messages.FindPrimary; +import org.opendaylight.controller.cluster.datastore.messages.PrimaryFound; import org.opendaylight.controller.cluster.datastore.messages.PrimaryNotFound; import scala.concurrent.duration.Duration; @@ -26,17 +27,13 @@ public class ShardManagerTest { } @Test - public void testOnReceiveFindPrimary() throws Exception { + public void testOnReceiveFindPrimaryForNonExistentShard() throws Exception { new JavaTestKit(system) {{ - final Props props = Props.create(ShardManager.class); - final TestActorRef subject = TestActorRef.create(system, props, "test"); + final Props props = ShardManager.props("config"); + final TestActorRef subject = TestActorRef.create(system, props); - // can also use JavaTestKit “from the outside” - final JavaTestKit probe = new JavaTestKit(system); - - // the run() method needs to finish within 3 seconds - new Within(duration("3 seconds")) { + new Within(duration("1 seconds")) { protected void run() { subject.tell(new FindPrimary("inventory"), getRef()); @@ -49,4 +46,25 @@ public class ShardManagerTest { }; }}; } + + @Test + public void testOnReceiveFindPrimaryForExistentShard() throws Exception { + + new JavaTestKit(system) {{ + final Props props = ShardManager.props("config"); + final TestActorRef subject = TestActorRef.create(system, props); + + // the run() method needs to finish within 3 seconds + new Within(duration("1 seconds")) { + protected void run() { + + subject.tell(new FindPrimary(Shard.DEFAULT_NAME), getRef()); + + expectMsgClass(PrimaryFound.class); + + expectNoMsg(); + } + }; + }}; + } } \ No newline at end of file diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java index b5a341d95c..7f2a836b6f 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java @@ -22,7 +22,7 @@ public class ShardTest extends AbstractActorTest{ @Test public void testOnReceiveCreateTransactionChain() throws Exception { new JavaTestKit(getSystem()) {{ - final Props props = Props.create(Shard.class); + final Props props = Shard.props("config"); final ActorRef subject = getSystem().actorOf(props, "testCreateTransactionChain"); new Within(duration("1 seconds")) { @@ -55,7 +55,7 @@ public class ShardTest extends AbstractActorTest{ @Test public void testOnReceiveRegisterListener() throws Exception { new JavaTestKit(getSystem()) {{ - final Props props = Props.create(Shard.class); + final Props props = Shard.props("config"); final ActorRef subject = getSystem().actorOf(props, "testRegisterChangeListener"); new Within(duration("1 seconds")) {