From: tpantelis Date: Thu, 30 Oct 2014 11:30:32 +0000 (-0400) Subject: Bug 2294: Handle Shard backwards compatibility X-Git-Tag: release/helium-sr1~13 X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=commitdiff_plain;h=d4e21afd4eaca76118f111db1d1ee569ba72d22c;p=controller.git Bug 2294: Handle Shard backwards compatibility Implemented as outlined in Bug 2294. Change-Id: I14aa8ef5f320f9d165492396ece4ea63cce9b0c3 Signed-off-by: tpantelis --- diff --git a/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/protobuff/messages/cohort3pc/ThreePhaseCommitCohortMessages.java b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/protobuff/messages/cohort3pc/ThreePhaseCommitCohortMessages.java index e43b44582d..2a79a5b827 100644 --- a/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/protobuff/messages/cohort3pc/ThreePhaseCommitCohortMessages.java +++ b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/protobuff/messages/cohort3pc/ThreePhaseCommitCohortMessages.java @@ -11,17 +11,17 @@ public final class ThreePhaseCommitCohortMessages { public interface CanCommitTransactionOrBuilder extends com.google.protobuf.MessageOrBuilder { - // required string transactionId = 1; + // optional string transactionId = 1; /** - * required string transactionId = 1; + * optional string transactionId = 1; */ boolean hasTransactionId(); /** - * required string transactionId = 1; + * optional string transactionId = 1; */ java.lang.String getTransactionId(); /** - * required string transactionId = 1; + * optional string transactionId = 1; */ com.google.protobuf.ByteString getTransactionIdBytes(); @@ -122,17 +122,17 @@ public final class ThreePhaseCommitCohortMessages { } private int bitField0_; - // required string transactionId = 1; + // optional string transactionId = 1; public static final int TRANSACTIONID_FIELD_NUMBER = 1; private java.lang.Object transactionId_; /** - * required string transactionId = 1; + * optional string transactionId = 1; */ public boolean hasTransactionId() { return ((bitField0_ & 0x00000001) == 0x00000001); } /** - * required string transactionId = 1; + * optional string transactionId = 1; */ public java.lang.String getTransactionId() { java.lang.Object ref = transactionId_; @@ -149,7 +149,7 @@ public final class ThreePhaseCommitCohortMessages { } } /** - * required string transactionId = 1; + * optional string transactionId = 1; */ public com.google.protobuf.ByteString getTransactionIdBytes() { @@ -173,10 +173,6 @@ public final class ThreePhaseCommitCohortMessages { byte isInitialized = memoizedIsInitialized; if (isInitialized != -1) return isInitialized == 1; - if (!hasTransactionId()) { - memoizedIsInitialized = 0; - return false; - } memoizedIsInitialized = 1; return true; } @@ -376,10 +372,6 @@ public final class ThreePhaseCommitCohortMessages { } public final boolean isInitialized() { - if (!hasTransactionId()) { - - return false; - } return true; } @@ -402,16 +394,16 @@ public final class ThreePhaseCommitCohortMessages { } private int bitField0_; - // required string transactionId = 1; + // optional string transactionId = 1; private java.lang.Object transactionId_ = ""; /** - * required string transactionId = 1; + * optional string transactionId = 1; */ public boolean hasTransactionId() { return ((bitField0_ & 0x00000001) == 0x00000001); } /** - * required string transactionId = 1; + * optional string transactionId = 1; */ public java.lang.String getTransactionId() { java.lang.Object ref = transactionId_; @@ -425,7 +417,7 @@ public final class ThreePhaseCommitCohortMessages { } } /** - * required string transactionId = 1; + * optional string transactionId = 1; */ public com.google.protobuf.ByteString getTransactionIdBytes() { @@ -441,7 +433,7 @@ public final class ThreePhaseCommitCohortMessages { } } /** - * required string transactionId = 1; + * optional string transactionId = 1; */ public Builder setTransactionId( java.lang.String value) { @@ -454,7 +446,7 @@ public final class ThreePhaseCommitCohortMessages { return this; } /** - * required string transactionId = 1; + * optional string transactionId = 1; */ public Builder clearTransactionId() { bitField0_ = (bitField0_ & ~0x00000001); @@ -463,7 +455,7 @@ public final class ThreePhaseCommitCohortMessages { return this; } /** - * required string transactionId = 1; + * optional string transactionId = 1; */ public Builder setTransactionIdBytes( com.google.protobuf.ByteString value) { @@ -894,17 +886,17 @@ public final class ThreePhaseCommitCohortMessages { public interface AbortTransactionOrBuilder extends com.google.protobuf.MessageOrBuilder { - // required string transactionId = 1; + // optional string transactionId = 1; /** - * required string transactionId = 1; + * optional string transactionId = 1; */ boolean hasTransactionId(); /** - * required string transactionId = 1; + * optional string transactionId = 1; */ java.lang.String getTransactionId(); /** - * required string transactionId = 1; + * optional string transactionId = 1; */ com.google.protobuf.ByteString getTransactionIdBytes(); @@ -1005,17 +997,17 @@ public final class ThreePhaseCommitCohortMessages { } private int bitField0_; - // required string transactionId = 1; + // optional string transactionId = 1; public static final int TRANSACTIONID_FIELD_NUMBER = 1; private java.lang.Object transactionId_; /** - * required string transactionId = 1; + * optional string transactionId = 1; */ public boolean hasTransactionId() { return ((bitField0_ & 0x00000001) == 0x00000001); } /** - * required string transactionId = 1; + * optional string transactionId = 1; */ public java.lang.String getTransactionId() { java.lang.Object ref = transactionId_; @@ -1032,7 +1024,7 @@ public final class ThreePhaseCommitCohortMessages { } } /** - * required string transactionId = 1; + * optional string transactionId = 1; */ public com.google.protobuf.ByteString getTransactionIdBytes() { @@ -1056,10 +1048,6 @@ public final class ThreePhaseCommitCohortMessages { byte isInitialized = memoizedIsInitialized; if (isInitialized != -1) return isInitialized == 1; - if (!hasTransactionId()) { - memoizedIsInitialized = 0; - return false; - } memoizedIsInitialized = 1; return true; } @@ -1259,10 +1247,6 @@ public final class ThreePhaseCommitCohortMessages { } public final boolean isInitialized() { - if (!hasTransactionId()) { - - return false; - } return true; } @@ -1285,16 +1269,16 @@ public final class ThreePhaseCommitCohortMessages { } private int bitField0_; - // required string transactionId = 1; + // optional string transactionId = 1; private java.lang.Object transactionId_ = ""; /** - * required string transactionId = 1; + * optional string transactionId = 1; */ public boolean hasTransactionId() { return ((bitField0_ & 0x00000001) == 0x00000001); } /** - * required string transactionId = 1; + * optional string transactionId = 1; */ public java.lang.String getTransactionId() { java.lang.Object ref = transactionId_; @@ -1308,7 +1292,7 @@ public final class ThreePhaseCommitCohortMessages { } } /** - * required string transactionId = 1; + * optional string transactionId = 1; */ public com.google.protobuf.ByteString getTransactionIdBytes() { @@ -1324,7 +1308,7 @@ public final class ThreePhaseCommitCohortMessages { } } /** - * required string transactionId = 1; + * optional string transactionId = 1; */ public Builder setTransactionId( java.lang.String value) { @@ -1337,7 +1321,7 @@ public final class ThreePhaseCommitCohortMessages { return this; } /** - * required string transactionId = 1; + * optional string transactionId = 1; */ public Builder clearTransactionId() { bitField0_ = (bitField0_ & ~0x00000001); @@ -1346,7 +1330,7 @@ public final class ThreePhaseCommitCohortMessages { return this; } /** - * required string transactionId = 1; + * optional string transactionId = 1; */ public Builder setTransactionIdBytes( com.google.protobuf.ByteString value) { @@ -1682,17 +1666,17 @@ public final class ThreePhaseCommitCohortMessages { public interface CommitTransactionOrBuilder extends com.google.protobuf.MessageOrBuilder { - // required string transactionId = 1; + // optional string transactionId = 1; /** - * required string transactionId = 1; + * optional string transactionId = 1; */ boolean hasTransactionId(); /** - * required string transactionId = 1; + * optional string transactionId = 1; */ java.lang.String getTransactionId(); /** - * required string transactionId = 1; + * optional string transactionId = 1; */ com.google.protobuf.ByteString getTransactionIdBytes(); @@ -1793,17 +1777,17 @@ public final class ThreePhaseCommitCohortMessages { } private int bitField0_; - // required string transactionId = 1; + // optional string transactionId = 1; public static final int TRANSACTIONID_FIELD_NUMBER = 1; private java.lang.Object transactionId_; /** - * required string transactionId = 1; + * optional string transactionId = 1; */ public boolean hasTransactionId() { return ((bitField0_ & 0x00000001) == 0x00000001); } /** - * required string transactionId = 1; + * optional string transactionId = 1; */ public java.lang.String getTransactionId() { java.lang.Object ref = transactionId_; @@ -1820,7 +1804,7 @@ public final class ThreePhaseCommitCohortMessages { } } /** - * required string transactionId = 1; + * optional string transactionId = 1; */ public com.google.protobuf.ByteString getTransactionIdBytes() { @@ -1844,10 +1828,6 @@ public final class ThreePhaseCommitCohortMessages { byte isInitialized = memoizedIsInitialized; if (isInitialized != -1) return isInitialized == 1; - if (!hasTransactionId()) { - memoizedIsInitialized = 0; - return false; - } memoizedIsInitialized = 1; return true; } @@ -2047,10 +2027,6 @@ public final class ThreePhaseCommitCohortMessages { } public final boolean isInitialized() { - if (!hasTransactionId()) { - - return false; - } return true; } @@ -2073,16 +2049,16 @@ public final class ThreePhaseCommitCohortMessages { } private int bitField0_; - // required string transactionId = 1; + // optional string transactionId = 1; private java.lang.Object transactionId_ = ""; /** - * required string transactionId = 1; + * optional string transactionId = 1; */ public boolean hasTransactionId() { return ((bitField0_ & 0x00000001) == 0x00000001); } /** - * required string transactionId = 1; + * optional string transactionId = 1; */ public java.lang.String getTransactionId() { java.lang.Object ref = transactionId_; @@ -2096,7 +2072,7 @@ public final class ThreePhaseCommitCohortMessages { } } /** - * required string transactionId = 1; + * optional string transactionId = 1; */ public com.google.protobuf.ByteString getTransactionIdBytes() { @@ -2112,7 +2088,7 @@ public final class ThreePhaseCommitCohortMessages { } } /** - * required string transactionId = 1; + * optional string transactionId = 1; */ public Builder setTransactionId( java.lang.String value) { @@ -2125,7 +2101,7 @@ public final class ThreePhaseCommitCohortMessages { return this; } /** - * required string transactionId = 1; + * optional string transactionId = 1; */ public Builder clearTransactionId() { bitField0_ = (bitField0_ & ~0x00000001); @@ -2134,7 +2110,7 @@ public final class ThreePhaseCommitCohortMessages { return this; } /** - * required string transactionId = 1; + * optional string transactionId = 1; */ public Builder setTransactionIdBytes( com.google.protobuf.ByteString value) { @@ -3136,11 +3112,11 @@ public final class ThreePhaseCommitCohortMessages { java.lang.String[] descriptorData = { "\n\014Cohort.proto\022!org.opendaylight.control" + "ler.mdsal\"-\n\024CanCommitTransaction\022\025\n\rtra" + - "nsactionId\030\001 \002(\t\".\n\031CanCommitTransaction" + + "nsactionId\030\001 \001(\t\".\n\031CanCommitTransaction" + "Reply\022\021\n\tcanCommit\030\001 \002(\010\")\n\020AbortTransac" + - "tion\022\025\n\rtransactionId\030\001 \002(\t\"\027\n\025AbortTran" + + "tion\022\025\n\rtransactionId\030\001 \001(\t\"\027\n\025AbortTran" + "sactionReply\"*\n\021CommitTransaction\022\025\n\rtra" + - "nsactionId\030\001 \002(\t\"\030\n\026CommitTransactionRep" + + "nsactionId\030\001 \001(\t\"\030\n\026CommitTransactionRep" + "ly\"\026\n\024PreCommitTransaction\"\033\n\031PreCommitT" + "ransactionReplyBZ\n8org.opendaylight.cont" + "roller.protobuff.messages.cohort3pcB\036Thr", diff --git a/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/protobuff/messages/transaction/ShardTransactionMessages.java b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/protobuff/messages/transaction/ShardTransactionMessages.java index 96a39bddd3..3cd290c6de 100644 --- a/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/protobuff/messages/transaction/ShardTransactionMessages.java +++ b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/protobuff/messages/transaction/ShardTransactionMessages.java @@ -668,6 +668,16 @@ public final class ShardTransactionMessages { */ com.google.protobuf.ByteString getTransactionChainIdBytes(); + + // optional int32 messageVersion = 4; + /** + * optional int32 messageVersion = 4; + */ + boolean hasMessageVersion(); + /** + * optional int32 messageVersion = 4; + */ + int getMessageVersion(); } /** * Protobuf type {@code org.opendaylight.controller.mdsal.CreateTransaction} @@ -735,6 +745,11 @@ public final class ShardTransactionMessages { transactionChainId_ = input.readBytes(); break; } + case 32: { + bitField0_ |= 0x00000008; + messageVersion_ = input.readInt32(); + break; + } } } } catch (com.google.protobuf.InvalidProtocolBufferException e) { @@ -877,10 +892,27 @@ public final class ShardTransactionMessages { } } + // optional int32 messageVersion = 4; + public static final int MESSAGEVERSION_FIELD_NUMBER = 4; + private int messageVersion_; + /** + * optional int32 messageVersion = 4; + */ + public boolean hasMessageVersion() { + return ((bitField0_ & 0x00000008) == 0x00000008); + } + /** + * optional int32 messageVersion = 4; + */ + public int getMessageVersion() { + return messageVersion_; + } + private void initFields() { transactionId_ = ""; transactionType_ = 0; transactionChainId_ = ""; + messageVersion_ = 0; } private byte memoizedIsInitialized = -1; public final boolean isInitialized() { @@ -911,6 +943,9 @@ public final class ShardTransactionMessages { if (((bitField0_ & 0x00000004) == 0x00000004)) { output.writeBytes(3, getTransactionChainIdBytes()); } + if (((bitField0_ & 0x00000008) == 0x00000008)) { + output.writeInt32(4, messageVersion_); + } getUnknownFields().writeTo(output); } @@ -932,6 +967,10 @@ public final class ShardTransactionMessages { size += com.google.protobuf.CodedOutputStream .computeBytesSize(3, getTransactionChainIdBytes()); } + if (((bitField0_ & 0x00000008) == 0x00000008)) { + size += com.google.protobuf.CodedOutputStream + .computeInt32Size(4, messageVersion_); + } size += getUnknownFields().getSerializedSize(); memoizedSerializedSize = size; return size; @@ -1054,6 +1093,8 @@ public final class ShardTransactionMessages { bitField0_ = (bitField0_ & ~0x00000002); transactionChainId_ = ""; bitField0_ = (bitField0_ & ~0x00000004); + messageVersion_ = 0; + bitField0_ = (bitField0_ & ~0x00000008); return this; } @@ -1094,6 +1135,10 @@ public final class ShardTransactionMessages { to_bitField0_ |= 0x00000004; } result.transactionChainId_ = transactionChainId_; + if (((from_bitField0_ & 0x00000008) == 0x00000008)) { + to_bitField0_ |= 0x00000008; + } + result.messageVersion_ = messageVersion_; result.bitField0_ = to_bitField0_; onBuilt(); return result; @@ -1123,6 +1168,9 @@ public final class ShardTransactionMessages { transactionChainId_ = other.transactionChainId_; onChanged(); } + if (other.hasMessageVersion()) { + setMessageVersion(other.getMessageVersion()); + } this.mergeUnknownFields(other.getUnknownFields()); return this; } @@ -1339,6 +1387,39 @@ public final class ShardTransactionMessages { return this; } + // optional int32 messageVersion = 4; + private int messageVersion_ ; + /** + * optional int32 messageVersion = 4; + */ + public boolean hasMessageVersion() { + return ((bitField0_ & 0x00000008) == 0x00000008); + } + /** + * optional int32 messageVersion = 4; + */ + public int getMessageVersion() { + return messageVersion_; + } + /** + * optional int32 messageVersion = 4; + */ + public Builder setMessageVersion(int value) { + bitField0_ |= 0x00000008; + messageVersion_ = value; + onChanged(); + return this; + } + /** + * optional int32 messageVersion = 4; + */ + public Builder clearMessageVersion() { + bitField0_ = (bitField0_ & ~0x00000008); + messageVersion_ = 0; + onChanged(); + return this; + } + // @@protoc_insertion_point(builder_scope:org.opendaylight.controller.mdsal.CreateTransaction) } @@ -7753,37 +7834,37 @@ public final class ShardTransactionMessages { java.lang.String[] descriptorData = { "\n\026ShardTransaction.proto\022!org.opendaylig" + "ht.controller.mdsal\032\014Common.proto\"\022\n\020Clo" + - "seTransaction\"\027\n\025CloseTransactionReply\"_" + + "seTransaction\"\027\n\025CloseTransactionReply\"w" + "\n\021CreateTransaction\022\025\n\rtransactionId\030\001 \002" + "(\t\022\027\n\017transactionType\030\002 \002(\005\022\032\n\022transacti" + - "onChainId\030\003 \001(\t\"M\n\026CreateTransactionRepl" + - "y\022\034\n\024transactionActorPath\030\001 \002(\t\022\025\n\rtrans" + - "actionId\030\002 \002(\t\"\022\n\020ReadyTransaction\"*\n\025Re" + - "adyTransactionReply\022\021\n\tactorPath\030\001 \002(\t\"l" + - "\n\nDeleteData\022^\n\037instanceIdentifierPathAr", - "guments\030\001 \002(\01325.org.opendaylight.control" + - "ler.mdsal.InstanceIdentifier\"\021\n\017DeleteDa" + - "taReply\"j\n\010ReadData\022^\n\037instanceIdentifie" + - "rPathArguments\030\001 \002(\01325.org.opendaylight." + - "controller.mdsal.InstanceIdentifier\"P\n\rR" + - "eadDataReply\022?\n\016normalizedNode\030\001 \001(\0132\'.o" + - "rg.opendaylight.controller.mdsal.Node\"\254\001" + - "\n\tWriteData\022^\n\037instanceIdentifierPathArg" + - "uments\030\001 \002(\01325.org.opendaylight.controll" + - "er.mdsal.InstanceIdentifier\022?\n\016normalize", - "dNode\030\002 \002(\0132\'.org.opendaylight.controlle" + - "r.mdsal.Node\"\020\n\016WriteDataReply\"\254\001\n\tMerge" + - "Data\022^\n\037instanceIdentifierPathArguments\030" + - "\001 \002(\01325.org.opendaylight.controller.mdsa" + - "l.InstanceIdentifier\022?\n\016normalizedNode\030\002" + - " \002(\0132\'.org.opendaylight.controller.mdsal" + - ".Node\"\020\n\016MergeDataReply\"l\n\nDataExists\022^\n" + - "\037instanceIdentifierPathArguments\030\001 \002(\01325" + - ".org.opendaylight.controller.mdsal.Insta" + - "nceIdentifier\"!\n\017DataExistsReply\022\016\n\006exis", - "ts\030\001 \002(\010BV\n:org.opendaylight.controller." + - "protobuff.messages.transactionB\030ShardTra" + - "nsactionMessages" + "onChainId\030\003 \001(\t\022\026\n\016messageVersion\030\004 \001(\005\"" + + "M\n\026CreateTransactionReply\022\034\n\024transaction" + + "ActorPath\030\001 \002(\t\022\025\n\rtransactionId\030\002 \002(\t\"\022" + + "\n\020ReadyTransaction\"*\n\025ReadyTransactionRe" + + "ply\022\021\n\tactorPath\030\001 \002(\t\"l\n\nDeleteData\022^\n\037", + "instanceIdentifierPathArguments\030\001 \002(\01325." + + "org.opendaylight.controller.mdsal.Instan" + + "ceIdentifier\"\021\n\017DeleteDataReply\"j\n\010ReadD" + + "ata\022^\n\037instanceIdentifierPathArguments\030\001" + + " \002(\01325.org.opendaylight.controller.mdsal" + + ".InstanceIdentifier\"P\n\rReadDataReply\022?\n\016" + + "normalizedNode\030\001 \001(\0132\'.org.opendaylight." + + "controller.mdsal.Node\"\254\001\n\tWriteData\022^\n\037i" + + "nstanceIdentifierPathArguments\030\001 \002(\01325.o" + + "rg.opendaylight.controller.mdsal.Instanc", + "eIdentifier\022?\n\016normalizedNode\030\002 \002(\0132\'.or" + + "g.opendaylight.controller.mdsal.Node\"\020\n\016" + + "WriteDataReply\"\254\001\n\tMergeData\022^\n\037instance" + + "IdentifierPathArguments\030\001 \002(\01325.org.open" + + "daylight.controller.mdsal.InstanceIdenti" + + "fier\022?\n\016normalizedNode\030\002 \002(\0132\'.org.opend" + + "aylight.controller.mdsal.Node\"\020\n\016MergeDa" + + "taReply\"l\n\nDataExists\022^\n\037instanceIdentif" + + "ierPathArguments\030\001 \002(\01325.org.opendayligh" + + "t.controller.mdsal.InstanceIdentifier\"!\n", + "\017DataExistsReply\022\016\n\006exists\030\001 \002(\010BV\n:org." + + "opendaylight.controller.protobuff.messag" + + "es.transactionB\030ShardTransactionMessages" }; com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner = new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() { @@ -7807,7 +7888,7 @@ public final class ShardTransactionMessages { internal_static_org_opendaylight_controller_mdsal_CreateTransaction_fieldAccessorTable = new com.google.protobuf.GeneratedMessage.FieldAccessorTable( internal_static_org_opendaylight_controller_mdsal_CreateTransaction_descriptor, - new java.lang.String[] { "TransactionId", "TransactionType", "TransactionChainId", }); + new java.lang.String[] { "TransactionId", "TransactionType", "TransactionChainId", "MessageVersion", }); internal_static_org_opendaylight_controller_mdsal_CreateTransactionReply_descriptor = getDescriptor().getMessageTypes().get(3); internal_static_org_opendaylight_controller_mdsal_CreateTransactionReply_fieldAccessorTable = new diff --git a/opendaylight/md-sal/sal-clustering-commons/src/main/resources/Cohort.proto b/opendaylight/md-sal/sal-clustering-commons/src/main/resources/Cohort.proto index 49c6cd07a8..222f68ab4f 100644 --- a/opendaylight/md-sal/sal-clustering-commons/src/main/resources/Cohort.proto +++ b/opendaylight/md-sal/sal-clustering-commons/src/main/resources/Cohort.proto @@ -5,7 +5,7 @@ option java_outer_classname = "ThreePhaseCommitCohortMessages"; message CanCommitTransaction{ - required string transactionId = 1; + optional string transactionId = 1; } message CanCommitTransactionReply{ @@ -14,7 +14,7 @@ message CanCommitTransactionReply{ } message AbortTransaction{ - required string transactionId = 1; + optional string transactionId = 1; } message AbortTransactionReply { @@ -22,7 +22,7 @@ message AbortTransactionReply { } message CommitTransaction{ - required string transactionId = 1; + optional string transactionId = 1; } message CommitTransactionReply{ diff --git a/opendaylight/md-sal/sal-clustering-commons/src/main/resources/ShardTransaction.proto b/opendaylight/md-sal/sal-clustering-commons/src/main/resources/ShardTransaction.proto index 26581478d9..cd1132d99d 100644 --- a/opendaylight/md-sal/sal-clustering-commons/src/main/resources/ShardTransaction.proto +++ b/opendaylight/md-sal/sal-clustering-commons/src/main/resources/ShardTransaction.proto @@ -16,6 +16,7 @@ message CreateTransaction{ required string transactionId = 1; required int32 transactionType =2; optional string transactionChainId = 3; + optional int32 messageVersion = 4; } message CreateTransactionReply{ diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java index 7d67e0856f..1bf32e7fca 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java @@ -8,29 +8,18 @@ package org.opendaylight.controller.cluster.datastore; -import akka.actor.ActorRef; -import akka.actor.ActorSelection; -import akka.actor.Cancellable; -import akka.actor.PoisonPill; -import akka.actor.Props; -import akka.event.Logging; -import akka.event.LoggingAdapter; -import akka.japi.Creator; -import akka.persistence.RecoveryFailure; -import akka.serialization.Serialization; -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Optional; -import com.google.common.base.Preconditions; -import com.google.common.collect.Lists; -import com.google.common.util.concurrent.FutureCallback; -import com.google.common.util.concurrent.Futures; -import com.google.common.util.concurrent.ListenableFuture; -import com.google.protobuf.ByteString; -import com.google.protobuf.InvalidProtocolBufferException; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import javax.annotation.Nonnull; import org.opendaylight.controller.cluster.DataPersistenceProvider; import org.opendaylight.controller.cluster.common.actor.CommonConfig; import org.opendaylight.controller.cluster.common.actor.MeteringBehavior; import org.opendaylight.controller.cluster.datastore.ShardCommitCoordinator.CohortEntry; +import org.opendaylight.controller.cluster.datastore.compat.BackwardsCompatibleThreePhaseCommitCohort; import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException; import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier; import org.opendaylight.controller.cluster.datastore.identifiers.ShardTransactionIdentifier; @@ -76,14 +65,25 @@ import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; import org.opendaylight.yangtools.yang.model.api.SchemaContext; import scala.concurrent.duration.Duration; import scala.concurrent.duration.FiniteDuration; - -import javax.annotation.Nonnull; -import java.util.Collection; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.TimeUnit; +import akka.actor.ActorRef; +import akka.actor.ActorSelection; +import akka.actor.Cancellable; +import akka.actor.PoisonPill; +import akka.actor.Props; +import akka.event.Logging; +import akka.event.LoggingAdapter; +import akka.japi.Creator; +import akka.persistence.RecoveryFailure; +import akka.serialization.Serialization; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Optional; +import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.protobuf.ByteString; +import com.google.protobuf.InvalidProtocolBufferException; /** * A Shard represents a portion of the logical data tree
@@ -93,6 +93,8 @@ import java.util.concurrent.TimeUnit; */ public class Shard extends RaftActor { + private static final int HELIUM_1_TX_VERSION = 1; + private static final Object COMMIT_TRANSACTION_REPLY = new CommitTransactionReply().toSerializable(); private static final Object TX_COMMIT_TIMEOUT_CHECK_MESSAGE = "txCommitTimeoutCheck"; @@ -374,7 +376,8 @@ public class Shard extends RaftActor { } private void handleForwardedReadyTransaction(ForwardedReadyTransaction ready) { - LOG.debug("Readying transaction {}", ready.getTransactionID()); + LOG.debug("Readying transaction {}, client version {}", ready.getTransactionID(), + ready.getTxnClientVersion()); // This message is forwarded by the ShardTransaction on ready. We cache the cohort in the // commitCoordinator in preparation for the subsequent three phase commit initiated by @@ -382,12 +385,22 @@ public class Shard extends RaftActor { commitCoordinator.transactionReady(ready.getTransactionID(), ready.getCohort(), ready.getModification()); - // Return our actor path as we'll handle the three phase commit. - ReadyTransactionReply readyTransactionReply = - new ReadyTransactionReply(Serialization.serializedActorPath(self())); - getSender().tell( - ready.isReturnSerialized() ? readyTransactionReply.toSerializable() : readyTransactionReply, - getSelf()); + // Return our actor path as we'll handle the three phase commit, except if the Tx client + // version < 1 (Helium-1 version). This means the Tx was initiated by a base Helium version + // node. In that case, the subsequent 3-phase commit messages won't contain the + // transactionId so to maintain backwards compatibility, we create a separate cohort actor + // to provide the compatible behavior. + ActorRef replyActorPath = self(); + if(ready.getTxnClientVersion() < HELIUM_1_TX_VERSION) { + LOG.debug("Creating BackwardsCompatibleThreePhaseCommitCohort"); + replyActorPath = getContext().actorOf(BackwardsCompatibleThreePhaseCommitCohort.props( + ready.getTransactionID())); + } + + ReadyTransactionReply readyTransactionReply = new ReadyTransactionReply( + Serialization.serializedActorPath(replyActorPath)); + getSender().tell(ready.isReturnSerialized() ? readyTransactionReply.toSerializable() : + readyTransactionReply, getSelf()); } private void handleAbortTransaction(AbortTransaction abort) { @@ -465,10 +478,8 @@ public class Shard extends RaftActor { } } - private ActorRef createTypedTransactionActor( - int transactionType, - ShardTransactionIdentifier transactionId, - String transactionChainId ) { + private ActorRef createTypedTransactionActor(int transactionType, + ShardTransactionIdentifier transactionId, String transactionChainId, int clientVersion ) { DOMStoreTransactionFactory factory = store; @@ -492,7 +503,8 @@ public class Shard extends RaftActor { return getContext().actorOf( ShardTransaction.props(factory.newReadOnlyTransaction(), getSelf(), schemaContext,datastoreContext, shardMBean, - transactionId.getRemoteTransactionId()), transactionId.toString()); + transactionId.getRemoteTransactionId(), clientVersion), + transactionId.toString()); } else if (transactionType == TransactionProxy.TransactionType.READ_WRITE.ordinal()) { @@ -501,7 +513,8 @@ public class Shard extends RaftActor { return getContext().actorOf( ShardTransaction.props(factory.newReadWriteTransaction(), getSelf(), schemaContext, datastoreContext, shardMBean, - transactionId.getRemoteTransactionId()), transactionId.toString()); + transactionId.getRemoteTransactionId(), clientVersion), + transactionId.toString()); } else if (transactionType == TransactionProxy.TransactionType.WRITE_ONLY.ordinal()) { @@ -511,7 +524,8 @@ public class Shard extends RaftActor { return getContext().actorOf( ShardTransaction.props(factory.newWriteOnlyTransaction(), getSelf(), schemaContext, datastoreContext, shardMBean, - transactionId.getRemoteTransactionId()), transactionId.toString()); + transactionId.getRemoteTransactionId(), clientVersion), + transactionId.toString()); } else { throw new IllegalArgumentException( "Shard="+name + ":CreateTransaction message has unidentified transaction type=" @@ -521,10 +535,12 @@ public class Shard extends RaftActor { private void createTransaction(CreateTransaction createTransaction) { createTransaction(createTransaction.getTransactionType(), - createTransaction.getTransactionId(), createTransaction.getTransactionChainId()); + createTransaction.getTransactionId(), createTransaction.getTransactionChainId(), + createTransaction.getClientVersion()); } - private ActorRef createTransaction(int transactionType, String remoteTransactionId, String transactionChainId) { + private ActorRef createTransaction(int transactionType, String remoteTransactionId, + String transactionChainId, int clientVersion) { ShardTransactionIdentifier transactionId = ShardTransactionIdentifier.builder() @@ -533,8 +549,8 @@ public class Shard extends RaftActor { if(LOG.isDebugEnabled()) { LOG.debug("Creating transaction : {} ", transactionId); } - ActorRef transactionActor = - createTypedTransactionActor(transactionType, transactionId, transactionChainId); + ActorRef transactionActor = createTypedTransactionActor(transactionType, transactionId, + transactionChainId, clientVersion); getSender() .tell(new CreateTransactionReply( @@ -765,7 +781,8 @@ public class Shard extends RaftActor { // so that this actor does not get block building the snapshot createSnapshotTransaction = createTransaction( TransactionProxy.TransactionType.READ_ONLY.ordinal(), - "createSnapshot" + ++createSnapshotTransactionCounter, ""); + "createSnapshot" + ++createSnapshotTransactionCounter, "", + CreateTransaction.CURRENT_CLIENT_VERSION); createSnapshotTransaction.tell( new ReadData(YangInstanceIdentifier.builder().build()).toSerializable(), self()); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardReadTransaction.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardReadTransaction.java index d12e9997bb..9d8f57252a 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardReadTransaction.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardReadTransaction.java @@ -26,8 +26,9 @@ public class ShardReadTransaction extends ShardTransaction { private final DOMStoreReadTransaction transaction; public ShardReadTransaction(DOMStoreReadTransaction transaction, ActorRef shardActor, - SchemaContext schemaContext, ShardStats shardStats, String transactionID) { - super(shardActor, schemaContext, shardStats, transactionID); + SchemaContext schemaContext, ShardStats shardStats, String transactionID, + int txnClientVersion) { + super(shardActor, schemaContext, shardStats, transactionID, txnClientVersion); this.transaction = transaction; } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardReadWriteTransaction.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardReadWriteTransaction.java index b1fd02d217..e558677ebb 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardReadWriteTransaction.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardReadWriteTransaction.java @@ -26,8 +26,9 @@ public class ShardReadWriteTransaction extends ShardWriteTransaction { private final DOMStoreReadWriteTransaction transaction; public ShardReadWriteTransaction(DOMStoreReadWriteTransaction transaction, ActorRef shardActor, - SchemaContext schemaContext, ShardStats shardStats, String transactionID) { - super(transaction, shardActor, schemaContext, shardStats, transactionID); + SchemaContext schemaContext, ShardStats shardStats, String transactionID, + int txnClientVersion) { + super(transaction, shardActor, schemaContext, shardStats, transactionID, txnClientVersion); this.transaction = transaction; } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardTransaction.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardTransaction.java index 32de47f451..59bb4bfd77 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardTransaction.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardTransaction.java @@ -57,26 +57,29 @@ import org.opendaylight.yangtools.yang.model.api.SchemaContext; */ public abstract class ShardTransaction extends AbstractUntypedActorWithMetering { + protected static final boolean SERIALIZED_REPLY = true; + private final ActorRef shardActor; private final SchemaContext schemaContext; private final ShardStats shardStats; private final String transactionID; - protected static final boolean SERIALIZED_REPLY = true; + private final int txnClientVersion; protected ShardTransaction(ActorRef shardActor, SchemaContext schemaContext, - ShardStats shardStats, String transactionID) { + ShardStats shardStats, String transactionID, int txnClientVersion) { super("shard-tx"); //actor name override used for metering. This does not change the "real" actor name this.shardActor = shardActor; this.schemaContext = schemaContext; this.shardStats = shardStats; this.transactionID = transactionID; + this.txnClientVersion = txnClientVersion; } public static Props props(DOMStoreTransaction transaction, ActorRef shardActor, SchemaContext schemaContext,DatastoreContext datastoreContext, ShardStats shardStats, - String transactionID) { + String transactionID, int txnClientVersion) { return Props.create(new ShardTransactionCreator(transaction, shardActor, schemaContext, - datastoreContext, shardStats, transactionID)); + datastoreContext, shardStats, transactionID, txnClientVersion)); } protected abstract DOMStoreTransaction getDOMStoreTransaction(); @@ -93,6 +96,10 @@ public abstract class ShardTransaction extends AbstractUntypedActorWithMetering return schemaContext; } + protected int getTxnClientVersion() { + return txnClientVersion; + } + @Override public void handleReceive(Object message) throws Exception { if (message.getClass().equals(CloseTransaction.SERIALIZABLE_CLASS)) { @@ -169,16 +176,18 @@ public abstract class ShardTransaction extends AbstractUntypedActorWithMetering final DatastoreContext datastoreContext; final ShardStats shardStats; final String transactionID; + final int txnClientVersion; ShardTransactionCreator(DOMStoreTransaction transaction, ActorRef shardActor, SchemaContext schemaContext, DatastoreContext datastoreContext, - ShardStats shardStats, String transactionID) { + ShardStats shardStats, String transactionID, int txnClientVersion) { this.transaction = transaction; this.shardActor = shardActor; this.shardStats = shardStats; this.schemaContext = schemaContext; this.datastoreContext = datastoreContext; this.transactionID = transactionID; + this.txnClientVersion = txnClientVersion; } @Override @@ -186,13 +195,13 @@ public abstract class ShardTransaction extends AbstractUntypedActorWithMetering ShardTransaction tx; if(transaction instanceof DOMStoreReadWriteTransaction) { tx = new ShardReadWriteTransaction((DOMStoreReadWriteTransaction)transaction, - shardActor, schemaContext, shardStats, transactionID); + shardActor, schemaContext, shardStats, transactionID, txnClientVersion); } else if(transaction instanceof DOMStoreReadTransaction) { tx = new ShardReadTransaction((DOMStoreReadTransaction)transaction, shardActor, - schemaContext, shardStats, transactionID); + schemaContext, shardStats, transactionID, txnClientVersion); } else { tx = new ShardWriteTransaction((DOMStoreWriteTransaction)transaction, - shardActor, schemaContext, shardStats, transactionID); + shardActor, schemaContext, shardStats, transactionID, txnClientVersion); } tx.getContext().setReceiveTimeout(datastoreContext.getShardTransactionIdleTimeout()); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardTransactionChain.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardTransactionChain.java index 943a82f6f9..4bf184e9a7 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardTransactionChain.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardTransactionChain.java @@ -63,19 +63,22 @@ public class ShardTransactionChain extends AbstractUntypedActor { return getContext().actorOf( ShardTransaction.props( chain.newReadOnlyTransaction(), getShardActor(), schemaContext, datastoreContext, shardStats, - createTransaction.getTransactionId()), transactionName); + createTransaction.getTransactionId(), + createTransaction.getClientVersion()), transactionName); } else if (createTransaction.getTransactionType() == TransactionProxy.TransactionType.READ_WRITE.ordinal()) { return getContext().actorOf( ShardTransaction.props( chain.newReadWriteTransaction(), getShardActor(), schemaContext, datastoreContext, shardStats, - createTransaction.getTransactionId()), transactionName); + createTransaction.getTransactionId(), + createTransaction.getClientVersion()), transactionName); } else if (createTransaction.getTransactionType() == TransactionProxy.TransactionType.WRITE_ONLY.ordinal()) { return getContext().actorOf( ShardTransaction.props( chain.newWriteOnlyTransaction(), getShardActor(), schemaContext, datastoreContext, shardStats, - createTransaction.getTransactionId()), transactionName); + createTransaction.getTransactionId(), + createTransaction.getClientVersion()), transactionName); } else { throw new IllegalArgumentException ( "CreateTransaction message has unidentified transaction type=" + diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardWriteTransaction.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardWriteTransaction.java index b0eaf98d59..44f2c7bd0a 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardWriteTransaction.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardWriteTransaction.java @@ -42,8 +42,9 @@ public class ShardWriteTransaction extends ShardTransaction { private final DOMStoreWriteTransaction transaction; public ShardWriteTransaction(DOMStoreWriteTransaction transaction, ActorRef shardActor, - SchemaContext schemaContext, ShardStats shardStats, String transactionID) { - super(shardActor, schemaContext, shardStats, transactionID); + SchemaContext schemaContext, ShardStats shardStats, String transactionID, + int txnClientVersion) { + super(shardActor, schemaContext, shardStats, transactionID, txnClientVersion); this.transaction = transaction; } @@ -144,8 +145,8 @@ public class ShardWriteTransaction extends ShardTransaction { DOMStoreThreePhaseCommitCohort cohort = transaction.ready(); - getShardActor().forward(new ForwardedReadyTransaction(transactionID, cohort, modification, - returnSerialized), getContext()); + getShardActor().forward(new ForwardedReadyTransaction(transactionID, getTxnClientVersion(), + cohort, modification, returnSerialized), getContext()); // The shard will handle the commit from here so we're no longer needed - self-destruct. getSelf().tell(PoisonPill.getInstance(), getSelf()); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/compat/BackwardsCompatibleThreePhaseCommitCohort.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/compat/BackwardsCompatibleThreePhaseCommitCohort.java new file mode 100644 index 0000000000..30ab97ceb1 --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/compat/BackwardsCompatibleThreePhaseCommitCohort.java @@ -0,0 +1,87 @@ +/* + * Copyright (c) 2014 Brocade Communications Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.datastore.compat; + +import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActor; +import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction; +import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction; +import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction; +import org.opendaylight.controller.cluster.datastore.messages.PreCommitTransaction; +import org.opendaylight.controller.cluster.datastore.messages.PreCommitTransactionReply; +import akka.actor.PoisonPill; +import akka.actor.Props; +import akka.event.Logging; +import akka.event.LoggingAdapter; +import akka.japi.Creator; + +/** + * An actor to maintain backwards compatibility for the base Helium version where the 3-phase commit + * messages don't contain the transactionId. This actor just forwards a new message containing the + * transactionId to the parent Shard. + * + * @author Thomas Pantelis + */ +public class BackwardsCompatibleThreePhaseCommitCohort extends AbstractUntypedActor { + + private final LoggingAdapter LOG = Logging.getLogger(getContext().system(), this); + + private final String transactionId; + + private BackwardsCompatibleThreePhaseCommitCohort(String transactionId) { + this.transactionId = transactionId; + } + + @Override + public void handleReceive(Object message) throws Exception { + if(message.getClass().equals(CanCommitTransaction.SERIALIZABLE_CLASS)) { + LOG.debug("BackwardsCompatibleThreePhaseCommitCohort CanCommitTransaction"); + + getContext().parent().forward(new CanCommitTransaction(transactionId).toSerializable(), + getContext()); + } else if(message.getClass().equals(PreCommitTransaction.SERIALIZABLE_CLASS)) { + LOG.debug("BackwardsCompatibleThreePhaseCommitCohort PreCommitTransaction"); + + // The Shard doesn't need the PreCommitTransaction message so just return the reply here. + getSender().tell(new PreCommitTransactionReply().toSerializable(), self()); + } else if(message.getClass().equals(CommitTransaction.SERIALIZABLE_CLASS)) { + LOG.debug("BackwardsCompatibleThreePhaseCommitCohort CommitTransaction"); + + getContext().parent().forward(new CommitTransaction(transactionId).toSerializable(), + getContext()); + + // We're done now - we can self-destruct + self().tell(PoisonPill.getInstance(), self()); + } else if(message.getClass().equals(AbortTransaction.SERIALIZABLE_CLASS)) { + LOG.debug("BackwardsCompatibleThreePhaseCommitCohort AbortTransaction"); + + getContext().parent().forward(new AbortTransaction(transactionId).toSerializable(), + getContext()); + self().tell(PoisonPill.getInstance(), self()); + } + } + + public static Props props(String transactionId) { + return Props.create(new BackwardsCompatibleThreePhaseCommitCohortCreator(transactionId)); + } + + private static class BackwardsCompatibleThreePhaseCommitCohortCreator + implements Creator { + private static final long serialVersionUID = 1L; + + private final String transactionId; + + BackwardsCompatibleThreePhaseCommitCohortCreator(String transactionId) { + this.transactionId = transactionId; + } + + @Override + public BackwardsCompatibleThreePhaseCommitCohort create() throws Exception { + return new BackwardsCompatibleThreePhaseCommitCohort(transactionId); + } + } +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/CreateTransaction.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/CreateTransaction.java index 361d406ac8..cbb881f788 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/CreateTransaction.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/CreateTransaction.java @@ -13,24 +13,32 @@ import org.opendaylight.controller.protobuff.messages.transaction.ShardTransacti public class CreateTransaction implements SerializableMessage { - public static final Class SERIALIZABLE_CLASS = ShardTransactionMessages.CreateTransaction.class; + public static final Class SERIALIZABLE_CLASS = + ShardTransactionMessages.CreateTransaction.class; + + public static final int CURRENT_CLIENT_VERSION = 1; + private final String transactionId; private final int transactionType; private final String transactionChainId; + private final int clientVersion; public CreateTransaction(String transactionId, int transactionType) { this(transactionId, transactionType, ""); } public CreateTransaction(String transactionId, int transactionType, String transactionChainId) { + this(transactionId, transactionType, transactionChainId, CURRENT_CLIENT_VERSION); + } + private CreateTransaction(String transactionId, int transactionType, String transactionChainId, + int clientVersion) { this.transactionId = transactionId; this.transactionType = transactionType; this.transactionChainId = transactionChainId; - + this.clientVersion = clientVersion; } - public String getTransactionId() { return transactionId; } @@ -39,19 +47,25 @@ public class CreateTransaction implements SerializableMessage { return transactionType; } + public int getClientVersion() { + return clientVersion; + } + @Override public Object toSerializable() { return ShardTransactionMessages.CreateTransaction.newBuilder() .setTransactionId(transactionId) .setTransactionType(transactionType) - .setTransactionChainId(transactionChainId).build(); + .setTransactionChainId(transactionChainId) + .setMessageVersion(clientVersion).build(); } public static CreateTransaction fromSerializable(Object message) { ShardTransactionMessages.CreateTransaction createTransaction = (ShardTransactionMessages.CreateTransaction) message; return new CreateTransaction(createTransaction.getTransactionId(), - createTransaction.getTransactionType(), createTransaction.getTransactionChainId()); + createTransaction.getTransactionType(), createTransaction.getTransactionChainId(), + createTransaction.getMessageVersion()); } public String getTransactionChainId() { diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/ForwardedReadyTransaction.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/ForwardedReadyTransaction.java index 180108f218..38886c9a58 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/ForwardedReadyTransaction.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/ForwardedReadyTransaction.java @@ -20,14 +20,16 @@ public class ForwardedReadyTransaction { private final DOMStoreThreePhaseCommitCohort cohort; private final Modification modification; private final boolean returnSerialized; + private final int txnClientVersion; - public ForwardedReadyTransaction(String transactionID, DOMStoreThreePhaseCommitCohort cohort, - Modification modification, boolean returnSerialized) { + public ForwardedReadyTransaction(String transactionID, int txnClientVersion, + DOMStoreThreePhaseCommitCohort cohort, Modification modification, + boolean returnSerialized) { this.transactionID = transactionID; this.cohort = cohort; this.modification = modification; this.returnSerialized = returnSerialized; - + this.txnClientVersion = txnClientVersion; } public String getTransactionID() { @@ -45,4 +47,8 @@ public class ForwardedReadyTransaction { public boolean isReturnSerialized() { return returnSerialized; } + + public int getTxnClientVersion() { + return txnClientVersion; + } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java index cd8a658447..557a75a075 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java @@ -1,21 +1,26 @@ package org.opendaylight.controller.cluster.datastore; -import akka.actor.ActorRef; -import akka.actor.PoisonPill; -import akka.actor.Props; -import akka.dispatch.Dispatchers; -import akka.dispatch.OnComplete; -import akka.japi.Creator; -import akka.pattern.Patterns; -import akka.testkit.TestActorRef; -import akka.util.Timeout; -import com.google.common.base.Function; -import com.google.common.base.Optional; -import com.google.common.util.concurrent.CheckedFuture; -import com.google.common.util.concurrent.Futures; -import com.google.common.util.concurrent.ListenableFuture; -import com.google.common.util.concurrent.MoreExecutors; -import com.google.common.util.concurrent.Uninterruptibles; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.inOrder; +import static org.mockito.Mockito.mock; +import static org.opendaylight.controller.cluster.datastore.messages.CreateTransaction.CURRENT_CLIENT_VERSION; +import java.io.IOException; +import java.util.Collections; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -79,28 +84,22 @@ import org.opendaylight.yangtools.yang.model.api.SchemaContext; import scala.concurrent.Await; import scala.concurrent.Future; import scala.concurrent.duration.FiniteDuration; - -import java.io.IOException; -import java.util.Collections; -import java.util.HashSet; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicReference; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; -import static org.mockito.Mockito.doAnswer; -import static org.mockito.Mockito.doReturn; -import static org.mockito.Mockito.inOrder; -import static org.mockito.Mockito.mock; +import akka.actor.ActorRef; +import akka.actor.PoisonPill; +import akka.actor.Props; +import akka.dispatch.Dispatchers; +import akka.dispatch.OnComplete; +import akka.japi.Creator; +import akka.pattern.Patterns; +import akka.testkit.TestActorRef; +import akka.util.Timeout; +import com.google.common.base.Function; +import com.google.common.base.Optional; +import com.google.common.util.concurrent.CheckedFuture; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.MoreExecutors; +import com.google.common.util.concurrent.Uninterruptibles; public class ShardTest extends AbstractActorTest { @@ -611,7 +610,8 @@ public class ShardTest extends AbstractActorTest { // Simulate the ForwardedReadyTransaction message for the first Tx that would be sent // by the ShardTransaction. - shard.tell(new ForwardedReadyTransaction(transactionID1, cohort1, modification1, true), getRef()); + shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_CLIENT_VERSION, + cohort1, modification1, true), getRef()); ReadyTransactionReply readyReply = ReadyTransactionReply.fromSerializable( expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS)); assertEquals("Cohort path", shard.path().toString(), readyReply.getCohortPath()); @@ -625,10 +625,12 @@ public class ShardTest extends AbstractActorTest { // Send the ForwardedReadyTransaction for the next 2 Tx's. - shard.tell(new ForwardedReadyTransaction(transactionID2, cohort2, modification2, true), getRef()); + shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_CLIENT_VERSION, + cohort2, modification2, true), getRef()); expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS); - shard.tell(new ForwardedReadyTransaction(transactionID3, cohort3, modification3, true), getRef()); + shard.tell(new ForwardedReadyTransaction(transactionID3, CURRENT_CLIENT_VERSION, + cohort3, modification3, true), getRef()); expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS); // Send the CanCommitTransaction message for the next 2 Tx's. These should get queued and @@ -792,10 +794,12 @@ public class ShardTest extends AbstractActorTest { // Simulate the ForwardedReadyTransaction messages that would be sent // by the ShardTransaction. - shard.tell(new ForwardedReadyTransaction(transactionID1, cohort1, modification1, true), getRef()); + shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_CLIENT_VERSION, + cohort1, modification1, true), getRef()); expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS); - shard.tell(new ForwardedReadyTransaction(transactionID2, cohort2, modification2, true), getRef()); + shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_CLIENT_VERSION, + cohort2, modification2, true), getRef()); expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS); // Send the CanCommitTransaction message for the first Tx. @@ -859,7 +863,8 @@ public class ShardTest extends AbstractActorTest { // Simulate the ForwardedReadyTransaction messages that would be sent // by the ShardTransaction. - shard.tell(new ForwardedReadyTransaction(transactionID, cohort, modification, true), getRef()); + shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_CLIENT_VERSION, + cohort, modification, true), getRef()); expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS); // Send the CanCommitTransaction message. @@ -902,7 +907,8 @@ public class ShardTest extends AbstractActorTest { // Simulate the ForwardedReadyTransaction messages that would be sent // by the ShardTransaction. - shard.tell(new ForwardedReadyTransaction(transactionID, cohort, modification, true), getRef()); + shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_CLIENT_VERSION, + cohort, modification, true), getRef()); expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS); // Send the CanCommitTransaction message. @@ -954,7 +960,8 @@ public class ShardTest extends AbstractActorTest { TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification, preCommit); - shard.tell(new ForwardedReadyTransaction(transactionID, cohort, modification, true), getRef()); + shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_CLIENT_VERSION, + cohort, modification, true), getRef()); expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS); shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef()); @@ -1018,10 +1025,12 @@ public class ShardTest extends AbstractActorTest { // Ready the Tx's - shard.tell(new ForwardedReadyTransaction(transactionID1, cohort1, modification1, true), getRef()); + shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_CLIENT_VERSION, + cohort1, modification1, true), getRef()); expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS); - shard.tell(new ForwardedReadyTransaction(transactionID2, cohort2, modification2, true), getRef()); + shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_CLIENT_VERSION, + cohort2, modification2, true), getRef()); expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS); // canCommit 1st Tx. We don't send the commit so it should timeout. @@ -1080,13 +1089,16 @@ public class ShardTest extends AbstractActorTest { // Ready the Tx's - shard.tell(new ForwardedReadyTransaction(transactionID1, cohort1, modification1, true), getRef()); + shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_CLIENT_VERSION, + cohort1, modification1, true), getRef()); expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS); - shard.tell(new ForwardedReadyTransaction(transactionID2, cohort2, modification2, true), getRef()); + shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_CLIENT_VERSION, + cohort2, modification2, true), getRef()); expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS); - shard.tell(new ForwardedReadyTransaction(transactionID3, cohort3, modification3, true), getRef()); + shard.tell(new ForwardedReadyTransaction(transactionID3, CURRENT_CLIENT_VERSION, + cohort3, modification3, true), getRef()); expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS); // canCommit 1st Tx. @@ -1149,10 +1161,12 @@ public class ShardTest extends AbstractActorTest { // Simulate the ForwardedReadyTransaction messages that would be sent // by the ShardTransaction. - shard.tell(new ForwardedReadyTransaction(transactionID1, cohort1, modification1, true), getRef()); + shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_CLIENT_VERSION, + cohort1, modification1, true), getRef()); expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS); - shard.tell(new ForwardedReadyTransaction(transactionID2, cohort2, modification2, true), getRef()); + shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_CLIENT_VERSION, + cohort2, modification2, true), getRef()); expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS); // Send the CanCommitTransaction message for the first Tx. @@ -1345,7 +1359,7 @@ public class ShardTest extends AbstractActorTest { }; } - private NormalizedNode readStore(TestActorRef shard, YangInstanceIdentifier id) + static NormalizedNode readStore(TestActorRef shard, YangInstanceIdentifier id) throws ExecutionException, InterruptedException { DOMStoreReadTransaction transaction = shard.underlyingActor().getDataStore().newReadOnlyTransaction(); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionFailureTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionFailureTest.java index 6375e3c7fb..261bfeffba 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionFailureTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionFailureTest.java @@ -10,16 +10,13 @@ package org.opendaylight.controller.cluster.datastore; -import akka.actor.ActorRef; -import akka.actor.Props; -import akka.pattern.AskTimeoutException; -import akka.testkit.TestActorRef; -import com.google.common.util.concurrent.ListeningExecutorService; -import com.google.common.util.concurrent.MoreExecutors; +import java.util.Collections; +import java.util.concurrent.TimeUnit; import org.junit.BeforeClass; import org.junit.Test; import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier; import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats; +import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction; import org.opendaylight.controller.cluster.datastore.node.utils.serialization.NormalizedNodeSerializer; import org.opendaylight.controller.md.cluster.datastore.model.TestModel; import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException; @@ -32,8 +29,12 @@ import org.opendaylight.yangtools.yang.model.api.SchemaContext; import scala.concurrent.Await; import scala.concurrent.Future; import scala.concurrent.duration.Duration; -import java.util.Collections; -import java.util.concurrent.TimeUnit; +import akka.actor.ActorRef; +import akka.actor.Props; +import akka.pattern.AskTimeoutException; +import akka.testkit.TestActorRef; +import com.google.common.util.concurrent.ListeningExecutorService; +import com.google.common.util.concurrent.MoreExecutors; /** * Covers negative test cases @@ -75,7 +76,8 @@ public class ShardTransactionFailureTest extends AbstractActorTest { final ActorRef shard = createShard(); final Props props = ShardTransaction.props(store.newReadOnlyTransaction(), shard, - testSchemaContext, datastoreContext, shardStats, "txn"); + testSchemaContext, datastoreContext, shardStats, "txn", + CreateTransaction.CURRENT_CLIENT_VERSION); final TestActorRef subject = TestActorRef .create(getSystem(), props, @@ -104,7 +106,8 @@ public class ShardTransactionFailureTest extends AbstractActorTest { final ActorRef shard = createShard(); final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard, - testSchemaContext, datastoreContext, shardStats, "txn"); + testSchemaContext, datastoreContext, shardStats, "txn", + CreateTransaction.CURRENT_CLIENT_VERSION); final TestActorRef subject = TestActorRef .create(getSystem(), props, @@ -133,7 +136,8 @@ public class ShardTransactionFailureTest extends AbstractActorTest { final ActorRef shard = createShard(); final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard, - testSchemaContext, datastoreContext, shardStats, "txn"); + testSchemaContext, datastoreContext, shardStats, "txn", + CreateTransaction.CURRENT_CLIENT_VERSION); final TestActorRef subject = TestActorRef .create(getSystem(), props, @@ -162,7 +166,8 @@ public class ShardTransactionFailureTest extends AbstractActorTest { final ActorRef shard = createShard(); final Props props = ShardTransaction.props(store.newWriteOnlyTransaction(), shard, - testSchemaContext, datastoreContext, shardStats, "txn"); + testSchemaContext, datastoreContext, shardStats, "txn", + CreateTransaction.CURRENT_CLIENT_VERSION); final TestActorRef subject = TestActorRef .create(getSystem(), props, @@ -194,7 +199,8 @@ public class ShardTransactionFailureTest extends AbstractActorTest { final ActorRef shard = createShard(); final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard, - testSchemaContext, datastoreContext, shardStats, "txn"); + testSchemaContext, datastoreContext, shardStats, "txn", + CreateTransaction.CURRENT_CLIENT_VERSION); final TestActorRef subject = TestActorRef .create(getSystem(), props, @@ -231,7 +237,8 @@ public class ShardTransactionFailureTest extends AbstractActorTest { final ActorRef shard = createShard(); final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard, - testSchemaContext, datastoreContext, shardStats, "txn"); + testSchemaContext, datastoreContext, shardStats, "txn", + CreateTransaction.CURRENT_CLIENT_VERSION); final TestActorRef subject = TestActorRef .create(getSystem(), props, "testNegativeMergeTransactionReady"); @@ -263,7 +270,8 @@ public class ShardTransactionFailureTest extends AbstractActorTest { final ActorRef shard = createShard(); final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard, - testSchemaContext, datastoreContext, shardStats, "txn"); + testSchemaContext, datastoreContext, shardStats, "txn", + CreateTransaction.CURRENT_CLIENT_VERSION); final TestActorRef subject = TestActorRef .create(getSystem(), props, diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionHeliumBackwardsCompatibilityTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionHeliumBackwardsCompatibilityTest.java new file mode 100644 index 0000000000..af07aeebcf --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionHeliumBackwardsCompatibilityTest.java @@ -0,0 +1,184 @@ +/* + * Copyright (c) 2014 Brocade Communications Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.datastore; + +import java.util.Collections; +import org.junit.Assert; +import org.junit.Test; +import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier; +import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply; +import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply; +import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply; +import org.opendaylight.controller.cluster.datastore.messages.PreCommitTransactionReply; +import org.opendaylight.controller.cluster.datastore.messages.ReadyTransaction; +import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply; +import org.opendaylight.controller.cluster.datastore.messages.WriteData; +import org.opendaylight.controller.cluster.datastore.messages.WriteDataReply; +import org.opendaylight.controller.md.cluster.datastore.model.TestModel; +import org.opendaylight.controller.protobuff.messages.cohort3pc.ThreePhaseCommitCohortMessages; +import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages; +import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages.CreateTransactionReply; +import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; +import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes; +import org.opendaylight.yangtools.yang.model.api.SchemaContext; +import scala.concurrent.duration.FiniteDuration; +import akka.actor.ActorRef; +import akka.actor.ActorSelection; +import akka.actor.PoisonPill; +import akka.actor.Props; +import akka.dispatch.Dispatchers; +import akka.testkit.TestActorRef; + +/** + * Tests backwards compatibility support from Helium-1 to Helium. + * + * In Helium-1, the 3-phase commit support was moved from the ThreePhaseCommitCohort actor to the + * Shard. As a consequence, a new transactionId field was added to the CanCommitTransaction, + * CommitTransaction and AbortTransaction messages. With a base Helium version node, these messages + * would be sans transactionId so this test verifies the Shard handles that properly. + * + * @author Thomas Pantelis + */ +public class ShardTransactionHeliumBackwardsCompatibilityTest extends AbstractActorTest { + + @Test + public void testTransactionCommit() throws Exception { + new ShardTestKit(getSystem()) {{ + SchemaContext schemaContext = TestModel.createTestContext(); + Props shardProps = Shard.props(ShardIdentifier.builder().memberName("member-1"). + shardName("inventory").type("config").build(), + Collections.emptyMap(), + DatastoreContext.newBuilder().shardHeartbeatIntervalInMillis(100).build(), + schemaContext).withDispatcher(Dispatchers.DefaultDispatcherId()); + + final TestActorRef shard = TestActorRef.create(getSystem(), shardProps, + "testTransactionCommit"); + + waitUntilLeader(shard); + + // Send CreateTransaction message with no messages version + + String transactionID = "txn-1"; + shard.tell(ShardTransactionMessages.CreateTransaction.newBuilder() + .setTransactionId(transactionID) + .setTransactionType(TransactionProxy.TransactionType.WRITE_ONLY.ordinal()) + .setTransactionChainId("").build(), getRef()); + + final FiniteDuration duration = duration("5 seconds"); + + CreateTransactionReply reply = expectMsgClass(duration, CreateTransactionReply.class); + + ActorSelection txActor = getSystem().actorSelection(reply.getTransactionActorPath()); + + // Write data to the Tx + + txActor.tell(new WriteData(TestModel.TEST_PATH, + ImmutableNodes.containerNode(TestModel.TEST_QNAME), schemaContext), getRef()); + + expectMsgClass(duration, WriteDataReply.class); + + // Ready the Tx + + txActor.tell(new ReadyTransaction().toSerializable(), getRef()); + + ReadyTransactionReply readyReply = ReadyTransactionReply.fromSerializable(expectMsgClass( + duration, ReadyTransactionReply.SERIALIZABLE_CLASS)); + + ActorSelection cohortActor = getSystem().actorSelection(readyReply.getCohortPath()); + + // Send the CanCommitTransaction message with no transactionId. + + cohortActor.tell(ThreePhaseCommitCohortMessages.CanCommitTransaction.newBuilder().build(), + getRef()); + + expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS); + + // Send the PreCommitTransaction message with no transactionId. + + cohortActor.tell(ThreePhaseCommitCohortMessages.PreCommitTransaction.newBuilder().build(), + getRef()); + + expectMsgClass(duration, PreCommitTransactionReply.SERIALIZABLE_CLASS); + + // Send the CommitTransaction message with no transactionId. + + cohortActor.tell(ThreePhaseCommitCohortMessages.CommitTransaction.newBuilder().build(), + getRef()); + + expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS); + + NormalizedNode node = ShardTest.readStore(shard, TestModel.TEST_PATH); + Assert.assertNotNull("Data not found in store", node); + + shard.tell(PoisonPill.getInstance(), ActorRef.noSender()); + }}; + } + + @Test + public void testTransactionAbort() throws Exception { + new ShardTestKit(getSystem()) {{ + SchemaContext schemaContext = TestModel.createTestContext(); + Props shardProps = Shard.props(ShardIdentifier.builder().memberName("member-1"). + shardName("inventory").type("config").build(), + Collections.emptyMap(), + DatastoreContext.newBuilder().shardHeartbeatIntervalInMillis(100).build(), + schemaContext).withDispatcher(Dispatchers.DefaultDispatcherId()); + + final TestActorRef shard = TestActorRef.create(getSystem(), shardProps, + "testTransactionAbort"); + + waitUntilLeader(shard); + + // Send CreateTransaction message with no messages version + + String transactionID = "txn-1"; + shard.tell(ShardTransactionMessages.CreateTransaction.newBuilder() + .setTransactionId(transactionID) + .setTransactionType(TransactionProxy.TransactionType.WRITE_ONLY.ordinal()) + .setTransactionChainId("").build(), getRef()); + + final FiniteDuration duration = duration("5 seconds"); + + CreateTransactionReply reply = expectMsgClass(duration, CreateTransactionReply.class); + + ActorSelection txActor = getSystem().actorSelection(reply.getTransactionActorPath()); + + // Write data to the Tx + + txActor.tell(new WriteData(TestModel.TEST_PATH, + ImmutableNodes.containerNode(TestModel.TEST_QNAME), schemaContext), getRef()); + + expectMsgClass(duration, WriteDataReply.class); + + // Ready the Tx + + txActor.tell(new ReadyTransaction().toSerializable(), getRef()); + + ReadyTransactionReply readyReply = ReadyTransactionReply.fromSerializable(expectMsgClass( + duration, ReadyTransactionReply.SERIALIZABLE_CLASS)); + + ActorSelection cohortActor = getSystem().actorSelection(readyReply.getCohortPath()); + + // Send the CanCommitTransaction message with no transactionId. + + cohortActor.tell(ThreePhaseCommitCohortMessages.CanCommitTransaction.newBuilder().build(), + getRef()); + + expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS); + + // Send the AbortTransaction message with no transactionId. + + cohortActor.tell(ThreePhaseCommitCohortMessages.AbortTransaction.newBuilder().build(), + getRef()); + + expectMsgClass(duration, AbortTransactionReply.SERIALIZABLE_CLASS); + + shard.tell(PoisonPill.getInstance(), ActorRef.noSender()); + }}; + } +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionTest.java index 793df8e0ca..cc3bb55e85 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionTest.java @@ -1,12 +1,11 @@ package org.opendaylight.controller.cluster.datastore; -import akka.actor.ActorRef; -import akka.actor.Props; -import akka.actor.Terminated; -import akka.testkit.JavaTestKit; -import akka.testkit.TestActorRef; -import com.google.common.util.concurrent.ListeningExecutorService; -import com.google.common.util.concurrent.MoreExecutors; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; +import java.util.Collections; +import java.util.concurrent.TimeUnit; import org.junit.BeforeClass; import org.junit.Test; import org.opendaylight.controller.cluster.datastore.ShardWriteTransaction.GetCompositeModificationReply; @@ -15,6 +14,7 @@ import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats; import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction; import org.opendaylight.controller.cluster.datastore.messages.CloseTransactionReply; +import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction; import org.opendaylight.controller.cluster.datastore.messages.DataExists; import org.opendaylight.controller.cluster.datastore.messages.DataExistsReply; import org.opendaylight.controller.cluster.datastore.messages.DeleteData; @@ -39,12 +39,13 @@ import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes; import org.opendaylight.yangtools.yang.model.api.SchemaContext; import scala.concurrent.duration.Duration; -import java.util.Collections; -import java.util.concurrent.TimeUnit; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertTrue; +import akka.actor.ActorRef; +import akka.actor.Props; +import akka.actor.Terminated; +import akka.testkit.JavaTestKit; +import akka.testkit.TestActorRef; +import com.google.common.util.concurrent.ListeningExecutorService; +import com.google.common.util.concurrent.MoreExecutors; public class ShardTransactionTest extends AbstractActorTest { private static ListeningExecutorService storeExecutor = @@ -78,12 +79,14 @@ public class ShardTransactionTest extends AbstractActorTest { new JavaTestKit(getSystem()) {{ final ActorRef shard = createShard(); Props props = ShardTransaction.props(store.newReadOnlyTransaction(), shard, - testSchemaContext, datastoreContext, shardStats, "txn"); + testSchemaContext, datastoreContext, shardStats, "txn", + CreateTransaction.CURRENT_CLIENT_VERSION); testOnReceiveReadData(getSystem().actorOf(props, "testReadDataRO")); props = ShardTransaction.props(store.newReadWriteTransaction(), shard, - testSchemaContext, datastoreContext, shardStats, "txn"); + testSchemaContext, datastoreContext, shardStats, "txn", + CreateTransaction.CURRENT_CLIENT_VERSION); testOnReceiveReadData(getSystem().actorOf(props, "testReadDataRW")); } @@ -114,13 +117,15 @@ public class ShardTransactionTest extends AbstractActorTest { new JavaTestKit(getSystem()) {{ final ActorRef shard = createShard(); Props props = ShardTransaction.props( store.newReadOnlyTransaction(), shard, - testSchemaContext, datastoreContext, shardStats, "txn"); + testSchemaContext, datastoreContext, shardStats, "txn", + CreateTransaction.CURRENT_CLIENT_VERSION); testOnReceiveReadDataWhenDataNotFound(getSystem().actorOf( props, "testReadDataWhenDataNotFoundRO")); props = ShardTransaction.props( store.newReadWriteTransaction(), shard, - testSchemaContext, datastoreContext, shardStats, "txn"); + testSchemaContext, datastoreContext, shardStats, "txn", + CreateTransaction.CURRENT_CLIENT_VERSION); testOnReceiveReadDataWhenDataNotFound(getSystem().actorOf( props, "testReadDataWhenDataNotFoundRW")); @@ -150,12 +155,14 @@ public class ShardTransactionTest extends AbstractActorTest { new JavaTestKit(getSystem()) {{ final ActorRef shard = createShard(); Props props = ShardTransaction.props(store.newReadOnlyTransaction(), shard, - testSchemaContext, datastoreContext, shardStats, "txn"); + testSchemaContext, datastoreContext, shardStats, "txn", + CreateTransaction.CURRENT_CLIENT_VERSION); testOnReceiveDataExistsPositive(getSystem().actorOf(props, "testDataExistsPositiveRO")); props = ShardTransaction.props(store.newReadWriteTransaction(), shard, - testSchemaContext, datastoreContext, shardStats, "txn"); + testSchemaContext, datastoreContext, shardStats, "txn", + CreateTransaction.CURRENT_CLIENT_VERSION); testOnReceiveDataExistsPositive(getSystem().actorOf(props, "testDataExistsPositiveRW")); } @@ -183,12 +190,14 @@ public class ShardTransactionTest extends AbstractActorTest { new JavaTestKit(getSystem()) {{ final ActorRef shard = createShard(); Props props = ShardTransaction.props(store.newReadOnlyTransaction(), shard, - testSchemaContext, datastoreContext, shardStats, "txn"); + testSchemaContext, datastoreContext, shardStats, "txn", + CreateTransaction.CURRENT_CLIENT_VERSION); testOnReceiveDataExistsNegative(getSystem().actorOf(props, "testDataExistsNegativeRO")); props = ShardTransaction.props(store.newReadWriteTransaction(), shard, - testSchemaContext, datastoreContext, shardStats, "txn"); + testSchemaContext, datastoreContext, shardStats, "txn", + CreateTransaction.CURRENT_CLIENT_VERSION); testOnReceiveDataExistsNegative(getSystem().actorOf(props, "testDataExistsNegativeRW")); } @@ -228,7 +237,8 @@ public class ShardTransactionTest extends AbstractActorTest { new JavaTestKit(getSystem()) {{ final ActorRef shard = createShard(); final Props props = ShardTransaction.props(store.newWriteOnlyTransaction(), shard, - testSchemaContext, datastoreContext, shardStats, "txn"); + testSchemaContext, datastoreContext, shardStats, "txn", + CreateTransaction.CURRENT_CLIENT_VERSION); final ActorRef transaction = getSystem().actorOf(props, "testWriteData"); transaction.tell(new WriteData(TestModel.TEST_PATH, @@ -254,7 +264,8 @@ public class ShardTransactionTest extends AbstractActorTest { new JavaTestKit(getSystem()) {{ final ActorRef shard = createShard(); final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard, - testSchemaContext, datastoreContext, shardStats, "txn"); + testSchemaContext, datastoreContext, shardStats, "txn", + CreateTransaction.CURRENT_CLIENT_VERSION); final ActorRef transaction = getSystem().actorOf(props, "testMergeData"); transaction.tell(new MergeData(TestModel.TEST_PATH, @@ -279,7 +290,8 @@ public class ShardTransactionTest extends AbstractActorTest { new JavaTestKit(getSystem()) {{ final ActorRef shard = createShard(); final Props props = ShardTransaction.props( store.newWriteOnlyTransaction(), shard, - testSchemaContext, datastoreContext, shardStats, "txn"); + testSchemaContext, datastoreContext, shardStats, "txn", + CreateTransaction.CURRENT_CLIENT_VERSION); final ActorRef transaction = getSystem().actorOf(props, "testDeleteData"); transaction.tell(new DeleteData(TestModel.TEST_PATH).toSerializable(), getRef()); @@ -301,7 +313,8 @@ public class ShardTransactionTest extends AbstractActorTest { new JavaTestKit(getSystem()) {{ final ActorRef shard = createShard(); final Props props = ShardTransaction.props( store.newReadWriteTransaction(), shard, - testSchemaContext, datastoreContext, shardStats, "txn"); + testSchemaContext, datastoreContext, shardStats, "txn", + CreateTransaction.CURRENT_CLIENT_VERSION); final ActorRef transaction = getSystem().actorOf(props, "testReadyTransaction"); watch(transaction); @@ -318,7 +331,8 @@ public class ShardTransactionTest extends AbstractActorTest { new JavaTestKit(getSystem()) {{ final ActorRef shard = createShard(); final Props props = ShardTransaction.props( store.newReadWriteTransaction(), shard, - testSchemaContext, datastoreContext, shardStats, "txn"); + testSchemaContext, datastoreContext, shardStats, "txn", + CreateTransaction.CURRENT_CLIENT_VERSION); final ActorRef transaction = getSystem().actorOf(props, "testReadyTransaction2"); watch(transaction); @@ -339,7 +353,8 @@ public class ShardTransactionTest extends AbstractActorTest { new JavaTestKit(getSystem()) {{ final ActorRef shard = createShard(); final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard, - testSchemaContext, datastoreContext, shardStats, "txn"); + testSchemaContext, datastoreContext, shardStats, "txn", + CreateTransaction.CURRENT_CLIENT_VERSION); final ActorRef transaction = getSystem().actorOf(props, "testCloseTransaction"); watch(transaction); @@ -355,7 +370,8 @@ public class ShardTransactionTest extends AbstractActorTest { public void testNegativePerformingWriteOperationOnReadTransaction() throws Exception { final ActorRef shard = createShard(); final Props props = ShardTransaction.props(store.newReadOnlyTransaction(), shard, - testSchemaContext, datastoreContext, shardStats, "txn"); + testSchemaContext, datastoreContext, shardStats, "txn", + CreateTransaction.CURRENT_CLIENT_VERSION); final TestActorRef transaction = TestActorRef.apply(props,getSystem()); transaction.receive(new DeleteData(TestModel.TEST_PATH).toSerializable(), ActorRef.noSender()); @@ -370,7 +386,8 @@ public class ShardTransactionTest extends AbstractActorTest { new JavaTestKit(getSystem()) {{ final ActorRef shard = createShard(); final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard, - testSchemaContext, datastoreContext, shardStats, "txn"); + testSchemaContext, datastoreContext, shardStats, "txn", + CreateTransaction.CURRENT_CLIENT_VERSION); final ActorRef transaction = getSystem().actorOf(props, "testShardTransactionInactivity");