Bug 2294: Handle Shard backwards compatibility 44/12444/2
authortpantelis <tpanteli@brocade.com>
Thu, 30 Oct 2014 11:30:32 +0000 (07:30 -0400)
committertpantelis <tpanteli@brocade.com>
Thu, 30 Oct 2014 11:40:46 +0000 (07:40 -0400)
Implemented as outlined in Bug 2294.

Change-Id: I14aa8ef5f320f9d165492396ece4ea63cce9b0c3
Signed-off-by: tpantelis <tpanteli@brocade.com>
17 files changed:
opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/protobuff/messages/cohort3pc/ThreePhaseCommitCohortMessages.java
opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/protobuff/messages/transaction/ShardTransactionMessages.java
opendaylight/md-sal/sal-clustering-commons/src/main/resources/Cohort.proto
opendaylight/md-sal/sal-clustering-commons/src/main/resources/ShardTransaction.proto
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardReadTransaction.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardReadWriteTransaction.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardTransaction.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardTransactionChain.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardWriteTransaction.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/compat/BackwardsCompatibleThreePhaseCommitCohort.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/CreateTransaction.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/ForwardedReadyTransaction.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionFailureTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionHeliumBackwardsCompatibilityTest.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionTest.java

index e43b44582d77b5526234e301535d368a2f83fede..2a79a5b8275ca8a547b39268dbc8764aa529def6 100644 (file)
@@ -11,17 +11,17 @@ public final class ThreePhaseCommitCohortMessages {
   public interface CanCommitTransactionOrBuilder
       extends com.google.protobuf.MessageOrBuilder {
 
-    // required string transactionId = 1;
+    // optional string transactionId = 1;
     /**
-     * <code>required string transactionId = 1;</code>
+     * <code>optional string transactionId = 1;</code>
      */
     boolean hasTransactionId();
     /**
-     * <code>required string transactionId = 1;</code>
+     * <code>optional string transactionId = 1;</code>
      */
     java.lang.String getTransactionId();
     /**
-     * <code>required string transactionId = 1;</code>
+     * <code>optional string transactionId = 1;</code>
      */
     com.google.protobuf.ByteString
         getTransactionIdBytes();
@@ -122,17 +122,17 @@ public final class ThreePhaseCommitCohortMessages {
     }
 
     private int bitField0_;
-    // required string transactionId = 1;
+    // optional string transactionId = 1;
     public static final int TRANSACTIONID_FIELD_NUMBER = 1;
     private java.lang.Object transactionId_;
     /**
-     * <code>required string transactionId = 1;</code>
+     * <code>optional string transactionId = 1;</code>
      */
     public boolean hasTransactionId() {
       return ((bitField0_ & 0x00000001) == 0x00000001);
     }
     /**
-     * <code>required string transactionId = 1;</code>
+     * <code>optional string transactionId = 1;</code>
      */
     public java.lang.String getTransactionId() {
       java.lang.Object ref = transactionId_;
@@ -149,7 +149,7 @@ public final class ThreePhaseCommitCohortMessages {
       }
     }
     /**
-     * <code>required string transactionId = 1;</code>
+     * <code>optional string transactionId = 1;</code>
      */
     public com.google.protobuf.ByteString
         getTransactionIdBytes() {
@@ -173,10 +173,6 @@ public final class ThreePhaseCommitCohortMessages {
       byte isInitialized = memoizedIsInitialized;
       if (isInitialized != -1) return isInitialized == 1;
 
-      if (!hasTransactionId()) {
-        memoizedIsInitialized = 0;
-        return false;
-      }
       memoizedIsInitialized = 1;
       return true;
     }
@@ -376,10 +372,6 @@ public final class ThreePhaseCommitCohortMessages {
       }
 
       public final boolean isInitialized() {
-        if (!hasTransactionId()) {
-
-          return false;
-        }
         return true;
       }
 
@@ -402,16 +394,16 @@ public final class ThreePhaseCommitCohortMessages {
       }
       private int bitField0_;
 
-      // required string transactionId = 1;
+      // optional string transactionId = 1;
       private java.lang.Object transactionId_ = "";
       /**
-       * <code>required string transactionId = 1;</code>
+       * <code>optional string transactionId = 1;</code>
        */
       public boolean hasTransactionId() {
         return ((bitField0_ & 0x00000001) == 0x00000001);
       }
       /**
-       * <code>required string transactionId = 1;</code>
+       * <code>optional string transactionId = 1;</code>
        */
       public java.lang.String getTransactionId() {
         java.lang.Object ref = transactionId_;
@@ -425,7 +417,7 @@ public final class ThreePhaseCommitCohortMessages {
         }
       }
       /**
-       * <code>required string transactionId = 1;</code>
+       * <code>optional string transactionId = 1;</code>
        */
       public com.google.protobuf.ByteString
           getTransactionIdBytes() {
@@ -441,7 +433,7 @@ public final class ThreePhaseCommitCohortMessages {
         }
       }
       /**
-       * <code>required string transactionId = 1;</code>
+       * <code>optional string transactionId = 1;</code>
        */
       public Builder setTransactionId(
           java.lang.String value) {
@@ -454,7 +446,7 @@ public final class ThreePhaseCommitCohortMessages {
         return this;
       }
       /**
-       * <code>required string transactionId = 1;</code>
+       * <code>optional string transactionId = 1;</code>
        */
       public Builder clearTransactionId() {
         bitField0_ = (bitField0_ & ~0x00000001);
@@ -463,7 +455,7 @@ public final class ThreePhaseCommitCohortMessages {
         return this;
       }
       /**
-       * <code>required string transactionId = 1;</code>
+       * <code>optional string transactionId = 1;</code>
        */
       public Builder setTransactionIdBytes(
           com.google.protobuf.ByteString value) {
@@ -894,17 +886,17 @@ public final class ThreePhaseCommitCohortMessages {
   public interface AbortTransactionOrBuilder
       extends com.google.protobuf.MessageOrBuilder {
 
-    // required string transactionId = 1;
+    // optional string transactionId = 1;
     /**
-     * <code>required string transactionId = 1;</code>
+     * <code>optional string transactionId = 1;</code>
      */
     boolean hasTransactionId();
     /**
-     * <code>required string transactionId = 1;</code>
+     * <code>optional string transactionId = 1;</code>
      */
     java.lang.String getTransactionId();
     /**
-     * <code>required string transactionId = 1;</code>
+     * <code>optional string transactionId = 1;</code>
      */
     com.google.protobuf.ByteString
         getTransactionIdBytes();
@@ -1005,17 +997,17 @@ public final class ThreePhaseCommitCohortMessages {
     }
 
     private int bitField0_;
-    // required string transactionId = 1;
+    // optional string transactionId = 1;
     public static final int TRANSACTIONID_FIELD_NUMBER = 1;
     private java.lang.Object transactionId_;
     /**
-     * <code>required string transactionId = 1;</code>
+     * <code>optional string transactionId = 1;</code>
      */
     public boolean hasTransactionId() {
       return ((bitField0_ & 0x00000001) == 0x00000001);
     }
     /**
-     * <code>required string transactionId = 1;</code>
+     * <code>optional string transactionId = 1;</code>
      */
     public java.lang.String getTransactionId() {
       java.lang.Object ref = transactionId_;
@@ -1032,7 +1024,7 @@ public final class ThreePhaseCommitCohortMessages {
       }
     }
     /**
-     * <code>required string transactionId = 1;</code>
+     * <code>optional string transactionId = 1;</code>
      */
     public com.google.protobuf.ByteString
         getTransactionIdBytes() {
@@ -1056,10 +1048,6 @@ public final class ThreePhaseCommitCohortMessages {
       byte isInitialized = memoizedIsInitialized;
       if (isInitialized != -1) return isInitialized == 1;
 
-      if (!hasTransactionId()) {
-        memoizedIsInitialized = 0;
-        return false;
-      }
       memoizedIsInitialized = 1;
       return true;
     }
@@ -1259,10 +1247,6 @@ public final class ThreePhaseCommitCohortMessages {
       }
 
       public final boolean isInitialized() {
-        if (!hasTransactionId()) {
-
-          return false;
-        }
         return true;
       }
 
@@ -1285,16 +1269,16 @@ public final class ThreePhaseCommitCohortMessages {
       }
       private int bitField0_;
 
-      // required string transactionId = 1;
+      // optional string transactionId = 1;
       private java.lang.Object transactionId_ = "";
       /**
-       * <code>required string transactionId = 1;</code>
+       * <code>optional string transactionId = 1;</code>
        */
       public boolean hasTransactionId() {
         return ((bitField0_ & 0x00000001) == 0x00000001);
       }
       /**
-       * <code>required string transactionId = 1;</code>
+       * <code>optional string transactionId = 1;</code>
        */
       public java.lang.String getTransactionId() {
         java.lang.Object ref = transactionId_;
@@ -1308,7 +1292,7 @@ public final class ThreePhaseCommitCohortMessages {
         }
       }
       /**
-       * <code>required string transactionId = 1;</code>
+       * <code>optional string transactionId = 1;</code>
        */
       public com.google.protobuf.ByteString
           getTransactionIdBytes() {
@@ -1324,7 +1308,7 @@ public final class ThreePhaseCommitCohortMessages {
         }
       }
       /**
-       * <code>required string transactionId = 1;</code>
+       * <code>optional string transactionId = 1;</code>
        */
       public Builder setTransactionId(
           java.lang.String value) {
@@ -1337,7 +1321,7 @@ public final class ThreePhaseCommitCohortMessages {
         return this;
       }
       /**
-       * <code>required string transactionId = 1;</code>
+       * <code>optional string transactionId = 1;</code>
        */
       public Builder clearTransactionId() {
         bitField0_ = (bitField0_ & ~0x00000001);
@@ -1346,7 +1330,7 @@ public final class ThreePhaseCommitCohortMessages {
         return this;
       }
       /**
-       * <code>required string transactionId = 1;</code>
+       * <code>optional string transactionId = 1;</code>
        */
       public Builder setTransactionIdBytes(
           com.google.protobuf.ByteString value) {
@@ -1682,17 +1666,17 @@ public final class ThreePhaseCommitCohortMessages {
   public interface CommitTransactionOrBuilder
       extends com.google.protobuf.MessageOrBuilder {
 
-    // required string transactionId = 1;
+    // optional string transactionId = 1;
     /**
-     * <code>required string transactionId = 1;</code>
+     * <code>optional string transactionId = 1;</code>
      */
     boolean hasTransactionId();
     /**
-     * <code>required string transactionId = 1;</code>
+     * <code>optional string transactionId = 1;</code>
      */
     java.lang.String getTransactionId();
     /**
-     * <code>required string transactionId = 1;</code>
+     * <code>optional string transactionId = 1;</code>
      */
     com.google.protobuf.ByteString
         getTransactionIdBytes();
@@ -1793,17 +1777,17 @@ public final class ThreePhaseCommitCohortMessages {
     }
 
     private int bitField0_;
-    // required string transactionId = 1;
+    // optional string transactionId = 1;
     public static final int TRANSACTIONID_FIELD_NUMBER = 1;
     private java.lang.Object transactionId_;
     /**
-     * <code>required string transactionId = 1;</code>
+     * <code>optional string transactionId = 1;</code>
      */
     public boolean hasTransactionId() {
       return ((bitField0_ & 0x00000001) == 0x00000001);
     }
     /**
-     * <code>required string transactionId = 1;</code>
+     * <code>optional string transactionId = 1;</code>
      */
     public java.lang.String getTransactionId() {
       java.lang.Object ref = transactionId_;
@@ -1820,7 +1804,7 @@ public final class ThreePhaseCommitCohortMessages {
       }
     }
     /**
-     * <code>required string transactionId = 1;</code>
+     * <code>optional string transactionId = 1;</code>
      */
     public com.google.protobuf.ByteString
         getTransactionIdBytes() {
@@ -1844,10 +1828,6 @@ public final class ThreePhaseCommitCohortMessages {
       byte isInitialized = memoizedIsInitialized;
       if (isInitialized != -1) return isInitialized == 1;
 
-      if (!hasTransactionId()) {
-        memoizedIsInitialized = 0;
-        return false;
-      }
       memoizedIsInitialized = 1;
       return true;
     }
@@ -2047,10 +2027,6 @@ public final class ThreePhaseCommitCohortMessages {
       }
 
       public final boolean isInitialized() {
-        if (!hasTransactionId()) {
-
-          return false;
-        }
         return true;
       }
 
@@ -2073,16 +2049,16 @@ public final class ThreePhaseCommitCohortMessages {
       }
       private int bitField0_;
 
-      // required string transactionId = 1;
+      // optional string transactionId = 1;
       private java.lang.Object transactionId_ = "";
       /**
-       * <code>required string transactionId = 1;</code>
+       * <code>optional string transactionId = 1;</code>
        */
       public boolean hasTransactionId() {
         return ((bitField0_ & 0x00000001) == 0x00000001);
       }
       /**
-       * <code>required string transactionId = 1;</code>
+       * <code>optional string transactionId = 1;</code>
        */
       public java.lang.String getTransactionId() {
         java.lang.Object ref = transactionId_;
@@ -2096,7 +2072,7 @@ public final class ThreePhaseCommitCohortMessages {
         }
       }
       /**
-       * <code>required string transactionId = 1;</code>
+       * <code>optional string transactionId = 1;</code>
        */
       public com.google.protobuf.ByteString
           getTransactionIdBytes() {
@@ -2112,7 +2088,7 @@ public final class ThreePhaseCommitCohortMessages {
         }
       }
       /**
-       * <code>required string transactionId = 1;</code>
+       * <code>optional string transactionId = 1;</code>
        */
       public Builder setTransactionId(
           java.lang.String value) {
@@ -2125,7 +2101,7 @@ public final class ThreePhaseCommitCohortMessages {
         return this;
       }
       /**
-       * <code>required string transactionId = 1;</code>
+       * <code>optional string transactionId = 1;</code>
        */
       public Builder clearTransactionId() {
         bitField0_ = (bitField0_ & ~0x00000001);
@@ -2134,7 +2110,7 @@ public final class ThreePhaseCommitCohortMessages {
         return this;
       }
       /**
-       * <code>required string transactionId = 1;</code>
+       * <code>optional string transactionId = 1;</code>
        */
       public Builder setTransactionIdBytes(
           com.google.protobuf.ByteString value) {
@@ -3136,11 +3112,11 @@ public final class ThreePhaseCommitCohortMessages {
     java.lang.String[] descriptorData = {
       "\n\014Cohort.proto\022!org.opendaylight.control" +
       "ler.mdsal\"-\n\024CanCommitTransaction\022\025\n\rtra" +
-      "nsactionId\030\001 \002(\t\".\n\031CanCommitTransaction" +
+      "nsactionId\030\001 \001(\t\".\n\031CanCommitTransaction" +
       "Reply\022\021\n\tcanCommit\030\001 \002(\010\")\n\020AbortTransac" +
-      "tion\022\025\n\rtransactionId\030\001 \002(\t\"\027\n\025AbortTran" +
+      "tion\022\025\n\rtransactionId\030\001 \001(\t\"\027\n\025AbortTran" +
       "sactionReply\"*\n\021CommitTransaction\022\025\n\rtra" +
-      "nsactionId\030\001 \002(\t\"\030\n\026CommitTransactionRep" +
+      "nsactionId\030\001 \001(\t\"\030\n\026CommitTransactionRep" +
       "ly\"\026\n\024PreCommitTransaction\"\033\n\031PreCommitT" +
       "ransactionReplyBZ\n8org.opendaylight.cont" +
       "roller.protobuff.messages.cohort3pcB\036Thr",
index 96a39bddd376a9a777be0c056693cae8a892925d..3cd290c6de4b2bc57e7f1d61dc2fdd11ed293654 100644 (file)
@@ -668,6 +668,16 @@ public final class ShardTransactionMessages {
      */
     com.google.protobuf.ByteString
         getTransactionChainIdBytes();
+
+    // optional int32 messageVersion = 4;
+    /**
+     * <code>optional int32 messageVersion = 4;</code>
+     */
+    boolean hasMessageVersion();
+    /**
+     * <code>optional int32 messageVersion = 4;</code>
+     */
+    int getMessageVersion();
   }
   /**
    * Protobuf type {@code org.opendaylight.controller.mdsal.CreateTransaction}
@@ -735,6 +745,11 @@ public final class ShardTransactionMessages {
               transactionChainId_ = input.readBytes();
               break;
             }
+            case 32: {
+              bitField0_ |= 0x00000008;
+              messageVersion_ = input.readInt32();
+              break;
+            }
           }
         }
       } catch (com.google.protobuf.InvalidProtocolBufferException e) {
@@ -877,10 +892,27 @@ public final class ShardTransactionMessages {
       }
     }
 
+    // optional int32 messageVersion = 4;
+    public static final int MESSAGEVERSION_FIELD_NUMBER = 4;
+    private int messageVersion_;
+    /**
+     * <code>optional int32 messageVersion = 4;</code>
+     */
+    public boolean hasMessageVersion() {
+      return ((bitField0_ & 0x00000008) == 0x00000008);
+    }
+    /**
+     * <code>optional int32 messageVersion = 4;</code>
+     */
+    public int getMessageVersion() {
+      return messageVersion_;
+    }
+
     private void initFields() {
       transactionId_ = "";
       transactionType_ = 0;
       transactionChainId_ = "";
+      messageVersion_ = 0;
     }
     private byte memoizedIsInitialized = -1;
     public final boolean isInitialized() {
@@ -911,6 +943,9 @@ public final class ShardTransactionMessages {
       if (((bitField0_ & 0x00000004) == 0x00000004)) {
         output.writeBytes(3, getTransactionChainIdBytes());
       }
+      if (((bitField0_ & 0x00000008) == 0x00000008)) {
+        output.writeInt32(4, messageVersion_);
+      }
       getUnknownFields().writeTo(output);
     }
 
@@ -932,6 +967,10 @@ public final class ShardTransactionMessages {
         size += com.google.protobuf.CodedOutputStream
           .computeBytesSize(3, getTransactionChainIdBytes());
       }
+      if (((bitField0_ & 0x00000008) == 0x00000008)) {
+        size += com.google.protobuf.CodedOutputStream
+          .computeInt32Size(4, messageVersion_);
+      }
       size += getUnknownFields().getSerializedSize();
       memoizedSerializedSize = size;
       return size;
@@ -1054,6 +1093,8 @@ public final class ShardTransactionMessages {
         bitField0_ = (bitField0_ & ~0x00000002);
         transactionChainId_ = "";
         bitField0_ = (bitField0_ & ~0x00000004);
+        messageVersion_ = 0;
+        bitField0_ = (bitField0_ & ~0x00000008);
         return this;
       }
 
@@ -1094,6 +1135,10 @@ public final class ShardTransactionMessages {
           to_bitField0_ |= 0x00000004;
         }
         result.transactionChainId_ = transactionChainId_;
+        if (((from_bitField0_ & 0x00000008) == 0x00000008)) {
+          to_bitField0_ |= 0x00000008;
+        }
+        result.messageVersion_ = messageVersion_;
         result.bitField0_ = to_bitField0_;
         onBuilt();
         return result;
@@ -1123,6 +1168,9 @@ public final class ShardTransactionMessages {
           transactionChainId_ = other.transactionChainId_;
           onChanged();
         }
+        if (other.hasMessageVersion()) {
+          setMessageVersion(other.getMessageVersion());
+        }
         this.mergeUnknownFields(other.getUnknownFields());
         return this;
       }
@@ -1339,6 +1387,39 @@ public final class ShardTransactionMessages {
         return this;
       }
 
+      // optional int32 messageVersion = 4;
+      private int messageVersion_ ;
+      /**
+       * <code>optional int32 messageVersion = 4;</code>
+       */
+      public boolean hasMessageVersion() {
+        return ((bitField0_ & 0x00000008) == 0x00000008);
+      }
+      /**
+       * <code>optional int32 messageVersion = 4;</code>
+       */
+      public int getMessageVersion() {
+        return messageVersion_;
+      }
+      /**
+       * <code>optional int32 messageVersion = 4;</code>
+       */
+      public Builder setMessageVersion(int value) {
+        bitField0_ |= 0x00000008;
+        messageVersion_ = value;
+        onChanged();
+        return this;
+      }
+      /**
+       * <code>optional int32 messageVersion = 4;</code>
+       */
+      public Builder clearMessageVersion() {
+        bitField0_ = (bitField0_ & ~0x00000008);
+        messageVersion_ = 0;
+        onChanged();
+        return this;
+      }
+
       // @@protoc_insertion_point(builder_scope:org.opendaylight.controller.mdsal.CreateTransaction)
     }
 
@@ -7753,37 +7834,37 @@ public final class ShardTransactionMessages {
     java.lang.String[] descriptorData = {
       "\n\026ShardTransaction.proto\022!org.opendaylig" +
       "ht.controller.mdsal\032\014Common.proto\"\022\n\020Clo" +
-      "seTransaction\"\027\n\025CloseTransactionReply\"_" +
+      "seTransaction\"\027\n\025CloseTransactionReply\"w" +
       "\n\021CreateTransaction\022\025\n\rtransactionId\030\001 \002" +
       "(\t\022\027\n\017transactionType\030\002 \002(\005\022\032\n\022transacti" +
-      "onChainId\030\003 \001(\t\"M\n\026CreateTransactionRepl" +
-      "y\022\034\n\024transactionActorPath\030\001 \002(\t\022\025\n\rtrans" +
-      "actionId\030\002 \002(\t\"\022\n\020ReadyTransaction\"*\n\025Re" +
-      "adyTransactionReply\022\021\n\tactorPath\030\001 \002(\t\"l" +
-      "\n\nDeleteData\022^\n\037instanceIdentifierPathAr",
-      "guments\030\001 \002(\01325.org.opendaylight.control" +
-      "ler.mdsal.InstanceIdentifier\"\021\n\017DeleteDa" +
-      "taReply\"j\n\010ReadData\022^\n\037instanceIdentifie" +
-      "rPathArguments\030\001 \002(\01325.org.opendaylight." +
-      "controller.mdsal.InstanceIdentifier\"P\n\rR" +
-      "eadDataReply\022?\n\016normalizedNode\030\001 \001(\0132\'.o" +
-      "rg.opendaylight.controller.mdsal.Node\"\254\001" +
-      "\n\tWriteData\022^\n\037instanceIdentifierPathArg" +
-      "uments\030\001 \002(\01325.org.opendaylight.controll" +
-      "er.mdsal.InstanceIdentifier\022?\n\016normalize",
-      "dNode\030\002 \002(\0132\'.org.opendaylight.controlle" +
-      "r.mdsal.Node\"\020\n\016WriteDataReply\"\254\001\n\tMerge" +
-      "Data\022^\n\037instanceIdentifierPathArguments\030" +
-      "\001 \002(\01325.org.opendaylight.controller.mdsa" +
-      "l.InstanceIdentifier\022?\n\016normalizedNode\030\002" +
-      " \002(\0132\'.org.opendaylight.controller.mdsal" +
-      ".Node\"\020\n\016MergeDataReply\"l\n\nDataExists\022^\n" +
-      "\037instanceIdentifierPathArguments\030\001 \002(\01325" +
-      ".org.opendaylight.controller.mdsal.Insta" +
-      "nceIdentifier\"!\n\017DataExistsReply\022\016\n\006exis",
-      "ts\030\001 \002(\010BV\n:org.opendaylight.controller." +
-      "protobuff.messages.transactionB\030ShardTra" +
-      "nsactionMessages"
+      "onChainId\030\003 \001(\t\022\026\n\016messageVersion\030\004 \001(\005\"" +
+      "M\n\026CreateTransactionReply\022\034\n\024transaction" +
+      "ActorPath\030\001 \002(\t\022\025\n\rtransactionId\030\002 \002(\t\"\022" +
+      "\n\020ReadyTransaction\"*\n\025ReadyTransactionRe" +
+      "ply\022\021\n\tactorPath\030\001 \002(\t\"l\n\nDeleteData\022^\n\037",
+      "instanceIdentifierPathArguments\030\001 \002(\01325." +
+      "org.opendaylight.controller.mdsal.Instan" +
+      "ceIdentifier\"\021\n\017DeleteDataReply\"j\n\010ReadD" +
+      "ata\022^\n\037instanceIdentifierPathArguments\030\001" +
+      " \002(\01325.org.opendaylight.controller.mdsal" +
+      ".InstanceIdentifier\"P\n\rReadDataReply\022?\n\016" +
+      "normalizedNode\030\001 \001(\0132\'.org.opendaylight." +
+      "controller.mdsal.Node\"\254\001\n\tWriteData\022^\n\037i" +
+      "nstanceIdentifierPathArguments\030\001 \002(\01325.o" +
+      "rg.opendaylight.controller.mdsal.Instanc",
+      "eIdentifier\022?\n\016normalizedNode\030\002 \002(\0132\'.or" +
+      "g.opendaylight.controller.mdsal.Node\"\020\n\016" +
+      "WriteDataReply\"\254\001\n\tMergeData\022^\n\037instance" +
+      "IdentifierPathArguments\030\001 \002(\01325.org.open" +
+      "daylight.controller.mdsal.InstanceIdenti" +
+      "fier\022?\n\016normalizedNode\030\002 \002(\0132\'.org.opend" +
+      "aylight.controller.mdsal.Node\"\020\n\016MergeDa" +
+      "taReply\"l\n\nDataExists\022^\n\037instanceIdentif" +
+      "ierPathArguments\030\001 \002(\01325.org.opendayligh" +
+      "t.controller.mdsal.InstanceIdentifier\"!\n",
+      "\017DataExistsReply\022\016\n\006exists\030\001 \002(\010BV\n:org." +
+      "opendaylight.controller.protobuff.messag" +
+      "es.transactionB\030ShardTransactionMessages"
     };
     com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
       new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
@@ -7807,7 +7888,7 @@ public final class ShardTransactionMessages {
           internal_static_org_opendaylight_controller_mdsal_CreateTransaction_fieldAccessorTable = new
             com.google.protobuf.GeneratedMessage.FieldAccessorTable(
               internal_static_org_opendaylight_controller_mdsal_CreateTransaction_descriptor,
-              new java.lang.String[] { "TransactionId", "TransactionType", "TransactionChainId", });
+              new java.lang.String[] { "TransactionId", "TransactionType", "TransactionChainId", "MessageVersion", });
           internal_static_org_opendaylight_controller_mdsal_CreateTransactionReply_descriptor =
             getDescriptor().getMessageTypes().get(3);
           internal_static_org_opendaylight_controller_mdsal_CreateTransactionReply_fieldAccessorTable = new
index 49c6cd07a866199415116349041a9defe039d18c..222f68ab4f8f0572f20abde916d4b61bb6f25390 100644 (file)
@@ -5,7 +5,7 @@ option java_outer_classname = "ThreePhaseCommitCohortMessages";
 
 
 message CanCommitTransaction{
-  required string transactionId = 1;
+  optional string transactionId = 1;
 }
 
 message CanCommitTransactionReply{
@@ -14,7 +14,7 @@ message CanCommitTransactionReply{
 }
 
 message AbortTransaction{
-  required string transactionId = 1;
+  optional string transactionId = 1;
 }
 
 message AbortTransactionReply {
@@ -22,7 +22,7 @@ message AbortTransactionReply {
 }
 
 message CommitTransaction{
-  required string transactionId = 1;
+  optional string transactionId = 1;
 }
 
 message CommitTransactionReply{
index 26581478d97a1449ff88a07383e9a88c554aa9a5..cd1132d99ddf34c3f4bdd2a7392360f870d0b8a1 100644 (file)
@@ -16,6 +16,7 @@ message CreateTransaction{
   required string transactionId = 1;
   required int32  transactionType =2;
   optional string transactionChainId = 3;
+  optional int32 messageVersion = 4;
 }
 
 message CreateTransactionReply{
index 7d67e0856fe69da8721cd7173c63ec79d294a83e..1bf32e7fca7191236afeeb2c7ed0048b28db499c 100644 (file)
@@ -8,29 +8,18 @@
 
 package org.opendaylight.controller.cluster.datastore;
 
-import akka.actor.ActorRef;
-import akka.actor.ActorSelection;
-import akka.actor.Cancellable;
-import akka.actor.PoisonPill;
-import akka.actor.Props;
-import akka.event.Logging;
-import akka.event.LoggingAdapter;
-import akka.japi.Creator;
-import akka.persistence.RecoveryFailure;
-import akka.serialization.Serialization;
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Optional;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-import com.google.common.util.concurrent.FutureCallback;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.protobuf.ByteString;
-import com.google.protobuf.InvalidProtocolBufferException;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import javax.annotation.Nonnull;
 import org.opendaylight.controller.cluster.DataPersistenceProvider;
 import org.opendaylight.controller.cluster.common.actor.CommonConfig;
 import org.opendaylight.controller.cluster.common.actor.MeteringBehavior;
 import org.opendaylight.controller.cluster.datastore.ShardCommitCoordinator.CohortEntry;
+import org.opendaylight.controller.cluster.datastore.compat.BackwardsCompatibleThreePhaseCommitCohort;
 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
 import org.opendaylight.controller.cluster.datastore.identifiers.ShardTransactionIdentifier;
@@ -76,14 +65,25 @@ import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 import scala.concurrent.duration.Duration;
 import scala.concurrent.duration.FiniteDuration;
-
-import javax.annotation.Nonnull;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
+import akka.actor.ActorRef;
+import akka.actor.ActorSelection;
+import akka.actor.Cancellable;
+import akka.actor.PoisonPill;
+import akka.actor.Props;
+import akka.event.Logging;
+import akka.event.LoggingAdapter;
+import akka.japi.Creator;
+import akka.persistence.RecoveryFailure;
+import akka.serialization.Serialization;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.protobuf.ByteString;
+import com.google.protobuf.InvalidProtocolBufferException;
 
 /**
  * A Shard represents a portion of the logical data tree <br/>
@@ -93,6 +93,8 @@ import java.util.concurrent.TimeUnit;
  */
 public class Shard extends RaftActor {
 
+    private static final int HELIUM_1_TX_VERSION = 1;
+
     private static final Object COMMIT_TRANSACTION_REPLY = new CommitTransactionReply().toSerializable();
 
     private static final Object TX_COMMIT_TIMEOUT_CHECK_MESSAGE = "txCommitTimeoutCheck";
@@ -374,7 +376,8 @@ public class Shard extends RaftActor {
     }
 
     private void handleForwardedReadyTransaction(ForwardedReadyTransaction ready) {
-        LOG.debug("Readying transaction {}", ready.getTransactionID());
+        LOG.debug("Readying transaction {}, client version {}", ready.getTransactionID(),
+                ready.getTxnClientVersion());
 
         // This message is forwarded by the ShardTransaction on ready. We cache the cohort in the
         // commitCoordinator in preparation for the subsequent three phase commit initiated by
@@ -382,12 +385,22 @@ public class Shard extends RaftActor {
         commitCoordinator.transactionReady(ready.getTransactionID(), ready.getCohort(),
                 ready.getModification());
 
-        // Return our actor path as we'll handle the three phase commit.
-        ReadyTransactionReply readyTransactionReply =
-            new ReadyTransactionReply(Serialization.serializedActorPath(self()));
-        getSender().tell(
-            ready.isReturnSerialized() ? readyTransactionReply.toSerializable() : readyTransactionReply,
-            getSelf());
+        // Return our actor path as we'll handle the three phase commit, except if the Tx client
+        // version < 1 (Helium-1 version). This means the Tx was initiated by a base Helium version
+        // node. In that case, the subsequent 3-phase commit messages won't contain the
+        // transactionId so to maintain backwards compatibility, we create a separate cohort actor
+        // to provide the compatible behavior.
+        ActorRef replyActorPath = self();
+        if(ready.getTxnClientVersion() < HELIUM_1_TX_VERSION) {
+            LOG.debug("Creating BackwardsCompatibleThreePhaseCommitCohort");
+            replyActorPath = getContext().actorOf(BackwardsCompatibleThreePhaseCommitCohort.props(
+                    ready.getTransactionID()));
+        }
+
+        ReadyTransactionReply readyTransactionReply = new ReadyTransactionReply(
+                Serialization.serializedActorPath(replyActorPath));
+        getSender().tell(ready.isReturnSerialized() ? readyTransactionReply.toSerializable() :
+                readyTransactionReply, getSelf());
     }
 
     private void handleAbortTransaction(AbortTransaction abort) {
@@ -465,10 +478,8 @@ public class Shard extends RaftActor {
         }
     }
 
-    private ActorRef createTypedTransactionActor(
-        int transactionType,
-        ShardTransactionIdentifier transactionId,
-        String transactionChainId ) {
+    private ActorRef createTypedTransactionActor(int transactionType,
+            ShardTransactionIdentifier transactionId, String transactionChainId, int clientVersion ) {
 
         DOMStoreTransactionFactory factory = store;
 
@@ -492,7 +503,8 @@ public class Shard extends RaftActor {
             return getContext().actorOf(
                 ShardTransaction.props(factory.newReadOnlyTransaction(), getSelf(),
                         schemaContext,datastoreContext, shardMBean,
-                        transactionId.getRemoteTransactionId()), transactionId.toString());
+                        transactionId.getRemoteTransactionId(), clientVersion),
+                        transactionId.toString());
 
         } else if (transactionType == TransactionProxy.TransactionType.READ_WRITE.ordinal()) {
 
@@ -501,7 +513,8 @@ public class Shard extends RaftActor {
             return getContext().actorOf(
                 ShardTransaction.props(factory.newReadWriteTransaction(), getSelf(),
                         schemaContext, datastoreContext, shardMBean,
-                        transactionId.getRemoteTransactionId()), transactionId.toString());
+                        transactionId.getRemoteTransactionId(), clientVersion),
+                        transactionId.toString());
 
 
         } else if (transactionType == TransactionProxy.TransactionType.WRITE_ONLY.ordinal()) {
@@ -511,7 +524,8 @@ public class Shard extends RaftActor {
             return getContext().actorOf(
                 ShardTransaction.props(factory.newWriteOnlyTransaction(), getSelf(),
                         schemaContext, datastoreContext, shardMBean,
-                        transactionId.getRemoteTransactionId()), transactionId.toString());
+                        transactionId.getRemoteTransactionId(), clientVersion),
+                        transactionId.toString());
         } else {
             throw new IllegalArgumentException(
                 "Shard="+name + ":CreateTransaction message has unidentified transaction type="
@@ -521,10 +535,12 @@ public class Shard extends RaftActor {
 
     private void createTransaction(CreateTransaction createTransaction) {
         createTransaction(createTransaction.getTransactionType(),
-            createTransaction.getTransactionId(), createTransaction.getTransactionChainId());
+            createTransaction.getTransactionId(), createTransaction.getTransactionChainId(),
+            createTransaction.getClientVersion());
     }
 
-    private ActorRef createTransaction(int transactionType, String remoteTransactionId, String transactionChainId) {
+    private ActorRef createTransaction(int transactionType, String remoteTransactionId,
+            String transactionChainId, int clientVersion) {
 
         ShardTransactionIdentifier transactionId =
             ShardTransactionIdentifier.builder()
@@ -533,8 +549,8 @@ public class Shard extends RaftActor {
         if(LOG.isDebugEnabled()) {
             LOG.debug("Creating transaction : {} ", transactionId);
         }
-        ActorRef transactionActor =
-            createTypedTransactionActor(transactionType, transactionId, transactionChainId);
+        ActorRef transactionActor = createTypedTransactionActor(transactionType, transactionId,
+                transactionChainId, clientVersion);
 
         getSender()
             .tell(new CreateTransactionReply(
@@ -765,7 +781,8 @@ public class Shard extends RaftActor {
             // so that this actor does not get block building the snapshot
             createSnapshotTransaction = createTransaction(
                 TransactionProxy.TransactionType.READ_ONLY.ordinal(),
-                "createSnapshot" + ++createSnapshotTransactionCounter, "");
+                "createSnapshot" + ++createSnapshotTransactionCounter, "",
+                CreateTransaction.CURRENT_CLIENT_VERSION);
 
             createSnapshotTransaction.tell(
                 new ReadData(YangInstanceIdentifier.builder().build()).toSerializable(), self());
index d12e9997bb29175c7d6c414efdc72b339b706932..9d8f57252a2fc43bd316f213c9f8b68f4f4c453f 100644 (file)
@@ -26,8 +26,9 @@ public class ShardReadTransaction extends ShardTransaction {
     private final DOMStoreReadTransaction transaction;
 
     public ShardReadTransaction(DOMStoreReadTransaction transaction, ActorRef shardActor,
-            SchemaContext schemaContext, ShardStats shardStats, String transactionID) {
-        super(shardActor, schemaContext, shardStats, transactionID);
+            SchemaContext schemaContext, ShardStats shardStats, String transactionID,
+            int txnClientVersion) {
+        super(shardActor, schemaContext, shardStats, transactionID, txnClientVersion);
         this.transaction = transaction;
     }
 
index b1fd02d2172cc4c28679c6e4ae86cd31cf2a2657..e558677ebbaba5598d1ca84875a6402764887f34 100644 (file)
@@ -26,8 +26,9 @@ public class ShardReadWriteTransaction extends ShardWriteTransaction {
     private final DOMStoreReadWriteTransaction transaction;
 
     public ShardReadWriteTransaction(DOMStoreReadWriteTransaction transaction, ActorRef shardActor,
-            SchemaContext schemaContext, ShardStats shardStats, String transactionID) {
-        super(transaction, shardActor, schemaContext, shardStats, transactionID);
+            SchemaContext schemaContext, ShardStats shardStats, String transactionID,
+            int txnClientVersion) {
+        super(transaction, shardActor, schemaContext, shardStats, transactionID, txnClientVersion);
         this.transaction = transaction;
     }
 
index 32de47f451d9ff53f301339975357440e651bea5..59bb4bfd77892e1a915563b3d435150de27c4ca1 100644 (file)
@@ -57,26 +57,29 @@ import org.opendaylight.yangtools.yang.model.api.SchemaContext;
  */
 public abstract class ShardTransaction extends AbstractUntypedActorWithMetering {
 
+    protected static final boolean SERIALIZED_REPLY = true;
+
     private final ActorRef shardActor;
     private final SchemaContext schemaContext;
     private final ShardStats shardStats;
     private final String transactionID;
-    protected static final boolean SERIALIZED_REPLY = true;
+    private final int txnClientVersion;
 
     protected ShardTransaction(ActorRef shardActor, SchemaContext schemaContext,
-            ShardStats shardStats, String transactionID) {
+            ShardStats shardStats, String transactionID, int txnClientVersion) {
         super("shard-tx"); //actor name override used for metering. This does not change the "real" actor name
         this.shardActor = shardActor;
         this.schemaContext = schemaContext;
         this.shardStats = shardStats;
         this.transactionID = transactionID;
+        this.txnClientVersion = txnClientVersion;
     }
 
     public static Props props(DOMStoreTransaction transaction, ActorRef shardActor,
             SchemaContext schemaContext,DatastoreContext datastoreContext, ShardStats shardStats,
-            String transactionID) {
+            String transactionID, int txnClientVersion) {
         return Props.create(new ShardTransactionCreator(transaction, shardActor, schemaContext,
-           datastoreContext, shardStats, transactionID));
+           datastoreContext, shardStats, transactionID, txnClientVersion));
     }
 
     protected abstract DOMStoreTransaction getDOMStoreTransaction();
@@ -93,6 +96,10 @@ public abstract class ShardTransaction extends AbstractUntypedActorWithMetering
         return schemaContext;
     }
 
+    protected int getTxnClientVersion() {
+        return txnClientVersion;
+    }
+
     @Override
     public void handleReceive(Object message) throws Exception {
         if (message.getClass().equals(CloseTransaction.SERIALIZABLE_CLASS)) {
@@ -169,16 +176,18 @@ public abstract class ShardTransaction extends AbstractUntypedActorWithMetering
         final DatastoreContext datastoreContext;
         final ShardStats shardStats;
         final String transactionID;
+        final int txnClientVersion;
 
         ShardTransactionCreator(DOMStoreTransaction transaction, ActorRef shardActor,
                 SchemaContext schemaContext, DatastoreContext datastoreContext,
-                ShardStats shardStats, String transactionID) {
+                ShardStats shardStats, String transactionID, int txnClientVersion) {
             this.transaction = transaction;
             this.shardActor = shardActor;
             this.shardStats = shardStats;
             this.schemaContext = schemaContext;
             this.datastoreContext = datastoreContext;
             this.transactionID = transactionID;
+            this.txnClientVersion = txnClientVersion;
         }
 
         @Override
@@ -186,13 +195,13 @@ public abstract class ShardTransaction extends AbstractUntypedActorWithMetering
             ShardTransaction tx;
             if(transaction instanceof DOMStoreReadWriteTransaction) {
                 tx = new ShardReadWriteTransaction((DOMStoreReadWriteTransaction)transaction,
-                        shardActor, schemaContext, shardStats, transactionID);
+                        shardActor, schemaContext, shardStats, transactionID, txnClientVersion);
             } else if(transaction instanceof DOMStoreReadTransaction) {
                 tx = new ShardReadTransaction((DOMStoreReadTransaction)transaction, shardActor,
-                        schemaContext, shardStats, transactionID);
+                        schemaContext, shardStats, transactionID, txnClientVersion);
             } else {
                 tx = new ShardWriteTransaction((DOMStoreWriteTransaction)transaction,
-                        shardActor, schemaContext, shardStats, transactionID);
+                        shardActor, schemaContext, shardStats, transactionID, txnClientVersion);
             }
 
             tx.getContext().setReceiveTimeout(datastoreContext.getShardTransactionIdleTimeout());
index 943a82f6f9f03565369062bdd8f7779bea5921f0..4bf184e9a71f0810550e3e16d9c4ec0104f95f45 100644 (file)
@@ -63,19 +63,22 @@ public class ShardTransactionChain extends AbstractUntypedActor {
             return getContext().actorOf(
                     ShardTransaction.props( chain.newReadOnlyTransaction(), getShardActor(),
                             schemaContext, datastoreContext, shardStats,
-                            createTransaction.getTransactionId()), transactionName);
+                            createTransaction.getTransactionId(),
+                            createTransaction.getClientVersion()), transactionName);
         } else if (createTransaction.getTransactionType() ==
                 TransactionProxy.TransactionType.READ_WRITE.ordinal()) {
             return getContext().actorOf(
                     ShardTransaction.props( chain.newReadWriteTransaction(), getShardActor(),
                             schemaContext, datastoreContext, shardStats,
-                            createTransaction.getTransactionId()), transactionName);
+                            createTransaction.getTransactionId(),
+                            createTransaction.getClientVersion()), transactionName);
         } else if (createTransaction.getTransactionType() ==
                 TransactionProxy.TransactionType.WRITE_ONLY.ordinal()) {
             return getContext().actorOf(
                     ShardTransaction.props( chain.newWriteOnlyTransaction(), getShardActor(),
                             schemaContext, datastoreContext, shardStats,
-                            createTransaction.getTransactionId()), transactionName);
+                            createTransaction.getTransactionId(),
+                            createTransaction.getClientVersion()), transactionName);
         } else {
             throw new IllegalArgumentException (
                     "CreateTransaction message has unidentified transaction type=" +
index b0eaf98d59c9ccd2d1eda1f6d782736db238290e..44f2c7bd0a2794715d09cb29bb7a5ce42b352344 100644 (file)
@@ -42,8 +42,9 @@ public class ShardWriteTransaction extends ShardTransaction {
     private final DOMStoreWriteTransaction transaction;
 
     public ShardWriteTransaction(DOMStoreWriteTransaction transaction, ActorRef shardActor,
-            SchemaContext schemaContext, ShardStats shardStats, String transactionID) {
-        super(shardActor, schemaContext, shardStats, transactionID);
+            SchemaContext schemaContext, ShardStats shardStats, String transactionID,
+            int txnClientVersion) {
+        super(shardActor, schemaContext, shardStats, transactionID, txnClientVersion);
         this.transaction = transaction;
     }
 
@@ -144,8 +145,8 @@ public class ShardWriteTransaction extends ShardTransaction {
 
         DOMStoreThreePhaseCommitCohort cohort =  transaction.ready();
 
-        getShardActor().forward(new ForwardedReadyTransaction(transactionID, cohort, modification,
-                returnSerialized), getContext());
+        getShardActor().forward(new ForwardedReadyTransaction(transactionID, getTxnClientVersion(),
+                cohort, modification, returnSerialized), getContext());
 
         // The shard will handle the commit from here so we're no longer needed - self-destruct.
         getSelf().tell(PoisonPill.getInstance(), getSelf());
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/compat/BackwardsCompatibleThreePhaseCommitCohort.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/compat/BackwardsCompatibleThreePhaseCommitCohort.java
new file mode 100644 (file)
index 0000000..30ab97c
--- /dev/null
@@ -0,0 +1,87 @@
+/*
+ * Copyright (c) 2014 Brocade Communications Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore.compat;
+
+import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActor;
+import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction;
+import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction;
+import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction;
+import org.opendaylight.controller.cluster.datastore.messages.PreCommitTransaction;
+import org.opendaylight.controller.cluster.datastore.messages.PreCommitTransactionReply;
+import akka.actor.PoisonPill;
+import akka.actor.Props;
+import akka.event.Logging;
+import akka.event.LoggingAdapter;
+import akka.japi.Creator;
+
+/**
+ * An actor to maintain backwards compatibility for the base Helium version where the 3-phase commit
+ * messages don't contain the transactionId. This actor just forwards a new message containing the
+ * transactionId to the parent Shard.
+ *
+ * @author Thomas Pantelis
+ */
+public class BackwardsCompatibleThreePhaseCommitCohort extends AbstractUntypedActor {
+
+    private final LoggingAdapter LOG = Logging.getLogger(getContext().system(), this);
+
+    private final String transactionId;
+
+    private BackwardsCompatibleThreePhaseCommitCohort(String transactionId) {
+        this.transactionId = transactionId;
+    }
+
+    @Override
+    public void handleReceive(Object message) throws Exception {
+        if(message.getClass().equals(CanCommitTransaction.SERIALIZABLE_CLASS)) {
+            LOG.debug("BackwardsCompatibleThreePhaseCommitCohort CanCommitTransaction");
+
+            getContext().parent().forward(new CanCommitTransaction(transactionId).toSerializable(),
+                    getContext());
+        } else if(message.getClass().equals(PreCommitTransaction.SERIALIZABLE_CLASS)) {
+            LOG.debug("BackwardsCompatibleThreePhaseCommitCohort PreCommitTransaction");
+
+            // The Shard doesn't need the PreCommitTransaction message so just return the reply here.
+            getSender().tell(new PreCommitTransactionReply().toSerializable(), self());
+        } else if(message.getClass().equals(CommitTransaction.SERIALIZABLE_CLASS)) {
+            LOG.debug("BackwardsCompatibleThreePhaseCommitCohort CommitTransaction");
+
+            getContext().parent().forward(new CommitTransaction(transactionId).toSerializable(),
+                    getContext());
+
+            // We're done now - we can self-destruct
+            self().tell(PoisonPill.getInstance(), self());
+        } else if(message.getClass().equals(AbortTransaction.SERIALIZABLE_CLASS)) {
+            LOG.debug("BackwardsCompatibleThreePhaseCommitCohort AbortTransaction");
+
+            getContext().parent().forward(new AbortTransaction(transactionId).toSerializable(),
+                    getContext());
+            self().tell(PoisonPill.getInstance(), self());
+        }
+    }
+
+    public static Props props(String transactionId) {
+        return Props.create(new BackwardsCompatibleThreePhaseCommitCohortCreator(transactionId));
+    }
+
+    private static class BackwardsCompatibleThreePhaseCommitCohortCreator
+                                  implements Creator<BackwardsCompatibleThreePhaseCommitCohort> {
+        private static final long serialVersionUID = 1L;
+
+        private final String transactionId;
+
+        BackwardsCompatibleThreePhaseCommitCohortCreator(String transactionId) {
+            this.transactionId = transactionId;
+        }
+
+        @Override
+        public BackwardsCompatibleThreePhaseCommitCohort create() throws Exception {
+            return new BackwardsCompatibleThreePhaseCommitCohort(transactionId);
+        }
+    }
+}
index 361d406ac80dda74fe33950ace971ece600b3d7a..cbb881f7884733c141d536cd00093acc9e8382f4 100644 (file)
@@ -13,24 +13,32 @@ import org.opendaylight.controller.protobuff.messages.transaction.ShardTransacti
 
 
 public class CreateTransaction implements SerializableMessage {
-    public static final Class SERIALIZABLE_CLASS = ShardTransactionMessages.CreateTransaction.class;
+    public static final Class<ShardTransactionMessages.CreateTransaction> SERIALIZABLE_CLASS =
+            ShardTransactionMessages.CreateTransaction.class;
+
+    public static final int CURRENT_CLIENT_VERSION = 1;
+
     private final String transactionId;
     private final int transactionType;
     private final String transactionChainId;
+    private final int clientVersion;
 
     public CreateTransaction(String transactionId, int transactionType) {
         this(transactionId, transactionType, "");
     }
 
     public CreateTransaction(String transactionId, int transactionType, String transactionChainId) {
+        this(transactionId, transactionType, transactionChainId, CURRENT_CLIENT_VERSION);
+    }
 
+    private CreateTransaction(String transactionId, int transactionType, String transactionChainId,
+            int clientVersion) {
         this.transactionId = transactionId;
         this.transactionType = transactionType;
         this.transactionChainId = transactionChainId;
-
+        this.clientVersion = clientVersion;
     }
 
-
     public String getTransactionId() {
         return transactionId;
     }
@@ -39,19 +47,25 @@ public class CreateTransaction implements SerializableMessage {
         return transactionType;
     }
 
+    public int getClientVersion() {
+        return clientVersion;
+    }
+
     @Override
     public Object toSerializable() {
         return ShardTransactionMessages.CreateTransaction.newBuilder()
             .setTransactionId(transactionId)
             .setTransactionType(transactionType)
-            .setTransactionChainId(transactionChainId).build();
+            .setTransactionChainId(transactionChainId)
+            .setMessageVersion(clientVersion).build();
     }
 
     public static CreateTransaction fromSerializable(Object message) {
         ShardTransactionMessages.CreateTransaction createTransaction =
             (ShardTransactionMessages.CreateTransaction) message;
         return new CreateTransaction(createTransaction.getTransactionId(),
-            createTransaction.getTransactionType(), createTransaction.getTransactionChainId());
+            createTransaction.getTransactionType(), createTransaction.getTransactionChainId(),
+            createTransaction.getMessageVersion());
     }
 
     public String getTransactionChainId() {
index 180108f2186b6efbab2977f1d54cf33811a4c638..38886c9a583c500e4fc9b86f54422c001482e777 100644 (file)
@@ -20,14 +20,16 @@ public class ForwardedReadyTransaction {
     private final DOMStoreThreePhaseCommitCohort cohort;
     private final Modification modification;
     private final boolean returnSerialized;
+    private final int txnClientVersion;
 
-    public ForwardedReadyTransaction(String transactionID, DOMStoreThreePhaseCommitCohort cohort,
-            Modification modification, boolean returnSerialized) {
+    public ForwardedReadyTransaction(String transactionID, int txnClientVersion,
+            DOMStoreThreePhaseCommitCohort cohort, Modification modification,
+            boolean returnSerialized) {
         this.transactionID = transactionID;
         this.cohort = cohort;
         this.modification = modification;
         this.returnSerialized = returnSerialized;
-
+        this.txnClientVersion = txnClientVersion;
     }
 
     public String getTransactionID() {
@@ -45,4 +47,8 @@ public class ForwardedReadyTransaction {
     public boolean isReturnSerialized() {
         return returnSerialized;
     }
+
+    public int getTxnClientVersion() {
+        return txnClientVersion;
+    }
 }
index cd8a65844756b51820c5f6fb93dfe62a53071d31..557a75a075e80cc8979acf35d3242ba16c5c2481 100644 (file)
@@ -1,21 +1,26 @@
 package org.opendaylight.controller.cluster.datastore;
 
-import akka.actor.ActorRef;
-import akka.actor.PoisonPill;
-import akka.actor.Props;
-import akka.dispatch.Dispatchers;
-import akka.dispatch.OnComplete;
-import akka.japi.Creator;
-import akka.pattern.Patterns;
-import akka.testkit.TestActorRef;
-import akka.util.Timeout;
-import com.google.common.base.Function;
-import com.google.common.base.Optional;
-import com.google.common.util.concurrent.CheckedFuture;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.MoreExecutors;
-import com.google.common.util.concurrent.Uninterruptibles;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.inOrder;
+import static org.mockito.Mockito.mock;
+import static org.opendaylight.controller.cluster.datastore.messages.CreateTransaction.CURRENT_CLIENT_VERSION;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -79,28 +84,22 @@ import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 import scala.concurrent.Await;
 import scala.concurrent.Future;
 import scala.concurrent.duration.FiniteDuration;
-
-import java.io.IOException;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicReference;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-import static org.mockito.Mockito.doAnswer;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.inOrder;
-import static org.mockito.Mockito.mock;
+import akka.actor.ActorRef;
+import akka.actor.PoisonPill;
+import akka.actor.Props;
+import akka.dispatch.Dispatchers;
+import akka.dispatch.OnComplete;
+import akka.japi.Creator;
+import akka.pattern.Patterns;
+import akka.testkit.TestActorRef;
+import akka.util.Timeout;
+import com.google.common.base.Function;
+import com.google.common.base.Optional;
+import com.google.common.util.concurrent.CheckedFuture;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.MoreExecutors;
+import com.google.common.util.concurrent.Uninterruptibles;
 
 
 public class ShardTest extends AbstractActorTest {
@@ -611,7 +610,8 @@ public class ShardTest extends AbstractActorTest {
             // Simulate the ForwardedReadyTransaction message for the first Tx that would be sent
             // by the ShardTransaction.
 
-            shard.tell(new ForwardedReadyTransaction(transactionID1, cohort1, modification1, true), getRef());
+            shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_CLIENT_VERSION,
+                    cohort1, modification1, true), getRef());
             ReadyTransactionReply readyReply = ReadyTransactionReply.fromSerializable(
                     expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS));
             assertEquals("Cohort path", shard.path().toString(), readyReply.getCohortPath());
@@ -625,10 +625,12 @@ public class ShardTest extends AbstractActorTest {
 
             // Send the ForwardedReadyTransaction for the next 2 Tx's.
 
-            shard.tell(new ForwardedReadyTransaction(transactionID2, cohort2, modification2, true), getRef());
+            shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_CLIENT_VERSION,
+                    cohort2, modification2, true), getRef());
             expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
 
-            shard.tell(new ForwardedReadyTransaction(transactionID3, cohort3, modification3, true), getRef());
+            shard.tell(new ForwardedReadyTransaction(transactionID3, CURRENT_CLIENT_VERSION,
+                    cohort3, modification3, true), getRef());
             expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
 
             // Send the CanCommitTransaction message for the next 2 Tx's. These should get queued and
@@ -792,10 +794,12 @@ public class ShardTest extends AbstractActorTest {
             // Simulate the ForwardedReadyTransaction messages that would be sent
             // by the ShardTransaction.
 
-            shard.tell(new ForwardedReadyTransaction(transactionID1, cohort1, modification1, true), getRef());
+            shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_CLIENT_VERSION,
+                    cohort1, modification1, true), getRef());
             expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
 
-            shard.tell(new ForwardedReadyTransaction(transactionID2, cohort2, modification2, true), getRef());
+            shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_CLIENT_VERSION,
+                    cohort2, modification2, true), getRef());
             expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
 
             // Send the CanCommitTransaction message for the first Tx.
@@ -859,7 +863,8 @@ public class ShardTest extends AbstractActorTest {
             // Simulate the ForwardedReadyTransaction messages that would be sent
             // by the ShardTransaction.
 
-            shard.tell(new ForwardedReadyTransaction(transactionID, cohort, modification, true), getRef());
+            shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_CLIENT_VERSION,
+                    cohort, modification, true), getRef());
             expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
 
             // Send the CanCommitTransaction message.
@@ -902,7 +907,8 @@ public class ShardTest extends AbstractActorTest {
             // Simulate the ForwardedReadyTransaction messages that would be sent
             // by the ShardTransaction.
 
-            shard.tell(new ForwardedReadyTransaction(transactionID, cohort, modification, true), getRef());
+            shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_CLIENT_VERSION,
+                    cohort, modification, true), getRef());
             expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
 
             // Send the CanCommitTransaction message.
@@ -954,7 +960,8 @@ public class ShardTest extends AbstractActorTest {
                     TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME),
                     modification, preCommit);
 
-            shard.tell(new ForwardedReadyTransaction(transactionID, cohort, modification, true), getRef());
+            shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_CLIENT_VERSION,
+                    cohort, modification, true), getRef());
             expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
 
             shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
@@ -1018,10 +1025,12 @@ public class ShardTest extends AbstractActorTest {
 
             // Ready the Tx's
 
-            shard.tell(new ForwardedReadyTransaction(transactionID1, cohort1, modification1, true), getRef());
+            shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_CLIENT_VERSION,
+                    cohort1, modification1, true), getRef());
             expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
 
-            shard.tell(new ForwardedReadyTransaction(transactionID2, cohort2, modification2, true), getRef());
+            shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_CLIENT_VERSION,
+                    cohort2, modification2, true), getRef());
             expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
 
             // canCommit 1st Tx. We don't send the commit so it should timeout.
@@ -1080,13 +1089,16 @@ public class ShardTest extends AbstractActorTest {
 
             // Ready the Tx's
 
-            shard.tell(new ForwardedReadyTransaction(transactionID1, cohort1, modification1, true), getRef());
+            shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_CLIENT_VERSION,
+                    cohort1, modification1, true), getRef());
             expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
 
-            shard.tell(new ForwardedReadyTransaction(transactionID2, cohort2, modification2, true), getRef());
+            shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_CLIENT_VERSION,
+                    cohort2, modification2, true), getRef());
             expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
 
-            shard.tell(new ForwardedReadyTransaction(transactionID3, cohort3, modification3, true), getRef());
+            shard.tell(new ForwardedReadyTransaction(transactionID3, CURRENT_CLIENT_VERSION,
+                    cohort3, modification3, true), getRef());
             expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
 
             // canCommit 1st Tx.
@@ -1149,10 +1161,12 @@ public class ShardTest extends AbstractActorTest {
             // Simulate the ForwardedReadyTransaction messages that would be sent
             // by the ShardTransaction.
 
-            shard.tell(new ForwardedReadyTransaction(transactionID1, cohort1, modification1, true), getRef());
+            shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_CLIENT_VERSION,
+                    cohort1, modification1, true), getRef());
             expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
 
-            shard.tell(new ForwardedReadyTransaction(transactionID2, cohort2, modification2, true), getRef());
+            shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_CLIENT_VERSION,
+                    cohort2, modification2, true), getRef());
             expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
 
             // Send the CanCommitTransaction message for the first Tx.
@@ -1345,7 +1359,7 @@ public class ShardTest extends AbstractActorTest {
         };
     }
 
-    private NormalizedNode<?,?> readStore(TestActorRef<Shard> shard, YangInstanceIdentifier id)
+    static NormalizedNode<?,?> readStore(TestActorRef<Shard> shard, YangInstanceIdentifier id)
             throws ExecutionException, InterruptedException {
         DOMStoreReadTransaction transaction = shard.underlyingActor().getDataStore().newReadOnlyTransaction();
 
index 6375e3c7fba51fa02c077ac49d01def5eed5d22a..261bfeffbac5020c72d82f0199c602da1c4be303 100644 (file)
 
 package org.opendaylight.controller.cluster.datastore;
 
-import akka.actor.ActorRef;
-import akka.actor.Props;
-import akka.pattern.AskTimeoutException;
-import akka.testkit.TestActorRef;
-import com.google.common.util.concurrent.ListeningExecutorService;
-import com.google.common.util.concurrent.MoreExecutors;
+import java.util.Collections;
+import java.util.concurrent.TimeUnit;
 import org.junit.BeforeClass;
 import org.junit.Test;
 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
+import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
 import org.opendaylight.controller.cluster.datastore.node.utils.serialization.NormalizedNodeSerializer;
 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
@@ -32,8 +29,12 @@ import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 import scala.concurrent.Await;
 import scala.concurrent.Future;
 import scala.concurrent.duration.Duration;
-import java.util.Collections;
-import java.util.concurrent.TimeUnit;
+import akka.actor.ActorRef;
+import akka.actor.Props;
+import akka.pattern.AskTimeoutException;
+import akka.testkit.TestActorRef;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
 
 /**
  * Covers negative test cases
@@ -75,7 +76,8 @@ public class ShardTransactionFailureTest extends AbstractActorTest {
 
         final ActorRef shard = createShard();
         final Props props = ShardTransaction.props(store.newReadOnlyTransaction(), shard,
-                testSchemaContext, datastoreContext, shardStats, "txn");
+                testSchemaContext, datastoreContext, shardStats, "txn",
+                CreateTransaction.CURRENT_CLIENT_VERSION);
 
         final TestActorRef<ShardTransaction> subject = TestActorRef
             .create(getSystem(), props,
@@ -104,7 +106,8 @@ public class ShardTransactionFailureTest extends AbstractActorTest {
 
         final ActorRef shard = createShard();
         final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
-                testSchemaContext, datastoreContext, shardStats, "txn");
+                testSchemaContext, datastoreContext, shardStats, "txn",
+                CreateTransaction.CURRENT_CLIENT_VERSION);
 
         final TestActorRef<ShardTransaction> subject = TestActorRef
             .create(getSystem(), props,
@@ -133,7 +136,8 @@ public class ShardTransactionFailureTest extends AbstractActorTest {
 
         final ActorRef shard = createShard();
         final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
-                testSchemaContext, datastoreContext, shardStats, "txn");
+                testSchemaContext, datastoreContext, shardStats, "txn",
+                CreateTransaction.CURRENT_CLIENT_VERSION);
 
         final TestActorRef<ShardTransaction> subject = TestActorRef
             .create(getSystem(), props,
@@ -162,7 +166,8 @@ public class ShardTransactionFailureTest extends AbstractActorTest {
 
         final ActorRef shard = createShard();
         final Props props = ShardTransaction.props(store.newWriteOnlyTransaction(), shard,
-                testSchemaContext, datastoreContext, shardStats, "txn");
+                testSchemaContext, datastoreContext, shardStats, "txn",
+                CreateTransaction.CURRENT_CLIENT_VERSION);
 
         final TestActorRef<ShardTransaction> subject = TestActorRef
             .create(getSystem(), props,
@@ -194,7 +199,8 @@ public class ShardTransactionFailureTest extends AbstractActorTest {
 
         final ActorRef shard = createShard();
         final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
-                testSchemaContext, datastoreContext, shardStats, "txn");
+                testSchemaContext, datastoreContext, shardStats, "txn",
+                CreateTransaction.CURRENT_CLIENT_VERSION);
 
         final TestActorRef<ShardTransaction> subject = TestActorRef
             .create(getSystem(), props,
@@ -231,7 +237,8 @@ public class ShardTransactionFailureTest extends AbstractActorTest {
 
         final ActorRef shard = createShard();
         final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
-                testSchemaContext, datastoreContext, shardStats, "txn");
+                testSchemaContext, datastoreContext, shardStats, "txn",
+                CreateTransaction.CURRENT_CLIENT_VERSION);
 
         final TestActorRef<ShardTransaction> subject = TestActorRef
             .create(getSystem(), props, "testNegativeMergeTransactionReady");
@@ -263,7 +270,8 @@ public class ShardTransactionFailureTest extends AbstractActorTest {
 
         final ActorRef shard = createShard();
         final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
-                testSchemaContext, datastoreContext, shardStats, "txn");
+                testSchemaContext, datastoreContext, shardStats, "txn",
+                CreateTransaction.CURRENT_CLIENT_VERSION);
 
         final TestActorRef<ShardTransaction> subject = TestActorRef
             .create(getSystem(), props,
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionHeliumBackwardsCompatibilityTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionHeliumBackwardsCompatibilityTest.java
new file mode 100644 (file)
index 0000000..af07aee
--- /dev/null
@@ -0,0 +1,184 @@
+/*
+ * Copyright (c) 2014 Brocade Communications Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore;
+
+import java.util.Collections;
+import org.junit.Assert;
+import org.junit.Test;
+import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
+import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply;
+import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply;
+import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
+import org.opendaylight.controller.cluster.datastore.messages.PreCommitTransactionReply;
+import org.opendaylight.controller.cluster.datastore.messages.ReadyTransaction;
+import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
+import org.opendaylight.controller.cluster.datastore.messages.WriteData;
+import org.opendaylight.controller.cluster.datastore.messages.WriteDataReply;
+import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
+import org.opendaylight.controller.protobuff.messages.cohort3pc.ThreePhaseCommitCohortMessages;
+import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages;
+import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages.CreateTransactionReply;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
+import org.opendaylight.yangtools.yang.model.api.SchemaContext;
+import scala.concurrent.duration.FiniteDuration;
+import akka.actor.ActorRef;
+import akka.actor.ActorSelection;
+import akka.actor.PoisonPill;
+import akka.actor.Props;
+import akka.dispatch.Dispatchers;
+import akka.testkit.TestActorRef;
+
+/**
+ * Tests backwards compatibility support from Helium-1 to Helium.
+ *
+ * In Helium-1, the 3-phase commit support was moved from the ThreePhaseCommitCohort actor to the
+ * Shard. As a consequence, a new transactionId field was added to the CanCommitTransaction,
+ * CommitTransaction and AbortTransaction messages. With a base Helium version node, these messages
+ * would be sans transactionId so this test verifies the Shard handles that properly.
+ *
+ * @author Thomas Pantelis
+ */
+public class ShardTransactionHeliumBackwardsCompatibilityTest extends AbstractActorTest {
+
+    @Test
+    public void testTransactionCommit() throws Exception {
+        new ShardTestKit(getSystem()) {{
+            SchemaContext schemaContext = TestModel.createTestContext();
+            Props shardProps = Shard.props(ShardIdentifier.builder().memberName("member-1").
+                    shardName("inventory").type("config").build(),
+                    Collections.<ShardIdentifier,String>emptyMap(),
+                    DatastoreContext.newBuilder().shardHeartbeatIntervalInMillis(100).build(),
+                    schemaContext).withDispatcher(Dispatchers.DefaultDispatcherId());
+
+            final TestActorRef<Shard> shard = TestActorRef.create(getSystem(), shardProps,
+                    "testTransactionCommit");
+
+            waitUntilLeader(shard);
+
+            // Send CreateTransaction message with no messages version
+
+            String transactionID = "txn-1";
+            shard.tell(ShardTransactionMessages.CreateTransaction.newBuilder()
+                    .setTransactionId(transactionID)
+                    .setTransactionType(TransactionProxy.TransactionType.WRITE_ONLY.ordinal())
+                    .setTransactionChainId("").build(), getRef());
+
+            final FiniteDuration duration = duration("5 seconds");
+
+            CreateTransactionReply reply = expectMsgClass(duration, CreateTransactionReply.class);
+
+            ActorSelection txActor = getSystem().actorSelection(reply.getTransactionActorPath());
+
+            // Write data to the Tx
+
+            txActor.tell(new WriteData(TestModel.TEST_PATH,
+                    ImmutableNodes.containerNode(TestModel.TEST_QNAME), schemaContext), getRef());
+
+            expectMsgClass(duration, WriteDataReply.class);
+
+            // Ready the Tx
+
+            txActor.tell(new ReadyTransaction().toSerializable(), getRef());
+
+            ReadyTransactionReply readyReply = ReadyTransactionReply.fromSerializable(expectMsgClass(
+                    duration, ReadyTransactionReply.SERIALIZABLE_CLASS));
+
+            ActorSelection cohortActor = getSystem().actorSelection(readyReply.getCohortPath());
+
+            // Send the CanCommitTransaction message with no transactionId.
+
+            cohortActor.tell(ThreePhaseCommitCohortMessages.CanCommitTransaction.newBuilder().build(),
+                    getRef());
+
+            expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS);
+
+            // Send the PreCommitTransaction message with no transactionId.
+
+            cohortActor.tell(ThreePhaseCommitCohortMessages.PreCommitTransaction.newBuilder().build(),
+                    getRef());
+
+            expectMsgClass(duration, PreCommitTransactionReply.SERIALIZABLE_CLASS);
+
+            // Send the CommitTransaction message with no transactionId.
+
+            cohortActor.tell(ThreePhaseCommitCohortMessages.CommitTransaction.newBuilder().build(),
+                    getRef());
+
+            expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
+
+            NormalizedNode<?, ?> node = ShardTest.readStore(shard, TestModel.TEST_PATH);
+            Assert.assertNotNull("Data not found in store", node);
+
+            shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
+        }};
+    }
+
+    @Test
+    public void testTransactionAbort() throws Exception {
+        new ShardTestKit(getSystem()) {{
+            SchemaContext schemaContext = TestModel.createTestContext();
+            Props shardProps = Shard.props(ShardIdentifier.builder().memberName("member-1").
+                    shardName("inventory").type("config").build(),
+                    Collections.<ShardIdentifier,String>emptyMap(),
+                    DatastoreContext.newBuilder().shardHeartbeatIntervalInMillis(100).build(),
+                    schemaContext).withDispatcher(Dispatchers.DefaultDispatcherId());
+
+            final TestActorRef<Shard> shard = TestActorRef.create(getSystem(), shardProps,
+                    "testTransactionAbort");
+
+            waitUntilLeader(shard);
+
+            // Send CreateTransaction message with no messages version
+
+            String transactionID = "txn-1";
+            shard.tell(ShardTransactionMessages.CreateTransaction.newBuilder()
+                    .setTransactionId(transactionID)
+                    .setTransactionType(TransactionProxy.TransactionType.WRITE_ONLY.ordinal())
+                    .setTransactionChainId("").build(), getRef());
+
+            final FiniteDuration duration = duration("5 seconds");
+
+            CreateTransactionReply reply = expectMsgClass(duration, CreateTransactionReply.class);
+
+            ActorSelection txActor = getSystem().actorSelection(reply.getTransactionActorPath());
+
+            // Write data to the Tx
+
+            txActor.tell(new WriteData(TestModel.TEST_PATH,
+                    ImmutableNodes.containerNode(TestModel.TEST_QNAME), schemaContext), getRef());
+
+            expectMsgClass(duration, WriteDataReply.class);
+
+            // Ready the Tx
+
+            txActor.tell(new ReadyTransaction().toSerializable(), getRef());
+
+            ReadyTransactionReply readyReply = ReadyTransactionReply.fromSerializable(expectMsgClass(
+                    duration, ReadyTransactionReply.SERIALIZABLE_CLASS));
+
+            ActorSelection cohortActor = getSystem().actorSelection(readyReply.getCohortPath());
+
+            // Send the CanCommitTransaction message with no transactionId.
+
+            cohortActor.tell(ThreePhaseCommitCohortMessages.CanCommitTransaction.newBuilder().build(),
+                    getRef());
+
+            expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS);
+
+            // Send the AbortTransaction message with no transactionId.
+
+            cohortActor.tell(ThreePhaseCommitCohortMessages.AbortTransaction.newBuilder().build(),
+                    getRef());
+
+            expectMsgClass(duration, AbortTransactionReply.SERIALIZABLE_CLASS);
+
+            shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
+        }};
+    }
+}
index 793df8e0ca9776f7d21c07fcbaf7788be8f562bd..cc3bb55e8571dee26cb57ef0fd145c8df9110fae 100644 (file)
@@ -1,12 +1,11 @@
 package org.opendaylight.controller.cluster.datastore;
 
-import akka.actor.ActorRef;
-import akka.actor.Props;
-import akka.actor.Terminated;
-import akka.testkit.JavaTestKit;
-import akka.testkit.TestActorRef;
-import com.google.common.util.concurrent.ListeningExecutorService;
-import com.google.common.util.concurrent.MoreExecutors;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import java.util.Collections;
+import java.util.concurrent.TimeUnit;
 import org.junit.BeforeClass;
 import org.junit.Test;
 import org.opendaylight.controller.cluster.datastore.ShardWriteTransaction.GetCompositeModificationReply;
@@ -15,6 +14,7 @@ import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier
 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
 import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.CloseTransactionReply;
+import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.DataExists;
 import org.opendaylight.controller.cluster.datastore.messages.DataExistsReply;
 import org.opendaylight.controller.cluster.datastore.messages.DeleteData;
@@ -39,12 +39,13 @@ import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 import scala.concurrent.duration.Duration;
-import java.util.Collections;
-import java.util.concurrent.TimeUnit;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
+import akka.actor.ActorRef;
+import akka.actor.Props;
+import akka.actor.Terminated;
+import akka.testkit.JavaTestKit;
+import akka.testkit.TestActorRef;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
 
 public class ShardTransactionTest extends AbstractActorTest {
     private static ListeningExecutorService storeExecutor =
@@ -78,12 +79,14 @@ public class ShardTransactionTest extends AbstractActorTest {
         new JavaTestKit(getSystem()) {{
             final ActorRef shard = createShard();
             Props props = ShardTransaction.props(store.newReadOnlyTransaction(), shard,
-                    testSchemaContext, datastoreContext, shardStats, "txn");
+                    testSchemaContext, datastoreContext, shardStats, "txn",
+                    CreateTransaction.CURRENT_CLIENT_VERSION);
 
             testOnReceiveReadData(getSystem().actorOf(props, "testReadDataRO"));
 
             props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
-                    testSchemaContext, datastoreContext, shardStats, "txn");
+                    testSchemaContext, datastoreContext, shardStats, "txn",
+                    CreateTransaction.CURRENT_CLIENT_VERSION);
 
             testOnReceiveReadData(getSystem().actorOf(props, "testReadDataRW"));
         }
@@ -114,13 +117,15 @@ public class ShardTransactionTest extends AbstractActorTest {
         new JavaTestKit(getSystem()) {{
             final ActorRef shard = createShard();
             Props props = ShardTransaction.props( store.newReadOnlyTransaction(), shard,
-                    testSchemaContext, datastoreContext, shardStats, "txn");
+                    testSchemaContext, datastoreContext, shardStats, "txn",
+                    CreateTransaction.CURRENT_CLIENT_VERSION);
 
             testOnReceiveReadDataWhenDataNotFound(getSystem().actorOf(
                     props, "testReadDataWhenDataNotFoundRO"));
 
             props = ShardTransaction.props( store.newReadWriteTransaction(), shard,
-                    testSchemaContext, datastoreContext, shardStats, "txn");
+                    testSchemaContext, datastoreContext, shardStats, "txn",
+                    CreateTransaction.CURRENT_CLIENT_VERSION);
 
             testOnReceiveReadDataWhenDataNotFound(getSystem().actorOf(
                     props, "testReadDataWhenDataNotFoundRW"));
@@ -150,12 +155,14 @@ public class ShardTransactionTest extends AbstractActorTest {
         new JavaTestKit(getSystem()) {{
             final ActorRef shard = createShard();
             Props props = ShardTransaction.props(store.newReadOnlyTransaction(), shard,
-                    testSchemaContext, datastoreContext, shardStats, "txn");
+                    testSchemaContext, datastoreContext, shardStats, "txn",
+                    CreateTransaction.CURRENT_CLIENT_VERSION);
 
             testOnReceiveDataExistsPositive(getSystem().actorOf(props, "testDataExistsPositiveRO"));
 
             props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
-                    testSchemaContext, datastoreContext, shardStats, "txn");
+                    testSchemaContext, datastoreContext, shardStats, "txn",
+                    CreateTransaction.CURRENT_CLIENT_VERSION);
 
             testOnReceiveDataExistsPositive(getSystem().actorOf(props, "testDataExistsPositiveRW"));
         }
@@ -183,12 +190,14 @@ public class ShardTransactionTest extends AbstractActorTest {
         new JavaTestKit(getSystem()) {{
             final ActorRef shard = createShard();
             Props props = ShardTransaction.props(store.newReadOnlyTransaction(), shard,
-                    testSchemaContext, datastoreContext, shardStats, "txn");
+                    testSchemaContext, datastoreContext, shardStats, "txn",
+                    CreateTransaction.CURRENT_CLIENT_VERSION);
 
             testOnReceiveDataExistsNegative(getSystem().actorOf(props, "testDataExistsNegativeRO"));
 
             props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
-                    testSchemaContext, datastoreContext, shardStats, "txn");
+                    testSchemaContext, datastoreContext, shardStats, "txn",
+                    CreateTransaction.CURRENT_CLIENT_VERSION);
 
             testOnReceiveDataExistsNegative(getSystem().actorOf(props, "testDataExistsNegativeRW"));
         }
@@ -228,7 +237,8 @@ public class ShardTransactionTest extends AbstractActorTest {
         new JavaTestKit(getSystem()) {{
             final ActorRef shard = createShard();
             final Props props = ShardTransaction.props(store.newWriteOnlyTransaction(), shard,
-                    testSchemaContext, datastoreContext, shardStats, "txn");
+                    testSchemaContext, datastoreContext, shardStats, "txn",
+                    CreateTransaction.CURRENT_CLIENT_VERSION);
             final ActorRef transaction = getSystem().actorOf(props, "testWriteData");
 
             transaction.tell(new WriteData(TestModel.TEST_PATH,
@@ -254,7 +264,8 @@ public class ShardTransactionTest extends AbstractActorTest {
         new JavaTestKit(getSystem()) {{
             final ActorRef shard = createShard();
             final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
-                    testSchemaContext, datastoreContext, shardStats, "txn");
+                    testSchemaContext, datastoreContext, shardStats, "txn",
+                    CreateTransaction.CURRENT_CLIENT_VERSION);
             final ActorRef transaction = getSystem().actorOf(props, "testMergeData");
 
             transaction.tell(new MergeData(TestModel.TEST_PATH,
@@ -279,7 +290,8 @@ public class ShardTransactionTest extends AbstractActorTest {
         new JavaTestKit(getSystem()) {{
             final ActorRef shard = createShard();
             final Props props = ShardTransaction.props( store.newWriteOnlyTransaction(), shard,
-                    testSchemaContext, datastoreContext, shardStats, "txn");
+                    testSchemaContext, datastoreContext, shardStats, "txn",
+                    CreateTransaction.CURRENT_CLIENT_VERSION);
             final ActorRef transaction = getSystem().actorOf(props, "testDeleteData");
 
             transaction.tell(new DeleteData(TestModel.TEST_PATH).toSerializable(), getRef());
@@ -301,7 +313,8 @@ public class ShardTransactionTest extends AbstractActorTest {
         new JavaTestKit(getSystem()) {{
             final ActorRef shard = createShard();
             final Props props = ShardTransaction.props( store.newReadWriteTransaction(), shard,
-                    testSchemaContext, datastoreContext, shardStats, "txn");
+                    testSchemaContext, datastoreContext, shardStats, "txn",
+                    CreateTransaction.CURRENT_CLIENT_VERSION);
             final ActorRef transaction = getSystem().actorOf(props, "testReadyTransaction");
 
             watch(transaction);
@@ -318,7 +331,8 @@ public class ShardTransactionTest extends AbstractActorTest {
         new JavaTestKit(getSystem()) {{
             final ActorRef shard = createShard();
             final Props props = ShardTransaction.props( store.newReadWriteTransaction(), shard,
-                testSchemaContext, datastoreContext, shardStats, "txn");
+                testSchemaContext, datastoreContext, shardStats, "txn",
+                CreateTransaction.CURRENT_CLIENT_VERSION);
             final ActorRef transaction = getSystem().actorOf(props, "testReadyTransaction2");
 
             watch(transaction);
@@ -339,7 +353,8 @@ public class ShardTransactionTest extends AbstractActorTest {
         new JavaTestKit(getSystem()) {{
             final ActorRef shard = createShard();
             final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
-                    testSchemaContext, datastoreContext, shardStats, "txn");
+                    testSchemaContext, datastoreContext, shardStats, "txn",
+                    CreateTransaction.CURRENT_CLIENT_VERSION);
             final ActorRef transaction = getSystem().actorOf(props, "testCloseTransaction");
 
             watch(transaction);
@@ -355,7 +370,8 @@ public class ShardTransactionTest extends AbstractActorTest {
     public void testNegativePerformingWriteOperationOnReadTransaction() throws Exception {
         final ActorRef shard = createShard();
         final Props props = ShardTransaction.props(store.newReadOnlyTransaction(), shard,
-                testSchemaContext, datastoreContext, shardStats, "txn");
+                testSchemaContext, datastoreContext, shardStats, "txn",
+                CreateTransaction.CURRENT_CLIENT_VERSION);
         final TestActorRef<ShardTransaction> transaction = TestActorRef.apply(props,getSystem());
 
         transaction.receive(new DeleteData(TestModel.TEST_PATH).toSerializable(), ActorRef.noSender());
@@ -370,7 +386,8 @@ public class ShardTransactionTest extends AbstractActorTest {
         new JavaTestKit(getSystem()) {{
             final ActorRef shard = createShard();
             final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
-                    testSchemaContext, datastoreContext, shardStats, "txn");
+                    testSchemaContext, datastoreContext, shardStats, "txn",
+                    CreateTransaction.CURRENT_CLIENT_VERSION);
             final ActorRef transaction =
                 getSystem().actorOf(props, "testShardTransactionInactivity");