Basic DistributedDataStoreIntegrationTest added
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / Shard.java
index b4ad089027c6ea25ae9cada361fc5bf36eb704aa..09ad00598fedb0730cb520740d4f4588a058446e 100644 (file)
@@ -20,8 +20,10 @@ import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.ListeningExecutorService;
 import com.google.common.util.concurrent.MoreExecutors;
 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
+import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionChain;
 import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionChainReply;
+import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionReply;
 import org.opendaylight.controller.cluster.datastore.messages.ForwardedCommitTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener;
 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListenerReply;
@@ -29,6 +31,7 @@ import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContex
 import org.opendaylight.controller.cluster.datastore.modification.Modification;
 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
 import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain;
 import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
@@ -61,6 +64,8 @@ public class Shard extends UntypedProcessor {
         Logging.getLogger(getContext().system(), this);
 
     private Shard(String name) {
+        log.info("Creating shard : {}", name );
+
         store = new InMemoryDOMDataStore(name, storeExecutor);
     }
 
@@ -77,6 +82,8 @@ public class Shard extends UntypedProcessor {
 
     @Override
     public void onReceive(Object message) throws Exception {
+        log.debug("Received message {}", message);
+
         if (message instanceof CreateTransactionChain) {
             createTransactionChain();
         } else if (message instanceof RegisterChangeListener) {
@@ -87,9 +94,21 @@ public class Shard extends UntypedProcessor {
             handleForwardedCommit((ForwardedCommitTransaction) message);
         } else if (message instanceof Persistent) {
             commit((Persistent) message);
+        } else if (message instanceof CreateTransaction) {
+            createTransaction();
         }
     }
 
+    private void createTransaction() {
+        DOMStoreReadWriteTransaction transaction =
+            store.newReadWriteTransaction();
+        ActorRef transactionActor = getContext().actorOf(
+            ShardTransaction.props(transaction, getSelf()));
+        getSender()
+            .tell(new CreateTransactionReply(transactionActor.path()),
+                getSelf());
+    }
+
     private void commit(Persistent message) {
         Modification modification = (Modification) message.payload();
         DOMStoreThreePhaseCommitCohort cohort =