From: Moiz Raja Date: Tue, 17 Jun 2014 20:41:01 +0000 (-0700) Subject: Initial implementation of the Shard Actor X-Git-Tag: release/helium~631^2 X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=commitdiff_plain;h=287688202f1166cfecf2149257de70ec627922fd Initial implementation of the Shard Actor Things to note, - Added a temporary dependency on sal-broker-impl. This is so that I could use the InMemoryDOMDataStore. Once InMemoryDOMDataStore is moved to it's own bundle then I will simply switch the dependency - Shard has been only implemented to the point where it is using an InMemoryDOMDataStore and handling the CreateTransaction and RegisterChangeListener messages This commit is intended to give a feel for what kind of coding patterns will be used to implement Shard and related actors and their tests Change-Id: I86f0d701399805185a0987bb1b97fe1358ce4cd9 Signed-off-by: Moiz Raja --- diff --git a/opendaylight/md-sal/sal-distributed-datastore/pom.xml b/opendaylight/md-sal/sal-distributed-datastore/pom.xml index cf28d067ab..ea686d966f 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/pom.xml +++ b/opendaylight/md-sal/sal-distributed-datastore/pom.xml @@ -57,6 +57,17 @@ sal-binding-config + + + org.opendaylight.controller + sal-broker-impl + + org.opendaylight.controller sal-common-api diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ListenerRegistration.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ListenerRegistration.java new file mode 100644 index 0000000000..922c1950b8 --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ListenerRegistration.java @@ -0,0 +1,40 @@ +/* + * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.controller.cluster.datastore; + +import akka.actor.Props; +import akka.actor.UntypedActor; +import akka.japi.Creator; +import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener; +import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier; +import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; + +public class ListenerRegistration extends UntypedActor{ + + private final org.opendaylight.yangtools.concepts.ListenerRegistration>> registration; + + public ListenerRegistration(org.opendaylight.yangtools.concepts.ListenerRegistration>> registration) { + this.registration = registration; + } + + @Override + public void onReceive(Object message) throws Exception { + throw new UnsupportedOperationException("onReceive"); + } + + public static Props props(final org.opendaylight.yangtools.concepts.ListenerRegistration>> registration){ + return Props.create(new Creator(){ + + @Override + public ListenerRegistration create() throws Exception { + return new ListenerRegistration(registration); + } + }); + } +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java new file mode 100644 index 0000000000..cab35dd7af --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java @@ -0,0 +1,61 @@ +/* + * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.controller.cluster.datastore; + +import akka.actor.ActorRef; +import akka.event.Logging; +import akka.event.LoggingAdapter; +import akka.persistence.UntypedProcessor; +import com.google.common.util.concurrent.ListeningExecutorService; +import com.google.common.util.concurrent.MoreExecutors; +import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionChain; +import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionChainReply; +import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener; +import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListenerReply; +import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore; +import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain; + +import java.util.concurrent.Executors; + +/** + * A Shard represents a portion of the logical data tree + *

+ * Our Shard uses InMemoryDataStore as it's internal representation and delegates all requests it + */ +public class Shard extends UntypedProcessor { + + ListeningExecutorService storeExecutor = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(2)); + + private final InMemoryDOMDataStore store = new InMemoryDOMDataStore("OPER", storeExecutor); + + LoggingAdapter log = Logging.getLogger(getContext().system(), this); + + @Override + public void onReceive(Object message) throws Exception { + if (message instanceof CreateTransactionChain) { + createTransactionChain(); + } else if(message instanceof RegisterChangeListener){ + registerChangeListener((RegisterChangeListener) message); + } + } + + private void registerChangeListener(RegisterChangeListener registerChangeListener) { +// org.opendaylight.yangtools.concepts.ListenerRegistration>> registration = +// store.registerChangeListener(registerChangeListener.getPath(), registerChangeListener.getListener(), registerChangeListener.getScope()); + // TODO: Construct a ListenerRegistration actor with the actual registration returned when registering a listener with the datastore + ActorRef listenerRegistration = getContext().actorOf(ListenerRegistration.props(null)); + getSender().tell(new RegisterChangeListenerReply(listenerRegistration.path()), getSelf()); + } + + private void createTransactionChain() { + DOMStoreTransactionChain chain = store.createTransactionChain(); + ActorRef transactionChain = getContext().actorOf(TransactionChain.props(chain)); + getSender().tell(new CreateTransactionChainReply(transactionChain.path()), getSelf()); + } +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionChain.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionChain.java new file mode 100644 index 0000000000..1cce09fb33 --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionChain.java @@ -0,0 +1,38 @@ +/* + * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.controller.cluster.datastore; + +import akka.actor.Props; +import akka.actor.UntypedActor; +import akka.japi.Creator; +import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain; + +public class TransactionChain extends UntypedActor{ + + private final DOMStoreTransactionChain chain; + + public TransactionChain(DOMStoreTransactionChain chain) { + this.chain = chain; + } + + @Override + public void onReceive(Object message) throws Exception { + throw new UnsupportedOperationException("onReceive"); + } + + public static Props props(final DOMStoreTransactionChain chain){ + return Props.create(new Creator(){ + + @Override + public TransactionChain create() throws Exception { + return new TransactionChain(chain); + } + }); + } +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/CreateTransactionChain.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/CreateTransactionChain.java new file mode 100644 index 0000000000..cdad3324ab --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/CreateTransactionChain.java @@ -0,0 +1,13 @@ +/* + * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.controller.cluster.datastore.messages; + +public class CreateTransactionChain { + +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/CreateTransactionChainReply.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/CreateTransactionChainReply.java new file mode 100644 index 0000000000..49dd9b63d2 --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/CreateTransactionChainReply.java @@ -0,0 +1,23 @@ +/* + * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.controller.cluster.datastore.messages; + +import akka.actor.ActorPath; + +public class CreateTransactionChainReply { + private final ActorPath transactionChainPath; + + public CreateTransactionChainReply(ActorPath transactionChainPath) { + this.transactionChainPath = transactionChainPath; + } + + public ActorPath getTransactionChainPath() { + return transactionChainPath; + } +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/RegisterChangeListener.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/RegisterChangeListener.java new file mode 100644 index 0000000000..0123a70147 --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/RegisterChangeListener.java @@ -0,0 +1,39 @@ +/* + * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.controller.cluster.datastore.messages; + +import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker; +import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener; +import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier; +import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; + +public class RegisterChangeListener { + private final InstanceIdentifier path; + private final AsyncDataChangeListener> listener; + private final AsyncDataBroker.DataChangeScope scope; + + + public RegisterChangeListener(InstanceIdentifier path, AsyncDataChangeListener> listener, AsyncDataBroker.DataChangeScope scope) { + this.path = path; + this.listener = listener; + this.scope = scope; + } + + public InstanceIdentifier getPath() { + return path; + } + + public AsyncDataChangeListener> getListener() { + return listener; + } + + public AsyncDataBroker.DataChangeScope getScope() { + return scope; + } +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/RegisterChangeListenerReply.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/RegisterChangeListenerReply.java new file mode 100644 index 0000000000..ae8bbbd75a --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/RegisterChangeListenerReply.java @@ -0,0 +1,23 @@ +/* + * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.controller.cluster.datastore.messages; + +import akka.actor.ActorPath; + +public class RegisterChangeListenerReply { + private final ActorPath listenerRegistrationPath; + + public RegisterChangeListenerReply(ActorPath listenerRegistrationPath) { + this.listenerRegistrationPath = listenerRegistrationPath; + } + + public ActorPath getListenerRegistrationPath() { + return listenerRegistrationPath; + } +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractActorTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractActorTest.java new file mode 100644 index 0000000000..ed928ec29c --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractActorTest.java @@ -0,0 +1,34 @@ +/* + * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.controller.cluster.datastore; + +import akka.actor.ActorSystem; +import akka.testkit.JavaTestKit; +import org.junit.AfterClass; +import org.junit.BeforeClass; + +public class AbstractActorTest { + private static ActorSystem system; + + @BeforeClass + public static void setUp(){ + system = ActorSystem.create("test"); + } + + @AfterClass + public static void tearDown(){ + JavaTestKit.shutdownActorSystem(system); + system = null; + } + + protected ActorSystem getSystem(){ + return system; + } + +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java new file mode 100644 index 0000000000..934c064c3e --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java @@ -0,0 +1,94 @@ +package org.opendaylight.controller.cluster.datastore; + +import akka.actor.ActorRef; +import akka.actor.Props; +import akka.testkit.JavaTestKit; +import org.junit.Test; +import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionChain; +import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionChainReply; +import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener; +import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListenerReply; +import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker; +import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent; +import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener; +import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier; +import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; + +import static org.junit.Assert.assertTrue; + +public class ShardTest extends AbstractActorTest{ + @Test + public void testOnReceiveCreateTransaction() throws Exception { + new JavaTestKit(getSystem()) {{ + final Props props = Props.create(Shard.class); + final ActorRef subject = getSystem().actorOf(props, "testCreateTransaction"); + + new Within(duration("1 seconds")) { + protected void run() { + + subject.tell(new CreateTransactionChain(), getRef()); + + final String out = new ExpectMsg("match hint") { + // do not put code outside this method, will run afterwards + protected String match(Object in) { + if (in instanceof CreateTransactionChainReply) { + CreateTransactionChainReply reply = (CreateTransactionChainReply) in; + return reply.getTransactionChainPath().toString(); + } else { + throw noMatch(); + } + } + }.get(); // this extracts the received message + + assertTrue(out.matches("akka:\\/\\/test\\/user\\/testCreateTransaction\\/\\$.*")); + // Will wait for the rest of the 3 seconds + expectNoMsg(); + } + + + }; + }}; + } + + @Test + public void testOnReceiveRegisterListener() throws Exception { + new JavaTestKit(getSystem()) {{ + final Props props = Props.create(Shard.class); + final ActorRef subject = getSystem().actorOf(props, "testRegisterChangeListener"); + + new Within(duration("1 seconds")) { + protected void run() { + + subject.tell(new RegisterChangeListener(InstanceIdentifier.builder().build(), noOpDataChangeListener() , AsyncDataBroker.DataChangeScope.BASE), getRef()); + + final String out = new ExpectMsg("match hint") { + // do not put code outside this method, will run afterwards + protected String match(Object in) { + if (in instanceof RegisterChangeListenerReply) { + RegisterChangeListenerReply reply = (RegisterChangeListenerReply) in; + return reply.getListenerRegistrationPath().toString(); + } else { + throw noMatch(); + } + } + }.get(); // this extracts the received message + + assertTrue(out.matches("akka:\\/\\/test\\/user\\/testRegisterChangeListener\\/\\$.*")); + // Will wait for the rest of the 3 seconds + expectNoMsg(); + } + + + }; + }}; + } + + private AsyncDataChangeListener> noOpDataChangeListener(){ + return new AsyncDataChangeListener>() { + @Override + public void onDataChanged(AsyncDataChangeEvent> change) { + + } + }; + } +} \ No newline at end of file