Merge "Kill Dynamic Actors when we're done with them"
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / TransactionProxy.java
index c3c7e7c00cb0d6ee4077042fa11fb548c2d74e1f..c12276134e99c0bff0e9f3dc6c3d61550c39e4ad 100644 (file)
@@ -8,35 +8,46 @@
 
 package org.opendaylight.controller.cluster.datastore;
 
+import akka.actor.ActorPath;
 import akka.actor.ActorSelection;
 import com.google.common.base.Optional;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.ListenableFutureTask;
+import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionReply;
+import org.opendaylight.controller.cluster.datastore.messages.DeleteData;
+import org.opendaylight.controller.cluster.datastore.messages.MergeData;
 import org.opendaylight.controller.cluster.datastore.messages.ReadData;
 import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply;
+import org.opendaylight.controller.cluster.datastore.messages.ReadyTransaction;
+import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
+import org.opendaylight.controller.cluster.datastore.messages.WriteData;
 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
 import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 
+import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.Callable;
 import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicLong;
 
 /**
  * TransactionProxy acts as a proxy for one or more transactions that were created on a remote shard
- *
+ * <p>
  * Creating a transaction on the consumer side will create one instance of a transaction proxy. If during
  * the transaction reads and writes are done on data that belongs to different shards then a separate transaction will
  * be created on each of those shards by the TransactionProxy
- *
+ *</p>
+ * <p>
  * The TransactionProxy does not make any guarantees about atomicity or order in which the transactions on the various
  * shards will be executed.
- *
+ * </p>
  */
 public class TransactionProxy implements DOMStoreReadWriteTransaction {
 
@@ -46,15 +57,19 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
         READ_WRITE
     }
 
-    private final TransactionType readOnly;
+    private static final AtomicLong counter = new AtomicLong();
+
+    private final TransactionType transactionType;
     private final ActorContext actorContext;
     private final Map<String, ActorSelection> remoteTransactionPaths = new HashMap<>();
+    private final String identifier;
 
     public TransactionProxy(
         ActorContext actorContext,
-        TransactionType readOnly) {
+        TransactionType transactionType) {
 
-        this.readOnly = readOnly;
+        this.identifier = "transaction-" + counter.getAndIncrement();
+        this.transactionType = transactionType;
         this.actorContext = actorContext;
 
         Object response = actorContext.executeShardOperation(Shard.DEFAULT_NAME, new CreateTransaction(), ActorContext.ASK_DURATION);
@@ -95,32 +110,51 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
 
     @Override
     public void write(InstanceIdentifier path, NormalizedNode<?, ?> data) {
-        throw new UnsupportedOperationException("write");
+        final ActorSelection remoteTransaction = remoteTransactionFromIdentifier(path);
+        remoteTransaction.tell(new WriteData(path, data), null);
     }
 
     @Override
     public void merge(InstanceIdentifier path, NormalizedNode<?, ?> data) {
-        throw new UnsupportedOperationException("merge");
+        final ActorSelection remoteTransaction = remoteTransactionFromIdentifier(path);
+        remoteTransaction.tell(new MergeData(path, data), null);
     }
 
     @Override
     public void delete(InstanceIdentifier path) {
-        throw new UnsupportedOperationException("delete");
+        final ActorSelection remoteTransaction = remoteTransactionFromIdentifier(path);
+        remoteTransaction.tell(new DeleteData(path), null);
     }
 
     @Override
     public DOMStoreThreePhaseCommitCohort ready() {
-        throw new UnsupportedOperationException("ready");
+        List<ActorPath> cohortPaths = new ArrayList<>();
+
+        for(ActorSelection remoteTransaction : remoteTransactionPaths.values()) {
+            Object result = actorContext.executeRemoteOperation(remoteTransaction,
+                new ReadyTransaction(),
+                ActorContext.ASK_DURATION
+            );
+
+            if(result instanceof ReadyTransactionReply){
+                ReadyTransactionReply reply = (ReadyTransactionReply) result;
+                cohortPaths.add(reply.getCohortPath());
+            }
+        }
+
+        return new ThreePhaseCommitCohortProxy(actorContext, cohortPaths);
     }
 
     @Override
     public Object getIdentifier() {
-        throw new UnsupportedOperationException("getIdentifier");
+        return this.identifier;
     }
 
     @Override
     public void close() {
-        throw new UnsupportedOperationException("close");
+        for(ActorSelection remoteTransaction : remoteTransactionPaths.values()) {
+            remoteTransaction.tell(new CloseTransaction(), null);
+        }
     }
 
     private ActorSelection remoteTransactionFromIdentifier(InstanceIdentifier path){