<artifactId>sal-binding-config</artifactId>
</dependency>
+ <!--
+ Adding a temporary dependency on the sal-broker-impl so that we can use InMemoryDOMDataStore
+
+ InMemoryDOMDataStore needs to be moved into its own module and be wired up using config subsystem before
+ this bundle can use it
+ -->
+ <dependency>
+ <groupId>org.opendaylight.controller</groupId>
+ <artifactId>sal-broker-impl</artifactId>
+ </dependency>
+
<dependency>
<groupId>org.opendaylight.controller</groupId>
<artifactId>sal-common-api</artifactId>
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.datastore;
+
+import akka.actor.Props;
+import akka.actor.UntypedActor;
+import akka.japi.Creator;
+import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
+import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+
+public class ListenerRegistration extends UntypedActor{
+
+ private final org.opendaylight.yangtools.concepts.ListenerRegistration<AsyncDataChangeListener<InstanceIdentifier, NormalizedNode<?, ?>>> registration;
+
+ public ListenerRegistration(org.opendaylight.yangtools.concepts.ListenerRegistration<AsyncDataChangeListener<InstanceIdentifier, NormalizedNode<?, ?>>> registration) {
+ this.registration = registration;
+ }
+
+ @Override
+ public void onReceive(Object message) throws Exception {
+ throw new UnsupportedOperationException("onReceive");
+ }
+
+ public static Props props(final org.opendaylight.yangtools.concepts.ListenerRegistration<AsyncDataChangeListener<InstanceIdentifier, NormalizedNode<?, ?>>> registration){
+ return Props.create(new Creator<ListenerRegistration>(){
+
+ @Override
+ public ListenerRegistration create() throws Exception {
+ return new ListenerRegistration(registration);
+ }
+ });
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.datastore;
+
+import akka.actor.ActorRef;
+import akka.event.Logging;
+import akka.event.LoggingAdapter;
+import akka.persistence.UntypedProcessor;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionChain;
+import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionChainReply;
+import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener;
+import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListenerReply;
+import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain;
+
+import java.util.concurrent.Executors;
+
+/**
+ * A Shard represents a portion of the logical data tree
+ * <p/>
+ * Our Shard uses InMemoryDataStore as it's internal representation and delegates all requests it
+ */
+public class Shard extends UntypedProcessor {
+
+ ListeningExecutorService storeExecutor = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(2));
+
+ private final InMemoryDOMDataStore store = new InMemoryDOMDataStore("OPER", storeExecutor);
+
+ LoggingAdapter log = Logging.getLogger(getContext().system(), this);
+
+ @Override
+ public void onReceive(Object message) throws Exception {
+ if (message instanceof CreateTransactionChain) {
+ createTransactionChain();
+ } else if(message instanceof RegisterChangeListener){
+ registerChangeListener((RegisterChangeListener) message);
+ }
+ }
+
+ private void registerChangeListener(RegisterChangeListener registerChangeListener) {
+// org.opendaylight.yangtools.concepts.ListenerRegistration<AsyncDataChangeListener<InstanceIdentifier, NormalizedNode<?, ?>>> registration =
+// store.registerChangeListener(registerChangeListener.getPath(), registerChangeListener.getListener(), registerChangeListener.getScope());
+ // TODO: Construct a ListenerRegistration actor with the actual registration returned when registering a listener with the datastore
+ ActorRef listenerRegistration = getContext().actorOf(ListenerRegistration.props(null));
+ getSender().tell(new RegisterChangeListenerReply(listenerRegistration.path()), getSelf());
+ }
+
+ private void createTransactionChain() {
+ DOMStoreTransactionChain chain = store.createTransactionChain();
+ ActorRef transactionChain = getContext().actorOf(TransactionChain.props(chain));
+ getSender().tell(new CreateTransactionChainReply(transactionChain.path()), getSelf());
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.datastore;
+
+import akka.actor.Props;
+import akka.actor.UntypedActor;
+import akka.japi.Creator;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain;
+
+public class TransactionChain extends UntypedActor{
+
+ private final DOMStoreTransactionChain chain;
+
+ public TransactionChain(DOMStoreTransactionChain chain) {
+ this.chain = chain;
+ }
+
+ @Override
+ public void onReceive(Object message) throws Exception {
+ throw new UnsupportedOperationException("onReceive");
+ }
+
+ public static Props props(final DOMStoreTransactionChain chain){
+ return Props.create(new Creator<TransactionChain>(){
+
+ @Override
+ public TransactionChain create() throws Exception {
+ return new TransactionChain(chain);
+ }
+ });
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.datastore.messages;
+
+public class CreateTransactionChain {
+
+}
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.datastore.messages;
+
+import akka.actor.ActorPath;
+
+public class CreateTransactionChainReply {
+ private final ActorPath transactionChainPath;
+
+ public CreateTransactionChainReply(ActorPath transactionChainPath) {
+ this.transactionChainPath = transactionChainPath;
+ }
+
+ public ActorPath getTransactionChainPath() {
+ return transactionChainPath;
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.datastore.messages;
+
+import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker;
+import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
+import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+
+public class RegisterChangeListener {
+ private final InstanceIdentifier path;
+ private final AsyncDataChangeListener<InstanceIdentifier, NormalizedNode<?, ?>> listener;
+ private final AsyncDataBroker.DataChangeScope scope;
+
+
+ public RegisterChangeListener(InstanceIdentifier path, AsyncDataChangeListener<InstanceIdentifier, NormalizedNode<?, ?>> listener, AsyncDataBroker.DataChangeScope scope) {
+ this.path = path;
+ this.listener = listener;
+ this.scope = scope;
+ }
+
+ public InstanceIdentifier getPath() {
+ return path;
+ }
+
+ public AsyncDataChangeListener<InstanceIdentifier, NormalizedNode<?, ?>> getListener() {
+ return listener;
+ }
+
+ public AsyncDataBroker.DataChangeScope getScope() {
+ return scope;
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.datastore.messages;
+
+import akka.actor.ActorPath;
+
+public class RegisterChangeListenerReply {
+ private final ActorPath listenerRegistrationPath;
+
+ public RegisterChangeListenerReply(ActorPath listenerRegistrationPath) {
+ this.listenerRegistrationPath = listenerRegistrationPath;
+ }
+
+ public ActorPath getListenerRegistrationPath() {
+ return listenerRegistrationPath;
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.datastore;
+
+import akka.actor.ActorSystem;
+import akka.testkit.JavaTestKit;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+
+public class AbstractActorTest {
+ private static ActorSystem system;
+
+ @BeforeClass
+ public static void setUp(){
+ system = ActorSystem.create("test");
+ }
+
+ @AfterClass
+ public static void tearDown(){
+ JavaTestKit.shutdownActorSystem(system);
+ system = null;
+ }
+
+ protected ActorSystem getSystem(){
+ return system;
+ }
+
+}
--- /dev/null
+package org.opendaylight.controller.cluster.datastore;
+
+import akka.actor.ActorRef;
+import akka.actor.Props;
+import akka.testkit.JavaTestKit;
+import org.junit.Test;
+import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionChain;
+import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionChainReply;
+import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener;
+import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListenerReply;
+import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker;
+import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
+import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
+import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+
+import static org.junit.Assert.assertTrue;
+
+public class ShardTest extends AbstractActorTest{
+ @Test
+ public void testOnReceiveCreateTransaction() throws Exception {
+ new JavaTestKit(getSystem()) {{
+ final Props props = Props.create(Shard.class);
+ final ActorRef subject = getSystem().actorOf(props, "testCreateTransaction");
+
+ new Within(duration("1 seconds")) {
+ protected void run() {
+
+ subject.tell(new CreateTransactionChain(), getRef());
+
+ final String out = new ExpectMsg<String>("match hint") {
+ // do not put code outside this method, will run afterwards
+ protected String match(Object in) {
+ if (in instanceof CreateTransactionChainReply) {
+ CreateTransactionChainReply reply = (CreateTransactionChainReply) in;
+ return reply.getTransactionChainPath().toString();
+ } else {
+ throw noMatch();
+ }
+ }
+ }.get(); // this extracts the received message
+
+ assertTrue(out.matches("akka:\\/\\/test\\/user\\/testCreateTransaction\\/\\$.*"));
+ // Will wait for the rest of the 3 seconds
+ expectNoMsg();
+ }
+
+
+ };
+ }};
+ }
+
+ @Test
+ public void testOnReceiveRegisterListener() throws Exception {
+ new JavaTestKit(getSystem()) {{
+ final Props props = Props.create(Shard.class);
+ final ActorRef subject = getSystem().actorOf(props, "testRegisterChangeListener");
+
+ new Within(duration("1 seconds")) {
+ protected void run() {
+
+ subject.tell(new RegisterChangeListener(InstanceIdentifier.builder().build(), noOpDataChangeListener() , AsyncDataBroker.DataChangeScope.BASE), getRef());
+
+ final String out = new ExpectMsg<String>("match hint") {
+ // do not put code outside this method, will run afterwards
+ protected String match(Object in) {
+ if (in instanceof RegisterChangeListenerReply) {
+ RegisterChangeListenerReply reply = (RegisterChangeListenerReply) in;
+ return reply.getListenerRegistrationPath().toString();
+ } else {
+ throw noMatch();
+ }
+ }
+ }.get(); // this extracts the received message
+
+ assertTrue(out.matches("akka:\\/\\/test\\/user\\/testRegisterChangeListener\\/\\$.*"));
+ // Will wait for the rest of the 3 seconds
+ expectNoMsg();
+ }
+
+
+ };
+ }};
+ }
+
+ private AsyncDataChangeListener<InstanceIdentifier, NormalizedNode<?, ?>> noOpDataChangeListener(){
+ return new AsyncDataChangeListener<InstanceIdentifier, NormalizedNode<?, ?>>() {
+ @Override
+ public void onDataChanged(AsyncDataChangeEvent<InstanceIdentifier, NormalizedNode<?, ?>> change) {
+
+ }
+ };
+ }
+}
\ No newline at end of file