Merge "Bug 2265: Use new NormalizedNode streaming in messages"
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / TransactionProxy.java
index 7703f484c73e687391ff5dd9ffe1739afd201c0f..f34e88fb279c0262516dfb9fa6cdd8672c65003c 100644 (file)
@@ -43,6 +43,7 @@ import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply;
 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
 import org.opendaylight.controller.cluster.datastore.messages.SerializableMessage;
+import org.opendaylight.controller.cluster.datastore.messages.VersionedSerializableMessage;
 import org.opendaylight.controller.cluster.datastore.messages.WriteData;
 import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategyFactory;
 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
@@ -157,8 +158,7 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
             if(remoteTransactionActorsMB.get()) {
                 for(ActorSelection actor : remoteTransactionActors) {
                     LOG.trace("Sending CloseTransaction to {}", actor);
-                    actorContext.sendOperationAsync(actor,
-                            new CloseTransaction().toSerializable());
+                    actorContext.sendOperationAsync(actor, CloseTransaction.INSTANCE.toSerializable());
                 }
             }
         }
@@ -617,10 +617,6 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
             }
         }
 
-
-
-
-
         /**
          * Performs a CreateTransaction try async.
          */
@@ -763,11 +759,11 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
         private final String transactionPath;
         private final ActorSelection actor;
         private final boolean isTxActorLocal;
-        private final int remoteTransactionVersion;
+        private final short remoteTransactionVersion;
 
         private TransactionContextImpl(String transactionPath, ActorSelection actor, TransactionIdentifier identifier,
                 ActorContext actorContext, SchemaContext schemaContext,
-                boolean isTxActorLocal, int remoteTransactionVersion) {
+                boolean isTxActorLocal, short remoteTransactionVersion) {
             super(identifier);
             this.transactionPath = transactionPath;
             this.actor = actor;
@@ -785,11 +781,16 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
             return actorContext.executeOperationAsync(getActor(), isTxActorLocal ? msg : msg.toSerializable());
         }
 
+        private Future<Object> executeOperationAsync(VersionedSerializableMessage msg) {
+            return actorContext.executeOperationAsync(getActor(), isTxActorLocal ? msg :
+                msg.toSerializable(remoteTransactionVersion));
+        }
+
         @Override
         public void closeTransaction() {
             LOG.debug("Tx {} closeTransaction called", identifier);
 
-            actorContext.sendOperationAsync(getActor(), new CloseTransaction().toSerializable());
+            actorContext.sendOperationAsync(getActor(), CloseTransaction.INSTANCE.toSerializable());
         }
 
         @Override
@@ -799,7 +800,7 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
 
             // Send the ReadyTransaction message to the Tx actor.
 
-            final Future<Object> replyFuture = executeOperationAsync(new ReadyTransaction());
+            final Future<Object> replyFuture = executeOperationAsync(ReadyTransaction.INSTANCE);
 
             // Combine all the previously recorded put/merge/delete operation reply Futures and the
             // ReadyTransactionReply Future into one Future. If any one fails then the combined
@@ -846,7 +847,8 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
                         // At some point in the future when upgrades from Helium are not supported
                         // we could remove this code to resolvePath and just use the cohortPath as the
                         // resolved cohortPath
-                        if(TransactionContextImpl.this.remoteTransactionVersion < CreateTransaction.HELIUM_1_VERSION) {
+                        if(TransactionContextImpl.this.remoteTransactionVersion <
+                                DataStoreVersions.HELIUM_1_VERSION) {
                             cohortPath = actorContext.resolvePath(transactionPath, cohortPath);
                         }
 
@@ -872,14 +874,14 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
         public void mergeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
             LOG.debug("Tx {} mergeData called path = {}", identifier, path);
 
-            recordedOperationFutures.add(executeOperationAsync(new MergeData(path, data, schemaContext)));
+            recordedOperationFutures.add(executeOperationAsync(new MergeData(path, data)));
         }
 
         @Override
         public void writeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
             LOG.debug("Tx {} writeData called path = {}", identifier, path);
 
-            recordedOperationFutures.add(executeOperationAsync(new WriteData(path, data, schemaContext)));
+            recordedOperationFutures.add(executeOperationAsync(new WriteData(path, data)));
         }
 
         @Override
@@ -950,8 +952,8 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
                             ReadDataReply reply = (ReadDataReply) readResponse;
                             returnFuture.set(Optional.<NormalizedNode<?, ?>>fromNullable(reply.getNormalizedNode()));
 
-                        } else if (readResponse.getClass().equals(ReadDataReply.SERIALIZABLE_CLASS)) {
-                            ReadDataReply reply = ReadDataReply.fromSerializable(schemaContext, path, readResponse);
+                        } else if (ReadDataReply.isSerializedType(readResponse)) {
+                            ReadDataReply reply = ReadDataReply.fromSerializable(readResponse);
                             returnFuture.set(Optional.<NormalizedNode<?, ?>>fromNullable(reply.getNormalizedNode()));
 
                         } else {