Merge "BUG-2184 Fix subtree filtering for identity-ref leaves"
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / TransactionProxy.java
index ec198510d3586f88dee40d04f9294a3fb370a9ae..715f48c3492156d1b14005462da2c26aacb1768c 100644 (file)
@@ -157,7 +157,7 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
                 for(ActorSelection actor : remoteTransactionActors) {
                     LOG.trace("Sending CloseTransaction to {}", actor);
                     actorContext.sendOperationAsync(actor,
-                            new CloseTransaction().toSerializable());
+                        new CloseTransaction().toSerializable());
                 }
             }
         }
@@ -385,8 +385,8 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
             }
 
             Object response = actorContext.executeOperation(primaryShard.get(),
-                    new CreateTransaction(identifier.toString(), this.transactionType.ordinal(),
-                            getTransactionChainId()).toSerializable());
+                new CreateTransaction(identifier.toString(), this.transactionType.ordinal(),
+                    getTransactionChainId()).toSerializable());
             if (response.getClass().equals(CreateTransactionReply.SERIALIZABLE_CLASS)) {
                 CreateTransactionReply reply =
                     CreateTransactionReply.fromSerializable(response);
@@ -408,8 +408,12 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
                     remoteTransactionActorsMB.set(true);
                 }
 
+                // TxActor is always created where the leader of the shard is.
+                // Check if TxActor is created in the same node
+                boolean isTxActorLocal = actorContext.isLocalPath(transactionPath);
+
                 transactionContext = new TransactionContextImpl(shardName, transactionPath,
-                    transactionActor, identifier, actorContext, schemaContext);
+                    transactionActor, identifier, actorContext, schemaContext, isTxActorLocal);
 
                 remoteTransactionPaths.put(shardName, transactionContext);
             } else {
@@ -483,15 +487,17 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
         private final SchemaContext schemaContext;
         private final String actorPath;
         private final ActorSelection actor;
+        private final boolean isTxActorLocal;
 
         private TransactionContextImpl(String shardName, String actorPath,
                 ActorSelection actor, TransactionIdentifier identifier, ActorContext actorContext,
-                SchemaContext schemaContext) {
+                SchemaContext schemaContext, boolean isTxActorLocal) {
             super(shardName, identifier);
             this.actorPath = actorPath;
             this.actor = actor;
             this.actorContext = actorContext;
             this.schemaContext = schemaContext;
+            this.isTxActorLocal = isTxActorLocal;
         }
 
         private ActorSelection getActor() {
@@ -514,8 +520,9 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
             }
             // Send the ReadyTransaction message to the Tx actor.
 
+            ReadyTransaction readyTransaction = new ReadyTransaction();
             final Future<Object> replyFuture = actorContext.executeOperationAsync(getActor(),
-                    new ReadyTransaction().toSerializable());
+                isTxActorLocal ? readyTransaction : readyTransaction.toSerializable());
 
             // Combine all the previously recorded put/merge/delete operation reply Futures and the
             // ReadyTransactionReply Future into one Future. If any one fails then the combined
@@ -549,15 +556,15 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
 
                     // Note the Future get call here won't block as it's complete.
                     Object serializedReadyReply = replyFuture.value().get().get();
-                    if(serializedReadyReply.getClass().equals(
-                                                     ReadyTransactionReply.SERIALIZABLE_CLASS)) {
-                        ReadyTransactionReply reply = ReadyTransactionReply.fromSerializable(
-                               serializedReadyReply);
+                    if (serializedReadyReply instanceof ReadyTransactionReply) {
+                        return actorContext.actorSelection(((ReadyTransactionReply)serializedReadyReply).getCohortPath());
 
+                    } else if(serializedReadyReply.getClass().equals(ReadyTransactionReply.SERIALIZABLE_CLASS)) {
+                        ReadyTransactionReply reply = ReadyTransactionReply.fromSerializable(serializedReadyReply);
                         return actorContext.actorSelection(reply.getCohortPath());
+
                     } else {
                         // Throwing an exception here will fail the Future.
-
                         throw new IllegalArgumentException(String.format("Invalid reply type {}",
                                 serializedReadyReply.getClass()));
                     }
@@ -570,8 +577,10 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
             if(LOG.isDebugEnabled()) {
                 LOG.debug("Tx {} deleteData called path = {}", identifier, path);
             }
+
+            DeleteData deleteData = new DeleteData(path);
             recordedOperationFutures.add(actorContext.executeOperationAsync(getActor(),
-                    new DeleteData(path).toSerializable()));
+                isTxActorLocal ? deleteData : deleteData.toSerializable()));
         }
 
         @Override
@@ -579,8 +588,10 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
             if(LOG.isDebugEnabled()) {
                 LOG.debug("Tx {} mergeData called path = {}", identifier, path);
             }
+
+            MergeData mergeData = new MergeData(path, data, schemaContext);
             recordedOperationFutures.add(actorContext.executeOperationAsync(getActor(),
-                    new MergeData(path, data, schemaContext).toSerializable()));
+                isTxActorLocal ? mergeData : mergeData.toSerializable()));
         }
 
         @Override
@@ -588,8 +599,10 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
             if(LOG.isDebugEnabled()) {
                 LOG.debug("Tx {} writeData called path = {}", identifier, path);
             }
+
+            WriteData writeData = new WriteData(path, data, schemaContext);
             recordedOperationFutures.add(actorContext.executeOperationAsync(getActor(),
-                    new WriteData(path, data, schemaContext).toSerializable()));
+                isTxActorLocal ? writeData : writeData.toSerializable()));
         }
 
         @Override
@@ -619,6 +632,7 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
                 Future<Iterable<Object>> combinedFutures = akka.dispatch.Futures.sequence(
                         Lists.newArrayList(recordedOperationFutures),
                         actorContext.getActorSystem().dispatcher());
+
                 OnComplete<Iterable<Object>> onComplete = new OnComplete<Iterable<Object>>() {
                     @Override
                     public void onComplete(Throwable failure, Iterable<Object> notUsed)
@@ -663,25 +677,27 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
                         if(LOG.isDebugEnabled()) {
                             LOG.debug("Tx {} read operation succeeded", identifier, failure);
                         }
-                        if (readResponse.getClass().equals(ReadDataReply.SERIALIZABLE_CLASS)) {
-                            ReadDataReply reply = ReadDataReply.fromSerializable(schemaContext,
-                                    path, readResponse);
-                            if (reply.getNormalizedNode() == null) {
-                                returnFuture.set(Optional.<NormalizedNode<?, ?>>absent());
-                            } else {
-                                returnFuture.set(Optional.<NormalizedNode<?, ?>>of(
-                                        reply.getNormalizedNode()));
-                            }
+
+                        if (readResponse instanceof ReadDataReply) {
+                            ReadDataReply reply = (ReadDataReply) readResponse;
+                            returnFuture.set(Optional.<NormalizedNode<?, ?>>fromNullable(reply.getNormalizedNode()));
+
+                        } else if (readResponse.getClass().equals(ReadDataReply.SERIALIZABLE_CLASS)) {
+                            ReadDataReply reply = ReadDataReply.fromSerializable(schemaContext, path, readResponse);
+                            returnFuture.set(Optional.<NormalizedNode<?, ?>>fromNullable(reply.getNormalizedNode()));
+
                         } else {
                             returnFuture.setException(new ReadFailedException(
-                                    "Invalid response reading data for path " + path));
+                                "Invalid response reading data for path " + path));
                         }
                     }
                 }
             };
 
+            ReadData readData = new ReadData(path);
             Future<Object> readFuture = actorContext.executeOperationAsync(getActor(),
-                    new ReadData(path).toSerializable());
+                isTxActorLocal ? readData : readData.toSerializable());
+
             readFuture.onComplete(onComplete, actorContext.getActorSystem().dispatcher());
         }
 
@@ -756,9 +772,13 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
                         if(LOG.isDebugEnabled()) {
                             LOG.debug("Tx {} dataExists operation succeeded", identifier, failure);
                         }
-                        if (response.getClass().equals(DataExistsReply.SERIALIZABLE_CLASS)) {
-                            returnFuture.set(Boolean.valueOf(DataExistsReply.
-                                        fromSerializable(response).exists()));
+
+                        if (response instanceof DataExistsReply) {
+                            returnFuture.set(Boolean.valueOf(((DataExistsReply) response).exists()));
+
+                        } else if (response.getClass().equals(DataExistsReply.SERIALIZABLE_CLASS)) {
+                            returnFuture.set(Boolean.valueOf(DataExistsReply.fromSerializable(response).exists()));
+
                         } else {
                             returnFuture.setException(new ReadFailedException(
                                     "Invalid response checking exists for path " + path));
@@ -767,8 +787,10 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
                 }
             };
 
+            DataExists dataExists = new DataExists(path);
             Future<Object> future = actorContext.executeOperationAsync(getActor(),
-                    new DataExists(path).toSerializable());
+                isTxActorLocal ? dataExists : dataExists.toSerializable());
+
             future.onComplete(onComplete, actorContext.getActorSystem().dispatcher());
         }
     }