BUG 2296 : TransactionProxy should support the ability to accept a local TPC actor... 51/12451/1
authorMoiz Raja <moraja@cisco.com>
Sun, 2 Nov 2014 16:14:03 +0000 (08:14 -0800)
committerTom Pantelis <tpanteli@brocade.com>
Mon, 3 Nov 2014 00:39:07 +0000 (00:39 +0000)
In Helium when the ShardTransaction processes a Ready message it sends back a the path of a ThreePhaseCommitCohort actor in the ReadyReply. This path is actually a local actor path, this local actor path is then converted into a remote path by the TransactionProxy.

The fix for Bug 1607 breaks this capability which is required to support forward compatibility in a cluster where a transaction request originates in a node that has been upgraded to Helium-1 and the actual transaction is happening on a node which has not yet been upgraded to Helium-1.

Change-Id: I857384bdd61b3492ea270dcf04d14883811c37c2
Signed-off-by: Moiz Raja <moraja@cisco.com>
14 files changed:
opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/protobuff/messages/transaction/ShardTransactionMessages.java
opendaylight/md-sal/sal-clustering-commons/src/main/resources/ShardTransaction.proto
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardTransactionChain.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/CreateTransaction.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/CreateTransactionReply.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/ReadyTransactionReply.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/ActorContext.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionFailureTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionProxyTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/ActorContextTest.java

index 3cd290c..3a1cfaa 100644 (file)
@@ -1463,6 +1463,16 @@ public final class ShardTransactionMessages {
      */
     com.google.protobuf.ByteString
         getTransactionIdBytes();
+
+    // optional int32 messageVersion = 3;
+    /**
+     * <code>optional int32 messageVersion = 3;</code>
+     */
+    boolean hasMessageVersion();
+    /**
+     * <code>optional int32 messageVersion = 3;</code>
+     */
+    int getMessageVersion();
   }
   /**
    * Protobuf type {@code org.opendaylight.controller.mdsal.CreateTransactionReply}
@@ -1525,6 +1535,11 @@ public final class ShardTransactionMessages {
               transactionId_ = input.readBytes();
               break;
             }
+            case 24: {
+              bitField0_ |= 0x00000004;
+              messageVersion_ = input.readInt32();
+              break;
+            }
           }
         }
       } catch (com.google.protobuf.InvalidProtocolBufferException e) {
@@ -1651,9 +1666,26 @@ public final class ShardTransactionMessages {
       }
     }
 
+    // optional int32 messageVersion = 3;
+    public static final int MESSAGEVERSION_FIELD_NUMBER = 3;
+    private int messageVersion_;
+    /**
+     * <code>optional int32 messageVersion = 3;</code>
+     */
+    public boolean hasMessageVersion() {
+      return ((bitField0_ & 0x00000004) == 0x00000004);
+    }
+    /**
+     * <code>optional int32 messageVersion = 3;</code>
+     */
+    public int getMessageVersion() {
+      return messageVersion_;
+    }
+
     private void initFields() {
       transactionActorPath_ = "";
       transactionId_ = "";
+      messageVersion_ = 0;
     }
     private byte memoizedIsInitialized = -1;
     public final boolean isInitialized() {
@@ -1681,6 +1713,9 @@ public final class ShardTransactionMessages {
       if (((bitField0_ & 0x00000002) == 0x00000002)) {
         output.writeBytes(2, getTransactionIdBytes());
       }
+      if (((bitField0_ & 0x00000004) == 0x00000004)) {
+        output.writeInt32(3, messageVersion_);
+      }
       getUnknownFields().writeTo(output);
     }
 
@@ -1698,6 +1733,10 @@ public final class ShardTransactionMessages {
         size += com.google.protobuf.CodedOutputStream
           .computeBytesSize(2, getTransactionIdBytes());
       }
+      if (((bitField0_ & 0x00000004) == 0x00000004)) {
+        size += com.google.protobuf.CodedOutputStream
+          .computeInt32Size(3, messageVersion_);
+      }
       size += getUnknownFields().getSerializedSize();
       memoizedSerializedSize = size;
       return size;
@@ -1818,6 +1857,8 @@ public final class ShardTransactionMessages {
         bitField0_ = (bitField0_ & ~0x00000001);
         transactionId_ = "";
         bitField0_ = (bitField0_ & ~0x00000002);
+        messageVersion_ = 0;
+        bitField0_ = (bitField0_ & ~0x00000004);
         return this;
       }
 
@@ -1854,6 +1895,10 @@ public final class ShardTransactionMessages {
           to_bitField0_ |= 0x00000002;
         }
         result.transactionId_ = transactionId_;
+        if (((from_bitField0_ & 0x00000004) == 0x00000004)) {
+          to_bitField0_ |= 0x00000004;
+        }
+        result.messageVersion_ = messageVersion_;
         result.bitField0_ = to_bitField0_;
         onBuilt();
         return result;
@@ -1880,6 +1925,9 @@ public final class ShardTransactionMessages {
           transactionId_ = other.transactionId_;
           onChanged();
         }
+        if (other.hasMessageVersion()) {
+          setMessageVersion(other.getMessageVersion());
+        }
         this.mergeUnknownFields(other.getUnknownFields());
         return this;
       }
@@ -2063,6 +2111,39 @@ public final class ShardTransactionMessages {
         return this;
       }
 
+      // optional int32 messageVersion = 3;
+      private int messageVersion_ ;
+      /**
+       * <code>optional int32 messageVersion = 3;</code>
+       */
+      public boolean hasMessageVersion() {
+        return ((bitField0_ & 0x00000004) == 0x00000004);
+      }
+      /**
+       * <code>optional int32 messageVersion = 3;</code>
+       */
+      public int getMessageVersion() {
+        return messageVersion_;
+      }
+      /**
+       * <code>optional int32 messageVersion = 3;</code>
+       */
+      public Builder setMessageVersion(int value) {
+        bitField0_ |= 0x00000004;
+        messageVersion_ = value;
+        onChanged();
+        return this;
+      }
+      /**
+       * <code>optional int32 messageVersion = 3;</code>
+       */
+      public Builder clearMessageVersion() {
+        bitField0_ = (bitField0_ & ~0x00000004);
+        messageVersion_ = 0;
+        onChanged();
+        return this;
+      }
+
       // @@protoc_insertion_point(builder_scope:org.opendaylight.controller.mdsal.CreateTransactionReply)
     }
 
@@ -7838,33 +7919,34 @@ public final class ShardTransactionMessages {
       "\n\021CreateTransaction\022\025\n\rtransactionId\030\001 \002" +
       "(\t\022\027\n\017transactionType\030\002 \002(\005\022\032\n\022transacti" +
       "onChainId\030\003 \001(\t\022\026\n\016messageVersion\030\004 \001(\005\"" +
-      "M\n\026CreateTransactionReply\022\034\n\024transaction" +
-      "ActorPath\030\001 \002(\t\022\025\n\rtransactionId\030\002 \002(\t\"\022" +
-      "\n\020ReadyTransaction\"*\n\025ReadyTransactionRe" +
-      "ply\022\021\n\tactorPath\030\001 \002(\t\"l\n\nDeleteData\022^\n\037",
-      "instanceIdentifierPathArguments\030\001 \002(\01325." +
-      "org.opendaylight.controller.mdsal.Instan" +
-      "ceIdentifier\"\021\n\017DeleteDataReply\"j\n\010ReadD" +
-      "ata\022^\n\037instanceIdentifierPathArguments\030\001" +
-      " \002(\01325.org.opendaylight.controller.mdsal" +
-      ".InstanceIdentifier\"P\n\rReadDataReply\022?\n\016" +
-      "normalizedNode\030\001 \001(\0132\'.org.opendaylight." +
-      "controller.mdsal.Node\"\254\001\n\tWriteData\022^\n\037i" +
-      "nstanceIdentifierPathArguments\030\001 \002(\01325.o" +
-      "rg.opendaylight.controller.mdsal.Instanc",
-      "eIdentifier\022?\n\016normalizedNode\030\002 \002(\0132\'.or" +
-      "g.opendaylight.controller.mdsal.Node\"\020\n\016" +
-      "WriteDataReply\"\254\001\n\tMergeData\022^\n\037instance" +
-      "IdentifierPathArguments\030\001 \002(\01325.org.open" +
-      "daylight.controller.mdsal.InstanceIdenti" +
-      "fier\022?\n\016normalizedNode\030\002 \002(\0132\'.org.opend" +
-      "aylight.controller.mdsal.Node\"\020\n\016MergeDa" +
-      "taReply\"l\n\nDataExists\022^\n\037instanceIdentif" +
-      "ierPathArguments\030\001 \002(\01325.org.opendayligh" +
-      "t.controller.mdsal.InstanceIdentifier\"!\n",
-      "\017DataExistsReply\022\016\n\006exists\030\001 \002(\010BV\n:org." +
-      "opendaylight.controller.protobuff.messag" +
-      "es.transactionB\030ShardTransactionMessages"
+      "e\n\026CreateTransactionReply\022\034\n\024transaction" +
+      "ActorPath\030\001 \002(\t\022\025\n\rtransactionId\030\002 \002(\t\022\026" +
+      "\n\016messageVersion\030\003 \001(\005\"\022\n\020ReadyTransacti" +
+      "on\"*\n\025ReadyTransactionReply\022\021\n\tactorPath",
+      "\030\001 \002(\t\"l\n\nDeleteData\022^\n\037instanceIdentifi" +
+      "erPathArguments\030\001 \002(\01325.org.opendaylight" +
+      ".controller.mdsal.InstanceIdentifier\"\021\n\017" +
+      "DeleteDataReply\"j\n\010ReadData\022^\n\037instanceI" +
+      "dentifierPathArguments\030\001 \002(\01325.org.opend" +
+      "aylight.controller.mdsal.InstanceIdentif" +
+      "ier\"P\n\rReadDataReply\022?\n\016normalizedNode\030\001" +
+      " \001(\0132\'.org.opendaylight.controller.mdsal" +
+      ".Node\"\254\001\n\tWriteData\022^\n\037instanceIdentifie" +
+      "rPathArguments\030\001 \002(\01325.org.opendaylight.",
+      "controller.mdsal.InstanceIdentifier\022?\n\016n" +
+      "ormalizedNode\030\002 \002(\0132\'.org.opendaylight.c" +
+      "ontroller.mdsal.Node\"\020\n\016WriteDataReply\"\254" +
+      "\001\n\tMergeData\022^\n\037instanceIdentifierPathAr" +
+      "guments\030\001 \002(\01325.org.opendaylight.control" +
+      "ler.mdsal.InstanceIdentifier\022?\n\016normaliz" +
+      "edNode\030\002 \002(\0132\'.org.opendaylight.controll" +
+      "er.mdsal.Node\"\020\n\016MergeDataReply\"l\n\nDataE" +
+      "xists\022^\n\037instanceIdentifierPathArguments" +
+      "\030\001 \002(\01325.org.opendaylight.controller.mds",
+      "al.InstanceIdentifier\"!\n\017DataExistsReply" +
+      "\022\016\n\006exists\030\001 \002(\010BV\n:org.opendaylight.con" +
+      "troller.protobuff.messages.transactionB\030" +
+      "ShardTransactionMessages"
     };
     com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
       new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
@@ -7894,7 +7976,7 @@ public final class ShardTransactionMessages {
           internal_static_org_opendaylight_controller_mdsal_CreateTransactionReply_fieldAccessorTable = new
             com.google.protobuf.GeneratedMessage.FieldAccessorTable(
               internal_static_org_opendaylight_controller_mdsal_CreateTransactionReply_descriptor,
-              new java.lang.String[] { "TransactionActorPath", "TransactionId", });
+              new java.lang.String[] { "TransactionActorPath", "TransactionId", "MessageVersion", });
           internal_static_org_opendaylight_controller_mdsal_ReadyTransaction_descriptor =
             getDescriptor().getMessageTypes().get(4);
           internal_static_org_opendaylight_controller_mdsal_ReadyTransaction_fieldAccessorTable = new
index cd1132d..c5e4ee4 100644 (file)
@@ -20,9 +20,9 @@ message CreateTransaction{
 }
 
 message CreateTransactionReply{
-required string transactionActorPath = 1;
-required string transactionId = 2;
-
+  required string transactionActorPath = 1;
+  required string transactionId = 2;
+  optional int32 messageVersion = 3;
 }
 
 message ReadyTransaction{
@@ -30,7 +30,7 @@ message ReadyTransaction{
 }
 
 message ReadyTransactionReply{
-required string actorPath = 1;
+  required string actorPath = 1;
 }
 
 message DeleteData {
index 1bf32e7..5ea9b30 100644 (file)
@@ -8,13 +8,25 @@
 
 package org.opendaylight.controller.cluster.datastore;
 
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
-import javax.annotation.Nonnull;
+import akka.actor.ActorRef;
+import akka.actor.ActorSelection;
+import akka.actor.Cancellable;
+import akka.actor.PoisonPill;
+import akka.actor.Props;
+import akka.event.Logging;
+import akka.event.LoggingAdapter;
+import akka.japi.Creator;
+import akka.persistence.RecoveryFailure;
+import akka.serialization.Serialization;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.protobuf.ByteString;
+import com.google.protobuf.InvalidProtocolBufferException;
 import org.opendaylight.controller.cluster.DataPersistenceProvider;
 import org.opendaylight.controller.cluster.common.actor.CommonConfig;
 import org.opendaylight.controller.cluster.common.actor.MeteringBehavior;
@@ -65,25 +77,14 @@ import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 import scala.concurrent.duration.Duration;
 import scala.concurrent.duration.FiniteDuration;
-import akka.actor.ActorRef;
-import akka.actor.ActorSelection;
-import akka.actor.Cancellable;
-import akka.actor.PoisonPill;
-import akka.actor.Props;
-import akka.event.Logging;
-import akka.event.LoggingAdapter;
-import akka.japi.Creator;
-import akka.persistence.RecoveryFailure;
-import akka.serialization.Serialization;
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Optional;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-import com.google.common.util.concurrent.FutureCallback;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.protobuf.ByteString;
-import com.google.protobuf.InvalidProtocolBufferException;
+
+import javax.annotation.Nonnull;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
 
 /**
  * A Shard represents a portion of the logical data tree <br/>
@@ -93,8 +94,6 @@ import com.google.protobuf.InvalidProtocolBufferException;
  */
 public class Shard extends RaftActor {
 
-    private static final int HELIUM_1_TX_VERSION = 1;
-
     private static final Object COMMIT_TRANSACTION_REPLY = new CommitTransactionReply().toSerializable();
 
     private static final Object TX_COMMIT_TIMEOUT_CHECK_MESSAGE = "txCommitTimeoutCheck";
@@ -391,7 +390,7 @@ public class Shard extends RaftActor {
         // transactionId so to maintain backwards compatibility, we create a separate cohort actor
         // to provide the compatible behavior.
         ActorRef replyActorPath = self();
-        if(ready.getTxnClientVersion() < HELIUM_1_TX_VERSION) {
+        if(ready.getTxnClientVersion() < CreateTransaction.HELIUM_1_VERSION) {
             LOG.debug("Creating BackwardsCompatibleThreePhaseCommitCohort");
             replyActorPath = getContext().actorOf(BackwardsCompatibleThreePhaseCommitCohort.props(
                     ready.getTransactionID()));
@@ -536,7 +535,7 @@ public class Shard extends RaftActor {
     private void createTransaction(CreateTransaction createTransaction) {
         createTransaction(createTransaction.getTransactionType(),
             createTransaction.getTransactionId(), createTransaction.getTransactionChainId(),
-            createTransaction.getClientVersion());
+            createTransaction.getVersion());
     }
 
     private ActorRef createTransaction(int transactionType, String remoteTransactionId,
@@ -615,7 +614,7 @@ public class Shard extends RaftActor {
         LOG.debug("registerDataChangeListener sending reply, listenerRegistrationPath = {} ",
                     listenerRegistration.path());
 
-        getSender().tell(new RegisterChangeListenerReply(listenerRegistration.path()),getSelf());
+        getSender().tell(new RegisterChangeListenerReply(listenerRegistration.path()), getSelf());
     }
 
     private ListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier,
@@ -782,7 +781,7 @@ public class Shard extends RaftActor {
             createSnapshotTransaction = createTransaction(
                 TransactionProxy.TransactionType.READ_ONLY.ordinal(),
                 "createSnapshot" + ++createSnapshotTransactionCounter, "",
-                CreateTransaction.CURRENT_CLIENT_VERSION);
+                CreateTransaction.CURRENT_VERSION);
 
             createSnapshotTransaction.tell(
                 new ReadData(YangInstanceIdentifier.builder().build()).toSerializable(), self());
index 4bf184e..78c6a55 100644 (file)
@@ -64,21 +64,21 @@ public class ShardTransactionChain extends AbstractUntypedActor {
                     ShardTransaction.props( chain.newReadOnlyTransaction(), getShardActor(),
                             schemaContext, datastoreContext, shardStats,
                             createTransaction.getTransactionId(),
-                            createTransaction.getClientVersion()), transactionName);
+                            createTransaction.getVersion()), transactionName);
         } else if (createTransaction.getTransactionType() ==
                 TransactionProxy.TransactionType.READ_WRITE.ordinal()) {
             return getContext().actorOf(
                     ShardTransaction.props( chain.newReadWriteTransaction(), getShardActor(),
                             schemaContext, datastoreContext, shardStats,
                             createTransaction.getTransactionId(),
-                            createTransaction.getClientVersion()), transactionName);
+                            createTransaction.getVersion()), transactionName);
         } else if (createTransaction.getTransactionType() ==
                 TransactionProxy.TransactionType.WRITE_ONLY.ordinal()) {
             return getContext().actorOf(
                     ShardTransaction.props( chain.newWriteOnlyTransaction(), getShardActor(),
                             schemaContext, datastoreContext, shardStats,
                             createTransaction.getTransactionId(),
-                            createTransaction.getClientVersion()), transactionName);
+                            createTransaction.getVersion()), transactionName);
         } else {
             throw new IllegalArgumentException (
                     "CreateTransaction message has unidentified transaction type=" +
index 239207a..ffb1ab7 100644 (file)
@@ -49,6 +49,8 @@ import org.slf4j.LoggerFactory;
 import scala.concurrent.Future;
 import scala.concurrent.Promise;
 import scala.concurrent.duration.FiniteDuration;
+
+import javax.annotation.concurrent.GuardedBy;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -56,7 +58,6 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
-import javax.annotation.concurrent.GuardedBy;
 
 /**
  * TransactionProxy acts as a proxy for one or more transactions that were created on a remote shard
@@ -668,8 +669,8 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
             // Check if TxActor is created in the same node
             boolean isTxActorLocal = actorContext.isLocalPath(transactionPath);
 
-            transactionContext = new TransactionContextImpl(transactionActor, identifier,
-                actorContext, schemaContext, isTxActorLocal);
+            transactionContext = new TransactionContextImpl(transactionPath, transactionActor, identifier,
+                actorContext, schemaContext, isTxActorLocal, reply.getVersion());
         }
     }
 
@@ -712,17 +713,21 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
 
         private final ActorContext actorContext;
         private final SchemaContext schemaContext;
+        private final String transactionPath;
         private final ActorSelection actor;
         private final boolean isTxActorLocal;
+        private final int remoteTransactionVersion;
 
-        private TransactionContextImpl(ActorSelection actor, TransactionIdentifier identifier,
+        private TransactionContextImpl(String transactionPath, ActorSelection actor, TransactionIdentifier identifier,
                 ActorContext actorContext, SchemaContext schemaContext,
-                boolean isTxActorLocal) {
+                boolean isTxActorLocal, int remoteTransactionVersion) {
             super(identifier);
+            this.transactionPath = transactionPath;
             this.actor = actor;
             this.actorContext = actorContext;
             this.schemaContext = schemaContext;
             this.isTxActorLocal = isTxActorLocal;
+            this.remoteTransactionVersion = remoteTransactionVersion;
         }
 
         private ActorSelection getActor() {
@@ -783,7 +788,20 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
 
                     } else if(serializedReadyReply.getClass().equals(ReadyTransactionReply.SERIALIZABLE_CLASS)) {
                         ReadyTransactionReply reply = ReadyTransactionReply.fromSerializable(serializedReadyReply);
-                        return actorContext.actorSelection(reply.getCohortPath());
+                        String cohortPath = reply.getCohortPath();
+
+                        // In Helium we used to return the local path of the actor which represented
+                        // a remote ThreePhaseCommitCohort. The local path would then be converted to
+                        // a remote path using this resolvePath method. To maintain compatibility with
+                        // a Helium node we need to continue to do this conversion.
+                        // At some point in the future when upgrades from Helium are not supported
+                        // we could remove this code to resolvePath and just use the cohortPath as the
+                        // resolved cohortPath
+                        if(TransactionContextImpl.this.remoteTransactionVersion < CreateTransaction.HELIUM_1_VERSION) {
+                            cohortPath = actorContext.resolvePath(transactionPath, cohortPath);
+                        }
+
+                        return actorContext.actorSelection(cohortPath);
 
                     } else {
                         // Throwing an exception here will fail the Future.
index cbb881f..bf82e66 100644 (file)
@@ -16,27 +16,28 @@ public class CreateTransaction implements SerializableMessage {
     public static final Class<ShardTransactionMessages.CreateTransaction> SERIALIZABLE_CLASS =
             ShardTransactionMessages.CreateTransaction.class;
 
-    public static final int CURRENT_CLIENT_VERSION = 1;
+    public static final int HELIUM_1_VERSION = 1;
+    public static final int CURRENT_VERSION = HELIUM_1_VERSION;
 
     private final String transactionId;
     private final int transactionType;
     private final String transactionChainId;
-    private final int clientVersion;
+    private final int version;
 
     public CreateTransaction(String transactionId, int transactionType) {
         this(transactionId, transactionType, "");
     }
 
     public CreateTransaction(String transactionId, int transactionType, String transactionChainId) {
-        this(transactionId, transactionType, transactionChainId, CURRENT_CLIENT_VERSION);
+        this(transactionId, transactionType, transactionChainId, CURRENT_VERSION);
     }
 
     private CreateTransaction(String transactionId, int transactionType, String transactionChainId,
-            int clientVersion) {
+            int version) {
         this.transactionId = transactionId;
         this.transactionType = transactionType;
         this.transactionChainId = transactionChainId;
-        this.clientVersion = clientVersion;
+        this.version = version;
     }
 
     public String getTransactionId() {
@@ -47,8 +48,8 @@ public class CreateTransaction implements SerializableMessage {
         return transactionType;
     }
 
-    public int getClientVersion() {
-        return clientVersion;
+    public int getVersion() {
+        return version;
     }
 
     @Override
@@ -57,7 +58,7 @@ public class CreateTransaction implements SerializableMessage {
             .setTransactionId(transactionId)
             .setTransactionType(transactionType)
             .setTransactionChainId(transactionChainId)
-            .setMessageVersion(clientVersion).build();
+            .setMessageVersion(version).build();
     }
 
     public static CreateTransaction fromSerializable(Object message) {
index 096d131..14620f1 100644 (file)
@@ -15,13 +15,21 @@ public class CreateTransactionReply implements SerializableMessage {
     public static final Class SERIALIZABLE_CLASS = ShardTransactionMessages.CreateTransactionReply.class;
     private final String transactionPath;
     private final String transactionId;
+    private final int version;
 
     public CreateTransactionReply(String transactionPath,
         String transactionId) {
+        this(transactionPath, transactionId, CreateTransaction.CURRENT_VERSION);
+    }
+
+    public CreateTransactionReply(String transactionPath,
+                                  String transactionId, int version) {
         this.transactionPath = transactionPath;
         this.transactionId = transactionId;
+        this.version = version;
     }
 
+
     public String getTransactionPath() {
         return transactionPath;
     }
@@ -30,16 +38,21 @@ public class CreateTransactionReply implements SerializableMessage {
         return transactionId;
     }
 
+    public int getVersion() {
+        return version;
+    }
+
     public Object toSerializable(){
         return ShardTransactionMessages.CreateTransactionReply.newBuilder()
             .setTransactionActorPath(transactionPath)
             .setTransactionId(transactionId)
+            .setMessageVersion(version)
             .build();
     }
 
     public static CreateTransactionReply fromSerializable(Object serializable){
         ShardTransactionMessages.CreateTransactionReply o = (ShardTransactionMessages.CreateTransactionReply) serializable;
-        return new CreateTransactionReply(o.getTransactionActorPath(), o.getTransactionId());
+        return new CreateTransactionReply(o.getTransactionActorPath(), o.getTransactionId(), o.getMessageVersion());
     }
 
 }
index eee4891..282e23e 100644 (file)
@@ -17,7 +17,6 @@ public class ReadyTransactionReply implements SerializableMessage {
     private final String cohortPath;
 
     public ReadyTransactionReply(String cohortPath) {
-
         this.cohortPath = cohortPath;
     }
 
@@ -27,8 +26,9 @@ public class ReadyTransactionReply implements SerializableMessage {
 
     @Override
     public ShardTransactionMessages.ReadyTransactionReply toSerializable() {
-        return ShardTransactionMessages.ReadyTransactionReply.newBuilder().
-                setActorPath(cohortPath).build();
+        return ShardTransactionMessages.ReadyTransactionReply.newBuilder()
+                .setActorPath(cohortPath)
+                .build();
     }
 
     public static ReadyTransactionReply fromSerializable(Object serializable) {
index e409168..904dcdf 100644 (file)
@@ -395,4 +395,28 @@ public class ActorContext {
 
         return hostPort1.equals(hostPort2);
     }
+
+    /**
+     * @deprecated This method is present only to support backward compatibility with Helium and should not be
+     * used any further
+     *
+     *
+     * @param primaryPath
+     * @param localPathOfRemoteActor
+     * @return
+    */
+    @Deprecated
+    public String resolvePath(final String primaryPath,
+                                            final String localPathOfRemoteActor) {
+        StringBuilder builder = new StringBuilder();
+        String[] primaryPathElements = primaryPath.split("/");
+        builder.append(primaryPathElements[0]).append("//")
+            .append(primaryPathElements[1]).append(primaryPathElements[2]);
+        String[] remotePathElements = localPathOfRemoteActor.split("/");
+        for (int i = 3; i < remotePathElements.length; i++) {
+                builder.append("/").append(remotePathElements[i]);
+            }
+
+        return builder.toString();
+    }
 }
index 557a75a..c6bb0d6 100644 (file)
@@ -1,26 +1,21 @@
 package org.opendaylight.controller.cluster.datastore;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-import static org.mockito.Mockito.doAnswer;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.inOrder;
-import static org.mockito.Mockito.mock;
-import static org.opendaylight.controller.cluster.datastore.messages.CreateTransaction.CURRENT_CLIENT_VERSION;
-import java.io.IOException;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicReference;
+import akka.actor.ActorRef;
+import akka.actor.PoisonPill;
+import akka.actor.Props;
+import akka.dispatch.Dispatchers;
+import akka.dispatch.OnComplete;
+import akka.japi.Creator;
+import akka.pattern.Patterns;
+import akka.testkit.TestActorRef;
+import akka.util.Timeout;
+import com.google.common.base.Function;
+import com.google.common.base.Optional;
+import com.google.common.util.concurrent.CheckedFuture;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.MoreExecutors;
+import com.google.common.util.concurrent.Uninterruptibles;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -84,22 +79,29 @@ import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 import scala.concurrent.Await;
 import scala.concurrent.Future;
 import scala.concurrent.duration.FiniteDuration;
-import akka.actor.ActorRef;
-import akka.actor.PoisonPill;
-import akka.actor.Props;
-import akka.dispatch.Dispatchers;
-import akka.dispatch.OnComplete;
-import akka.japi.Creator;
-import akka.pattern.Patterns;
-import akka.testkit.TestActorRef;
-import akka.util.Timeout;
-import com.google.common.base.Function;
-import com.google.common.base.Optional;
-import com.google.common.util.concurrent.CheckedFuture;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.MoreExecutors;
-import com.google.common.util.concurrent.Uninterruptibles;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.inOrder;
+import static org.mockito.Mockito.mock;
+import static org.opendaylight.controller.cluster.datastore.messages.CreateTransaction.CURRENT_VERSION;
 
 
 public class ShardTest extends AbstractActorTest {
@@ -610,7 +612,7 @@ public class ShardTest extends AbstractActorTest {
             // Simulate the ForwardedReadyTransaction message for the first Tx that would be sent
             // by the ShardTransaction.
 
-            shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_CLIENT_VERSION,
+            shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
                     cohort1, modification1, true), getRef());
             ReadyTransactionReply readyReply = ReadyTransactionReply.fromSerializable(
                     expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS));
@@ -625,11 +627,11 @@ public class ShardTest extends AbstractActorTest {
 
             // Send the ForwardedReadyTransaction for the next 2 Tx's.
 
-            shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_CLIENT_VERSION,
+            shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
                     cohort2, modification2, true), getRef());
             expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
 
-            shard.tell(new ForwardedReadyTransaction(transactionID3, CURRENT_CLIENT_VERSION,
+            shard.tell(new ForwardedReadyTransaction(transactionID3, CURRENT_VERSION,
                     cohort3, modification3, true), getRef());
             expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
 
@@ -794,11 +796,11 @@ public class ShardTest extends AbstractActorTest {
             // Simulate the ForwardedReadyTransaction messages that would be sent
             // by the ShardTransaction.
 
-            shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_CLIENT_VERSION,
+            shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
                     cohort1, modification1, true), getRef());
             expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
 
-            shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_CLIENT_VERSION,
+            shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
                     cohort2, modification2, true), getRef());
             expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
 
@@ -863,7 +865,7 @@ public class ShardTest extends AbstractActorTest {
             // Simulate the ForwardedReadyTransaction messages that would be sent
             // by the ShardTransaction.
 
-            shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_CLIENT_VERSION,
+            shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
                     cohort, modification, true), getRef());
             expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
 
@@ -907,7 +909,7 @@ public class ShardTest extends AbstractActorTest {
             // Simulate the ForwardedReadyTransaction messages that would be sent
             // by the ShardTransaction.
 
-            shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_CLIENT_VERSION,
+            shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
                     cohort, modification, true), getRef());
             expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
 
@@ -960,7 +962,7 @@ public class ShardTest extends AbstractActorTest {
                     TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME),
                     modification, preCommit);
 
-            shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_CLIENT_VERSION,
+            shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
                     cohort, modification, true), getRef());
             expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
 
@@ -1025,11 +1027,11 @@ public class ShardTest extends AbstractActorTest {
 
             // Ready the Tx's
 
-            shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_CLIENT_VERSION,
+            shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
                     cohort1, modification1, true), getRef());
             expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
 
-            shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_CLIENT_VERSION,
+            shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
                     cohort2, modification2, true), getRef());
             expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
 
@@ -1089,15 +1091,15 @@ public class ShardTest extends AbstractActorTest {
 
             // Ready the Tx's
 
-            shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_CLIENT_VERSION,
+            shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
                     cohort1, modification1, true), getRef());
             expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
 
-            shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_CLIENT_VERSION,
+            shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
                     cohort2, modification2, true), getRef());
             expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
 
-            shard.tell(new ForwardedReadyTransaction(transactionID3, CURRENT_CLIENT_VERSION,
+            shard.tell(new ForwardedReadyTransaction(transactionID3, CURRENT_VERSION,
                     cohort3, modification3, true), getRef());
             expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
 
@@ -1161,11 +1163,11 @@ public class ShardTest extends AbstractActorTest {
             // Simulate the ForwardedReadyTransaction messages that would be sent
             // by the ShardTransaction.
 
-            shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_CLIENT_VERSION,
+            shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
                     cohort1, modification1, true), getRef());
             expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
 
-            shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_CLIENT_VERSION,
+            shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
                     cohort2, modification2, true), getRef());
             expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
 
index 261bfef..45b00f5 100644 (file)
@@ -77,7 +77,7 @@ public class ShardTransactionFailureTest extends AbstractActorTest {
         final ActorRef shard = createShard();
         final Props props = ShardTransaction.props(store.newReadOnlyTransaction(), shard,
                 testSchemaContext, datastoreContext, shardStats, "txn",
-                CreateTransaction.CURRENT_CLIENT_VERSION);
+                CreateTransaction.CURRENT_VERSION);
 
         final TestActorRef<ShardTransaction> subject = TestActorRef
             .create(getSystem(), props,
@@ -107,7 +107,7 @@ public class ShardTransactionFailureTest extends AbstractActorTest {
         final ActorRef shard = createShard();
         final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
                 testSchemaContext, datastoreContext, shardStats, "txn",
-                CreateTransaction.CURRENT_CLIENT_VERSION);
+                CreateTransaction.CURRENT_VERSION);
 
         final TestActorRef<ShardTransaction> subject = TestActorRef
             .create(getSystem(), props,
@@ -137,7 +137,7 @@ public class ShardTransactionFailureTest extends AbstractActorTest {
         final ActorRef shard = createShard();
         final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
                 testSchemaContext, datastoreContext, shardStats, "txn",
-                CreateTransaction.CURRENT_CLIENT_VERSION);
+                CreateTransaction.CURRENT_VERSION);
 
         final TestActorRef<ShardTransaction> subject = TestActorRef
             .create(getSystem(), props,
@@ -167,7 +167,7 @@ public class ShardTransactionFailureTest extends AbstractActorTest {
         final ActorRef shard = createShard();
         final Props props = ShardTransaction.props(store.newWriteOnlyTransaction(), shard,
                 testSchemaContext, datastoreContext, shardStats, "txn",
-                CreateTransaction.CURRENT_CLIENT_VERSION);
+                CreateTransaction.CURRENT_VERSION);
 
         final TestActorRef<ShardTransaction> subject = TestActorRef
             .create(getSystem(), props,
@@ -200,7 +200,7 @@ public class ShardTransactionFailureTest extends AbstractActorTest {
         final ActorRef shard = createShard();
         final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
                 testSchemaContext, datastoreContext, shardStats, "txn",
-                CreateTransaction.CURRENT_CLIENT_VERSION);
+                CreateTransaction.CURRENT_VERSION);
 
         final TestActorRef<ShardTransaction> subject = TestActorRef
             .create(getSystem(), props,
@@ -238,7 +238,7 @@ public class ShardTransactionFailureTest extends AbstractActorTest {
         final ActorRef shard = createShard();
         final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
                 testSchemaContext, datastoreContext, shardStats, "txn",
-                CreateTransaction.CURRENT_CLIENT_VERSION);
+                CreateTransaction.CURRENT_VERSION);
 
         final TestActorRef<ShardTransaction> subject = TestActorRef
             .create(getSystem(), props, "testNegativeMergeTransactionReady");
@@ -271,7 +271,7 @@ public class ShardTransactionFailureTest extends AbstractActorTest {
         final ActorRef shard = createShard();
         final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
                 testSchemaContext, datastoreContext, shardStats, "txn",
-                CreateTransaction.CURRENT_CLIENT_VERSION);
+                CreateTransaction.CURRENT_VERSION);
 
         final TestActorRef<ShardTransaction> subject = TestActorRef
             .create(getSystem(), props,
index cc3bb55..c869be8 100644 (file)
@@ -80,13 +80,13 @@ public class ShardTransactionTest extends AbstractActorTest {
             final ActorRef shard = createShard();
             Props props = ShardTransaction.props(store.newReadOnlyTransaction(), shard,
                     testSchemaContext, datastoreContext, shardStats, "txn",
-                    CreateTransaction.CURRENT_CLIENT_VERSION);
+                    CreateTransaction.CURRENT_VERSION);
 
             testOnReceiveReadData(getSystem().actorOf(props, "testReadDataRO"));
 
             props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
                     testSchemaContext, datastoreContext, shardStats, "txn",
-                    CreateTransaction.CURRENT_CLIENT_VERSION);
+                    CreateTransaction.CURRENT_VERSION);
 
             testOnReceiveReadData(getSystem().actorOf(props, "testReadDataRW"));
         }
@@ -118,14 +118,14 @@ public class ShardTransactionTest extends AbstractActorTest {
             final ActorRef shard = createShard();
             Props props = ShardTransaction.props( store.newReadOnlyTransaction(), shard,
                     testSchemaContext, datastoreContext, shardStats, "txn",
-                    CreateTransaction.CURRENT_CLIENT_VERSION);
+                    CreateTransaction.CURRENT_VERSION);
 
             testOnReceiveReadDataWhenDataNotFound(getSystem().actorOf(
                     props, "testReadDataWhenDataNotFoundRO"));
 
             props = ShardTransaction.props( store.newReadWriteTransaction(), shard,
                     testSchemaContext, datastoreContext, shardStats, "txn",
-                    CreateTransaction.CURRENT_CLIENT_VERSION);
+                    CreateTransaction.CURRENT_VERSION);
 
             testOnReceiveReadDataWhenDataNotFound(getSystem().actorOf(
                     props, "testReadDataWhenDataNotFoundRW"));
@@ -156,13 +156,13 @@ public class ShardTransactionTest extends AbstractActorTest {
             final ActorRef shard = createShard();
             Props props = ShardTransaction.props(store.newReadOnlyTransaction(), shard,
                     testSchemaContext, datastoreContext, shardStats, "txn",
-                    CreateTransaction.CURRENT_CLIENT_VERSION);
+                    CreateTransaction.CURRENT_VERSION);
 
             testOnReceiveDataExistsPositive(getSystem().actorOf(props, "testDataExistsPositiveRO"));
 
             props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
                     testSchemaContext, datastoreContext, shardStats, "txn",
-                    CreateTransaction.CURRENT_CLIENT_VERSION);
+                    CreateTransaction.CURRENT_VERSION);
 
             testOnReceiveDataExistsPositive(getSystem().actorOf(props, "testDataExistsPositiveRW"));
         }
@@ -191,13 +191,13 @@ public class ShardTransactionTest extends AbstractActorTest {
             final ActorRef shard = createShard();
             Props props = ShardTransaction.props(store.newReadOnlyTransaction(), shard,
                     testSchemaContext, datastoreContext, shardStats, "txn",
-                    CreateTransaction.CURRENT_CLIENT_VERSION);
+                    CreateTransaction.CURRENT_VERSION);
 
             testOnReceiveDataExistsNegative(getSystem().actorOf(props, "testDataExistsNegativeRO"));
 
             props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
                     testSchemaContext, datastoreContext, shardStats, "txn",
-                    CreateTransaction.CURRENT_CLIENT_VERSION);
+                    CreateTransaction.CURRENT_VERSION);
 
             testOnReceiveDataExistsNegative(getSystem().actorOf(props, "testDataExistsNegativeRW"));
         }
@@ -238,7 +238,7 @@ public class ShardTransactionTest extends AbstractActorTest {
             final ActorRef shard = createShard();
             final Props props = ShardTransaction.props(store.newWriteOnlyTransaction(), shard,
                     testSchemaContext, datastoreContext, shardStats, "txn",
-                    CreateTransaction.CURRENT_CLIENT_VERSION);
+                    CreateTransaction.CURRENT_VERSION);
             final ActorRef transaction = getSystem().actorOf(props, "testWriteData");
 
             transaction.tell(new WriteData(TestModel.TEST_PATH,
@@ -265,7 +265,7 @@ public class ShardTransactionTest extends AbstractActorTest {
             final ActorRef shard = createShard();
             final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
                     testSchemaContext, datastoreContext, shardStats, "txn",
-                    CreateTransaction.CURRENT_CLIENT_VERSION);
+                    CreateTransaction.CURRENT_VERSION);
             final ActorRef transaction = getSystem().actorOf(props, "testMergeData");
 
             transaction.tell(new MergeData(TestModel.TEST_PATH,
@@ -291,7 +291,7 @@ public class ShardTransactionTest extends AbstractActorTest {
             final ActorRef shard = createShard();
             final Props props = ShardTransaction.props( store.newWriteOnlyTransaction(), shard,
                     testSchemaContext, datastoreContext, shardStats, "txn",
-                    CreateTransaction.CURRENT_CLIENT_VERSION);
+                    CreateTransaction.CURRENT_VERSION);
             final ActorRef transaction = getSystem().actorOf(props, "testDeleteData");
 
             transaction.tell(new DeleteData(TestModel.TEST_PATH).toSerializable(), getRef());
@@ -314,7 +314,7 @@ public class ShardTransactionTest extends AbstractActorTest {
             final ActorRef shard = createShard();
             final Props props = ShardTransaction.props( store.newReadWriteTransaction(), shard,
                     testSchemaContext, datastoreContext, shardStats, "txn",
-                    CreateTransaction.CURRENT_CLIENT_VERSION);
+                    CreateTransaction.CURRENT_VERSION);
             final ActorRef transaction = getSystem().actorOf(props, "testReadyTransaction");
 
             watch(transaction);
@@ -332,7 +332,7 @@ public class ShardTransactionTest extends AbstractActorTest {
             final ActorRef shard = createShard();
             final Props props = ShardTransaction.props( store.newReadWriteTransaction(), shard,
                 testSchemaContext, datastoreContext, shardStats, "txn",
-                CreateTransaction.CURRENT_CLIENT_VERSION);
+                CreateTransaction.CURRENT_VERSION);
             final ActorRef transaction = getSystem().actorOf(props, "testReadyTransaction2");
 
             watch(transaction);
@@ -354,7 +354,7 @@ public class ShardTransactionTest extends AbstractActorTest {
             final ActorRef shard = createShard();
             final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
                     testSchemaContext, datastoreContext, shardStats, "txn",
-                    CreateTransaction.CURRENT_CLIENT_VERSION);
+                    CreateTransaction.CURRENT_VERSION);
             final ActorRef transaction = getSystem().actorOf(props, "testCloseTransaction");
 
             watch(transaction);
@@ -371,7 +371,7 @@ public class ShardTransactionTest extends AbstractActorTest {
         final ActorRef shard = createShard();
         final Props props = ShardTransaction.props(store.newReadOnlyTransaction(), shard,
                 testSchemaContext, datastoreContext, shardStats, "txn",
-                CreateTransaction.CURRENT_CLIENT_VERSION);
+                CreateTransaction.CURRENT_VERSION);
         final TestActorRef<ShardTransaction> transaction = TestActorRef.apply(props,getSystem());
 
         transaction.receive(new DeleteData(TestModel.TEST_PATH).toSerializable(), ActorRef.noSender());
@@ -387,7 +387,7 @@ public class ShardTransactionTest extends AbstractActorTest {
             final ActorRef shard = createShard();
             final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
                     testSchemaContext, datastoreContext, shardStats, "txn",
-                    CreateTransaction.CURRENT_CLIENT_VERSION);
+                    CreateTransaction.CURRENT_VERSION);
             final ActorRef transaction =
                 getSystem().actorOf(props, "testShardTransactionInactivity");
 
index 35f346f..b77b0b6 100644 (file)
@@ -338,13 +338,15 @@ public class TransactionProxyTest {
         return getSystem().actorSelection(actorRef.path());
     }
 
-    private CreateTransactionReply createTransactionReply(ActorRef actorRef){
+    private CreateTransactionReply createTransactionReply(ActorRef actorRef, int transactionVersion){
         return CreateTransactionReply.newBuilder()
             .setTransactionActorPath(actorRef.path().toString())
-            .setTransactionId("txn-1").build();
+            .setTransactionId("txn-1")
+            .setMessageVersion(transactionVersion)
+            .build();
     }
 
-    private ActorRef setupActorContextWithInitialCreateTransaction(ActorSystem actorSystem, TransactionType type) {
+    private ActorRef setupActorContextWithInitialCreateTransaction(ActorSystem actorSystem, TransactionType type, int transactionVersion) {
         ActorRef actorRef = actorSystem.actorOf(Props.create(DoNothingActor.class));
         doReturn(actorSystem.actorSelection(actorRef.path())).
                 when(mockActorContext).actorSelection(actorRef.path().toString());
@@ -352,7 +354,7 @@ public class TransactionProxyTest {
         doReturn(Futures.successful(actorSystem.actorSelection(actorRef.path()))).
                 when(mockActorContext).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
 
-        doReturn(Futures.successful(createTransactionReply(actorRef))).when(mockActorContext).
+        doReturn(Futures.successful(createTransactionReply(actorRef, transactionVersion))).when(mockActorContext).
                 executeOperationAsync(eq(actorSystem.actorSelection(actorRef.path())),
                         eqCreateTransaction(memberName, type));
 
@@ -361,6 +363,11 @@ public class TransactionProxyTest {
         return actorRef;
     }
 
+    private ActorRef setupActorContextWithInitialCreateTransaction(ActorSystem actorSystem, TransactionType type) {
+        return setupActorContextWithInitialCreateTransaction(actorSystem, type, CreateTransaction.CURRENT_VERSION);
+    }
+
+
     private void propagateReadFailedExceptionCause(CheckedFuture<?, ReadFailedException> future)
             throws Throwable {
 
@@ -835,6 +842,47 @@ public class TransactionProxyTest {
         verifyCohortFutures(proxy, getSystem().actorSelection(actorRef.path()));
     }
 
+    @SuppressWarnings("unchecked")
+    @Test
+    public void testReadyForwardCompatibility() throws Exception {
+        ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE, 0);
+
+        NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
+
+        doReturn(readSerializedDataReply(null)).when(mockActorContext).executeOperationAsync(
+                eq(actorSelection(actorRef)), eqSerializedReadData());
+
+        doReturn(writeSerializedDataReply()).when(mockActorContext).executeOperationAsync(
+                eq(actorSelection(actorRef)), eqSerializedWriteData(nodeToWrite));
+
+        doReturn(readySerializedTxReply(actorRef.path().toString())).when(mockActorContext).executeOperationAsync(
+                eq(actorSelection(actorRef)), isA(ReadyTransaction.SERIALIZABLE_CLASS));
+
+        doReturn(actorRef.path().toString()).when(mockActorContext).resolvePath(eq(actorRef.path().toString()),
+                eq(actorRef.path().toString()));
+
+        TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
+                READ_WRITE);
+
+        transactionProxy.read(TestModel.TEST_PATH);
+
+        transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
+
+        DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
+
+        assertTrue(ready instanceof ThreePhaseCommitCohortProxy);
+
+        ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready;
+
+        verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
+                WriteDataReply.SERIALIZABLE_CLASS);
+
+        verifyCohortFutures(proxy, getSystem().actorSelection(actorRef.path()));
+
+        verify(mockActorContext).resolvePath(eq(actorRef.path().toString()),
+                eq(actorRef.path().toString()));
+    }
+
     @SuppressWarnings("unchecked")
     @Test
     public void testReadyWithRecordingOperationFailure() throws Exception {
index 60f9a2d..39d337e 100644 (file)
@@ -2,6 +2,7 @@ package org.opendaylight.controller.cluster.datastore.utils;
 
 import akka.actor.ActorRef;
 import akka.actor.ActorSelection;
+import akka.actor.ActorSystem;
 import akka.actor.Props;
 import akka.actor.UntypedActor;
 import akka.japi.Creator;
@@ -17,7 +18,9 @@ import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound
 import scala.concurrent.Await;
 import scala.concurrent.Future;
 import scala.concurrent.duration.Duration;
+
 import java.util.concurrent.TimeUnit;
+
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Mockito.mock;
@@ -182,4 +185,51 @@ public class ActorContextTest extends AbstractActorTest{
         clusterWrapper.setSelfAddress("akka.tcp://system@127.0.0.1:2551/");
         assertEquals(false, actorContext.isLocalPath("akka.tcp://system@127.0.0.1:2550/"));
     }
+
+    @Test
+    public void testResolvePathForRemoteActor() {
+        ActorContext actorContext =
+                new ActorContext(mock(ActorSystem.class), mock(ActorRef.class), mock(
+                        ClusterWrapper.class),
+                        mock(Configuration.class));
+
+        String actual = actorContext.resolvePath(
+                "akka.tcp://system@127.0.0.1:2550/user/shardmanager/shard",
+                "akka://system/user/shardmanager/shard/transaction");
+
+        String expected = "akka.tcp://system@127.0.0.1:2550/user/shardmanager/shard/transaction";
+
+        assertEquals(expected, actual);
+    }
+
+    @Test
+    public void testResolvePathForLocalActor() {
+        ActorContext actorContext =
+                new ActorContext(getSystem(), mock(ActorRef.class), mock(ClusterWrapper.class),
+                        mock(Configuration.class));
+
+        String actual = actorContext.resolvePath(
+                "akka://system/user/shardmanager/shard",
+                "akka://system/user/shardmanager/shard/transaction");
+
+        String expected = "akka://system/user/shardmanager/shard/transaction";
+
+        assertEquals(expected, actual);
+    }
+
+    @Test
+    public void testResolvePathForRemoteActorWithProperRemoteAddress() {
+        ActorContext actorContext =
+                new ActorContext(getSystem(), mock(ActorRef.class), mock(ClusterWrapper.class),
+                        mock(Configuration.class));
+
+        String actual = actorContext.resolvePath(
+                "akka.tcp://system@7.0.0.1:2550/user/shardmanager/shard",
+                "akka.tcp://system@7.0.0.1:2550/user/shardmanager/shard/transaction");
+
+        String expected = "akka.tcp://system@7.0.0.1:2550/user/shardmanager/shard/transaction";
+
+        assertEquals(expected, actual);
+    }
+
 }