BUG 1712 - Distributed DataStore does not work properly with Transaction Chains
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / TransactionProxy.java
index a8b20c030e1dd34f274ce2e02b56669739824af3..97a9ff0bf379ef3b8f6568ed37603ed285ba35a5 100644 (file)
@@ -71,6 +71,11 @@ import java.util.concurrent.atomic.AtomicLong;
  * </p>
  */
 public class TransactionProxy implements DOMStoreReadWriteTransaction {
+
+    private final TransactionChainProxy transactionChainProxy;
+
+
+
     public enum TransactionType {
         READ_ONLY,
         WRITE_ONLY,
@@ -177,12 +182,27 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
     private boolean inReadyState;
 
     public TransactionProxy(ActorContext actorContext, TransactionType transactionType) {
+        this(actorContext, transactionType, null);
+    }
+
+    @VisibleForTesting
+    List<Future<Object>> getRecordedOperationFutures() {
+        List<Future<Object>> recordedOperationFutures = Lists.newArrayList();
+        for(TransactionContext transactionContext : remoteTransactionPaths.values()) {
+            recordedOperationFutures.addAll(transactionContext.getRecordedOperationFutures());
+        }
+
+        return recordedOperationFutures;
+    }
+
+    public TransactionProxy(ActorContext actorContext, TransactionType transactionType, TransactionChainProxy transactionChainProxy) {
         this.actorContext = Preconditions.checkNotNull(actorContext,
-                "actorContext should not be null");
+            "actorContext should not be null");
         this.transactionType = Preconditions.checkNotNull(transactionType,
-                "transactionType should not be null");
+            "transactionType should not be null");
         this.schemaContext = Preconditions.checkNotNull(actorContext.getSchemaContext(),
-                "schemaContext should not be null");
+            "schemaContext should not be null");
+        this.transactionChainProxy = transactionChainProxy;
 
         String memberName = actorContext.getCurrentMemberName();
         if(memberName == null){
@@ -190,7 +210,7 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
         }
 
         this.identifier = TransactionIdentifier.builder().memberName(memberName).counter(
-                counter.getAndIncrement()).build();
+            counter.getAndIncrement()).build();
 
         if(transactionType == TransactionType.READ_ONLY) {
             // Read-only Tx's aren't explicitly closed by the client so we create a PhantomReference
@@ -201,23 +221,13 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
             remoteTransactionActorsMB = new AtomicBoolean();
 
             TransactionProxyCleanupPhantomReference cleanup =
-                                              new TransactionProxyCleanupPhantomReference(this);
+                new TransactionProxyCleanupPhantomReference(this);
             phantomReferenceCache.put(cleanup, cleanup);
         }
 
         LOG.debug("Created txn {} of type {}", identifier, transactionType);
     }
 
-    @VisibleForTesting
-    List<Future<Object>> getRecordedOperationFutures() {
-        List<Future<Object>> recordedOperationFutures = Lists.newArrayList();
-        for(TransactionContext transactionContext : remoteTransactionPaths.values()) {
-            recordedOperationFutures.addAll(transactionContext.getRecordedOperationFutures());
-        }
-
-        return recordedOperationFutures;
-    }
-
     @Override
     public CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> read(
             final YangInstanceIdentifier path) {
@@ -308,6 +318,10 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
             cohortPathFutures.add(transactionContext.readyTransaction());
         }
 
+        if(transactionChainProxy != null){
+            transactionChainProxy.onTransactionReady(cohortPathFutures);
+        }
+
         return new ThreePhaseCommitCohortProxy(actorContext, cohortPathFutures,
                 identifier.toString());
     }
@@ -340,20 +354,27 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
         return ShardStrategyFactory.getStrategy(path).findShard(path);
     }
 
-    private void createTransactionIfMissing(ActorContext actorContext, YangInstanceIdentifier path) {
+    private void createTransactionIfMissing(ActorContext actorContext,
+        YangInstanceIdentifier path) {
+
+        if(transactionChainProxy != null){
+            transactionChainProxy.waitTillCurrentTransactionReady();
+        }
+
         String shardName = ShardStrategyFactory.getStrategy(path).findShard(path);
 
         TransactionContext transactionContext =
             remoteTransactionPaths.get(shardName);
 
-        if(transactionContext != null){
+        if (transactionContext != null) {
             // A transaction already exists with that shard
             return;
         }
 
         try {
             Object response = actorContext.executeShardOperation(shardName,
-                new CreateTransaction(identifier.toString(),this.transactionType.ordinal() ).toSerializable());
+                new CreateTransaction(identifier.toString(), this.transactionType.ordinal(),
+                    getTransactionChainId()).toSerializable());
             if (response.getClass().equals(CreateTransactionReply.SERIALIZABLE_CLASS)) {
                 CreateTransactionReply reply =
                     CreateTransactionReply.fromSerializable(response);
@@ -364,7 +385,7 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
 
                 ActorSelection transactionActor = actorContext.actorSelection(transactionPath);
 
-                if(transactionType == TransactionType.READ_ONLY) {
+                if (transactionType == TransactionType.READ_ONLY) {
                     // Add the actor to the remoteTransactionActors list for access by the
                     // cleanup PhantonReference.
                     remoteTransactionActors.add(transactionActor);
@@ -375,19 +396,28 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
                 }
 
                 transactionContext = new TransactionContextImpl(shardName, transactionPath,
-                        transactionActor, identifier, actorContext, schemaContext);
+                    transactionActor, identifier, actorContext, schemaContext);
 
                 remoteTransactionPaths.put(shardName, transactionContext);
             } else {
                 throw new IllegalArgumentException(String.format(
-                        "Invalid reply type {} for CreateTransaction", response.getClass()));
+                    "Invalid reply type {} for CreateTransaction", response.getClass()));
             }
-        } catch(Exception e){
+        } catch (Exception e) {
             LOG.debug("Tx {} Creating NoOpTransaction because of : {}", identifier, e.getMessage());
-            remoteTransactionPaths.put(shardName, new NoOpTransactionContext(shardName, e, identifier));
+            remoteTransactionPaths
+                .put(shardName, new NoOpTransactionContext(shardName, e, identifier));
         }
     }
 
+    public String getTransactionChainId() {
+        if(transactionChainProxy == null){
+            return "";
+        }
+        return transactionChainProxy.getTransactionChainId();
+    }
+
+
     private interface TransactionContext {
         String getShardName();