Implement DataChangeListener
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / ShardTransaction.java
index f747afa7867ac8f7f6434987045315b98bc9cca5..ff02bfbcce520be7a791a555cba318520d90e978 100644 (file)
@@ -9,8 +9,8 @@
 package org.opendaylight.controller.cluster.datastore;
 
 import akka.actor.ActorRef;
+import akka.actor.PoisonPill;
 import akka.actor.Props;
-import akka.actor.UntypedActor;
 import akka.event.Logging;
 import akka.event.LoggingAdapter;
 import akka.japi.Creator;
@@ -36,6 +36,7 @@ import org.opendaylight.controller.cluster.datastore.modification.MutableComposi
 import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain;
 import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 
@@ -63,10 +64,15 @@ import java.util.concurrent.ExecutionException;
  * <li> {@link org.opendaylight.controller.cluster.datastore.messages.CloseTransaction}
  * </p>
  */
-public class ShardTransaction extends UntypedActor {
+public class ShardTransaction extends AbstractUntypedActor {
 
     private final ActorRef shardActor;
 
+    // FIXME : see below
+    // If transactionChain is not null then this transaction is part of a
+    // transactionChain. Not really clear as to what that buys us
+    private final DOMStoreTransactionChain transactionChain;
+
     private final DOMStoreReadWriteTransaction transaction;
 
     private final MutableCompositeModification modification =
@@ -77,11 +83,18 @@ public class ShardTransaction extends UntypedActor {
 
     public ShardTransaction(DOMStoreReadWriteTransaction transaction,
         ActorRef shardActor) {
+        this(null, transaction, shardActor);
+    }
+
+    public ShardTransaction(DOMStoreTransactionChain transactionChain, DOMStoreReadWriteTransaction transaction,
+        ActorRef shardActor) {
+        this.transactionChain = transactionChain;
         this.transaction = transaction;
         this.shardActor = shardActor;
     }
 
 
+
     public static Props props(final DOMStoreReadWriteTransaction transaction,
         final ActorRef shardActor) {
         return Props.create(new Creator<ShardTransaction>() {
@@ -93,10 +106,20 @@ public class ShardTransaction extends UntypedActor {
         });
     }
 
-    @Override
-    public void onReceive(Object message) throws Exception {
-        log.debug("Received message {}", message);
+    public static Props props(final DOMStoreTransactionChain transactionChain, final DOMStoreReadWriteTransaction transaction,
+        final ActorRef shardActor) {
+        return Props.create(new Creator<ShardTransaction>() {
 
+            @Override
+            public ShardTransaction create() throws Exception {
+                return new ShardTransaction(transactionChain, transaction, shardActor);
+            }
+        });
+    }
+
+
+    @Override
+    public void handleReceive(Object message) throws Exception {
         if (message instanceof ReadData) {
             readData((ReadData) message);
         } else if (message instanceof WriteData) {
@@ -131,7 +154,7 @@ public class ShardTransaction extends UntypedActor {
                     if (optional.isPresent()) {
                         sender.tell(new ReadDataReply(optional.get()), self);
                     } else {
-                        //TODO : Need to decide what to do here
+                        sender.tell(new ReadDataReply(null), self);
                     }
                 } catch (InterruptedException | ExecutionException e) {
                     log.error(e,
@@ -176,6 +199,7 @@ public class ShardTransaction extends UntypedActor {
     private void closeTransaction(CloseTransaction message) {
         transaction.close();
         getSender().tell(new CloseTransactionReply(), getSelf());
+        getSelf().tell(PoisonPill.getInstance(), getSelf());
     }