From: Tom Pantelis Date: Wed, 10 Sep 2014 16:50:57 +0000 (+0000) Subject: Merge "BUG 1712 - Distributed DataStore does not work properly with Transaction... X-Git-Tag: release/helium~113 X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=commitdiff_plain;h=93bec87a3187c32ed3a2a684523b66db3b46645a;hp=76cc965bd2d9978fedcbe19603c98e7752abf5a8 Merge "BUG 1712 - Distributed DataStore does not work properly with Transaction Chains" --- diff --git a/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/protobuff/messages/transaction/ShardTransactionChainMessages.java b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/protobuff/messages/transaction/ShardTransactionChainMessages.java index 63dd5e7081..d956bb174b 100644 --- a/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/protobuff/messages/transaction/ShardTransactionChainMessages.java +++ b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/protobuff/messages/transaction/ShardTransactionChainMessages.java @@ -10,6 +10,21 @@ public final class ShardTransactionChainMessages { } public interface CloseTransactionChainOrBuilder extends com.google.protobuf.MessageOrBuilder { + + // optional string transactionChainId = 1; + /** + * optional string transactionChainId = 1; + */ + boolean hasTransactionChainId(); + /** + * optional string transactionChainId = 1; + */ + java.lang.String getTransactionChainId(); + /** + * optional string transactionChainId = 1; + */ + com.google.protobuf.ByteString + getTransactionChainIdBytes(); } /** * Protobuf type {@code org.opendaylight.controller.mdsal.CloseTransactionChain} @@ -44,6 +59,7 @@ public final class ShardTransactionChainMessages { com.google.protobuf.ExtensionRegistryLite extensionRegistry) throws com.google.protobuf.InvalidProtocolBufferException { initFields(); + int mutable_bitField0_ = 0; com.google.protobuf.UnknownFieldSet.Builder unknownFields = com.google.protobuf.UnknownFieldSet.newBuilder(); try { @@ -61,6 +77,11 @@ public final class ShardTransactionChainMessages { } break; } + case 10: { + bitField0_ |= 0x00000001; + transactionChainId_ = input.readBytes(); + break; + } } } } catch (com.google.protobuf.InvalidProtocolBufferException e) { @@ -100,7 +121,52 @@ public final class ShardTransactionChainMessages { return PARSER; } + private int bitField0_; + // optional string transactionChainId = 1; + public static final int TRANSACTIONCHAINID_FIELD_NUMBER = 1; + private java.lang.Object transactionChainId_; + /** + * optional string transactionChainId = 1; + */ + public boolean hasTransactionChainId() { + return ((bitField0_ & 0x00000001) == 0x00000001); + } + /** + * optional string transactionChainId = 1; + */ + public java.lang.String getTransactionChainId() { + java.lang.Object ref = transactionChainId_; + if (ref instanceof java.lang.String) { + return (java.lang.String) ref; + } else { + com.google.protobuf.ByteString bs = + (com.google.protobuf.ByteString) ref; + java.lang.String s = bs.toStringUtf8(); + if (bs.isValidUtf8()) { + transactionChainId_ = s; + } + return s; + } + } + /** + * optional string transactionChainId = 1; + */ + public com.google.protobuf.ByteString + getTransactionChainIdBytes() { + java.lang.Object ref = transactionChainId_; + if (ref instanceof java.lang.String) { + com.google.protobuf.ByteString b = + com.google.protobuf.ByteString.copyFromUtf8( + (java.lang.String) ref); + transactionChainId_ = b; + return b; + } else { + return (com.google.protobuf.ByteString) ref; + } + } + private void initFields() { + transactionChainId_ = ""; } private byte memoizedIsInitialized = -1; public final boolean isInitialized() { @@ -114,6 +180,9 @@ public final class ShardTransactionChainMessages { public void writeTo(com.google.protobuf.CodedOutputStream output) throws java.io.IOException { getSerializedSize(); + if (((bitField0_ & 0x00000001) == 0x00000001)) { + output.writeBytes(1, getTransactionChainIdBytes()); + } getUnknownFields().writeTo(output); } @@ -123,6 +192,10 @@ public final class ShardTransactionChainMessages { if (size != -1) return size; size = 0; + if (((bitField0_ & 0x00000001) == 0x00000001)) { + size += com.google.protobuf.CodedOutputStream + .computeBytesSize(1, getTransactionChainIdBytes()); + } size += getUnknownFields().getSerializedSize(); memoizedSerializedSize = size; return size; @@ -239,6 +312,8 @@ public final class ShardTransactionChainMessages { public Builder clear() { super.clear(); + transactionChainId_ = ""; + bitField0_ = (bitField0_ & ~0x00000001); return this; } @@ -265,6 +340,13 @@ public final class ShardTransactionChainMessages { public org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionChainMessages.CloseTransactionChain buildPartial() { org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionChainMessages.CloseTransactionChain result = new org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionChainMessages.CloseTransactionChain(this); + int from_bitField0_ = bitField0_; + int to_bitField0_ = 0; + if (((from_bitField0_ & 0x00000001) == 0x00000001)) { + to_bitField0_ |= 0x00000001; + } + result.transactionChainId_ = transactionChainId_; + result.bitField0_ = to_bitField0_; onBuilt(); return result; } @@ -280,6 +362,11 @@ public final class ShardTransactionChainMessages { public Builder mergeFrom(org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionChainMessages.CloseTransactionChain other) { if (other == org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionChainMessages.CloseTransactionChain.getDefaultInstance()) return this; + if (other.hasTransactionChainId()) { + bitField0_ |= 0x00000001; + transactionChainId_ = other.transactionChainId_; + onChanged(); + } this.mergeUnknownFields(other.getUnknownFields()); return this; } @@ -305,6 +392,81 @@ public final class ShardTransactionChainMessages { } return this; } + private int bitField0_; + + // optional string transactionChainId = 1; + private java.lang.Object transactionChainId_ = ""; + /** + * optional string transactionChainId = 1; + */ + public boolean hasTransactionChainId() { + return ((bitField0_ & 0x00000001) == 0x00000001); + } + /** + * optional string transactionChainId = 1; + */ + public java.lang.String getTransactionChainId() { + java.lang.Object ref = transactionChainId_; + if (!(ref instanceof java.lang.String)) { + java.lang.String s = ((com.google.protobuf.ByteString) ref) + .toStringUtf8(); + transactionChainId_ = s; + return s; + } else { + return (java.lang.String) ref; + } + } + /** + * optional string transactionChainId = 1; + */ + public com.google.protobuf.ByteString + getTransactionChainIdBytes() { + java.lang.Object ref = transactionChainId_; + if (ref instanceof String) { + com.google.protobuf.ByteString b = + com.google.protobuf.ByteString.copyFromUtf8( + (java.lang.String) ref); + transactionChainId_ = b; + return b; + } else { + return (com.google.protobuf.ByteString) ref; + } + } + /** + * optional string transactionChainId = 1; + */ + public Builder setTransactionChainId( + java.lang.String value) { + if (value == null) { + throw new NullPointerException(); + } + bitField0_ |= 0x00000001; + transactionChainId_ = value; + onChanged(); + return this; + } + /** + * optional string transactionChainId = 1; + */ + public Builder clearTransactionChainId() { + bitField0_ = (bitField0_ & ~0x00000001); + transactionChainId_ = getDefaultInstance().getTransactionChainId(); + onChanged(); + return this; + } + /** + * optional string transactionChainId = 1; + */ + public Builder setTransactionChainIdBytes( + com.google.protobuf.ByteString value) { + if (value == null) { + throw new NullPointerException(); + } + bitField0_ |= 0x00000001; + transactionChainId_ = value; + onChanged(); + return this; + } // @@protoc_insertion_point(builder_scope:org.opendaylight.controller.mdsal.CloseTransactionChain) } @@ -1444,13 +1606,14 @@ public final class ShardTransactionChainMessages { static { java.lang.String[] descriptorData = { "\n\033ShardTransactionChain.proto\022!org.opend" + - "aylight.controller.mdsal\"\027\n\025CloseTransac" + - "tionChain\"\034\n\032CloseTransactionChainReply\"" + - "\030\n\026CreateTransactionChain\";\n\033CreateTrans" + - "actionChainReply\022\034\n\024transactionChainPath" + - "\030\001 \002(\tB[\n:org.opendaylight.controller.pr" + - "otobuff.messages.transactionB\035ShardTrans" + - "actionChainMessages" + "aylight.controller.mdsal\"3\n\025CloseTransac" + + "tionChain\022\032\n\022transactionChainId\030\001 \001(\t\"\034\n" + + "\032CloseTransactionChainReply\"\030\n\026CreateTra" + + "nsactionChain\";\n\033CreateTransactionChainR" + + "eply\022\034\n\024transactionChainPath\030\001 \002(\tB[\n:or" + + "g.opendaylight.controller.protobuff.mess" + + "ages.transactionB\035ShardTransactionChainM" + + "essages" }; com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner = new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() { @@ -1462,7 +1625,7 @@ public final class ShardTransactionChainMessages { internal_static_org_opendaylight_controller_mdsal_CloseTransactionChain_fieldAccessorTable = new com.google.protobuf.GeneratedMessage.FieldAccessorTable( internal_static_org_opendaylight_controller_mdsal_CloseTransactionChain_descriptor, - new java.lang.String[] { }); + new java.lang.String[] { "TransactionChainId", }); internal_static_org_opendaylight_controller_mdsal_CloseTransactionChainReply_descriptor = getDescriptor().getMessageTypes().get(1); internal_static_org_opendaylight_controller_mdsal_CloseTransactionChainReply_fieldAccessorTable = new diff --git a/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/protobuff/messages/transaction/ShardTransactionMessages.java b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/protobuff/messages/transaction/ShardTransactionMessages.java index ded80713fb..96a39bddd3 100644 --- a/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/protobuff/messages/transaction/ShardTransactionMessages.java +++ b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/protobuff/messages/transaction/ShardTransactionMessages.java @@ -653,6 +653,21 @@ public final class ShardTransactionMessages { * required int32 transactionType = 2; */ int getTransactionType(); + + // optional string transactionChainId = 3; + /** + * optional string transactionChainId = 3; + */ + boolean hasTransactionChainId(); + /** + * optional string transactionChainId = 3; + */ + java.lang.String getTransactionChainId(); + /** + * optional string transactionChainId = 3; + */ + com.google.protobuf.ByteString + getTransactionChainIdBytes(); } /** * Protobuf type {@code org.opendaylight.controller.mdsal.CreateTransaction} @@ -715,6 +730,11 @@ public final class ShardTransactionMessages { transactionType_ = input.readInt32(); break; } + case 26: { + bitField0_ |= 0x00000004; + transactionChainId_ = input.readBytes(); + break; + } } } } catch (com.google.protobuf.InvalidProtocolBufferException e) { @@ -814,9 +834,53 @@ public final class ShardTransactionMessages { return transactionType_; } + // optional string transactionChainId = 3; + public static final int TRANSACTIONCHAINID_FIELD_NUMBER = 3; + private java.lang.Object transactionChainId_; + /** + * optional string transactionChainId = 3; + */ + public boolean hasTransactionChainId() { + return ((bitField0_ & 0x00000004) == 0x00000004); + } + /** + * optional string transactionChainId = 3; + */ + public java.lang.String getTransactionChainId() { + java.lang.Object ref = transactionChainId_; + if (ref instanceof java.lang.String) { + return (java.lang.String) ref; + } else { + com.google.protobuf.ByteString bs = + (com.google.protobuf.ByteString) ref; + java.lang.String s = bs.toStringUtf8(); + if (bs.isValidUtf8()) { + transactionChainId_ = s; + } + return s; + } + } + /** + * optional string transactionChainId = 3; + */ + public com.google.protobuf.ByteString + getTransactionChainIdBytes() { + java.lang.Object ref = transactionChainId_; + if (ref instanceof java.lang.String) { + com.google.protobuf.ByteString b = + com.google.protobuf.ByteString.copyFromUtf8( + (java.lang.String) ref); + transactionChainId_ = b; + return b; + } else { + return (com.google.protobuf.ByteString) ref; + } + } + private void initFields() { transactionId_ = ""; transactionType_ = 0; + transactionChainId_ = ""; } private byte memoizedIsInitialized = -1; public final boolean isInitialized() { @@ -844,6 +908,9 @@ public final class ShardTransactionMessages { if (((bitField0_ & 0x00000002) == 0x00000002)) { output.writeInt32(2, transactionType_); } + if (((bitField0_ & 0x00000004) == 0x00000004)) { + output.writeBytes(3, getTransactionChainIdBytes()); + } getUnknownFields().writeTo(output); } @@ -861,6 +928,10 @@ public final class ShardTransactionMessages { size += com.google.protobuf.CodedOutputStream .computeInt32Size(2, transactionType_); } + if (((bitField0_ & 0x00000004) == 0x00000004)) { + size += com.google.protobuf.CodedOutputStream + .computeBytesSize(3, getTransactionChainIdBytes()); + } size += getUnknownFields().getSerializedSize(); memoizedSerializedSize = size; return size; @@ -981,6 +1052,8 @@ public final class ShardTransactionMessages { bitField0_ = (bitField0_ & ~0x00000001); transactionType_ = 0; bitField0_ = (bitField0_ & ~0x00000002); + transactionChainId_ = ""; + bitField0_ = (bitField0_ & ~0x00000004); return this; } @@ -1017,6 +1090,10 @@ public final class ShardTransactionMessages { to_bitField0_ |= 0x00000002; } result.transactionType_ = transactionType_; + if (((from_bitField0_ & 0x00000004) == 0x00000004)) { + to_bitField0_ |= 0x00000004; + } + result.transactionChainId_ = transactionChainId_; result.bitField0_ = to_bitField0_; onBuilt(); return result; @@ -1041,6 +1118,11 @@ public final class ShardTransactionMessages { if (other.hasTransactionType()) { setTransactionType(other.getTransactionType()); } + if (other.hasTransactionChainId()) { + bitField0_ |= 0x00000004; + transactionChainId_ = other.transactionChainId_; + onChanged(); + } this.mergeUnknownFields(other.getUnknownFields()); return this; } @@ -1183,6 +1265,80 @@ public final class ShardTransactionMessages { return this; } + // optional string transactionChainId = 3; + private java.lang.Object transactionChainId_ = ""; + /** + * optional string transactionChainId = 3; + */ + public boolean hasTransactionChainId() { + return ((bitField0_ & 0x00000004) == 0x00000004); + } + /** + * optional string transactionChainId = 3; + */ + public java.lang.String getTransactionChainId() { + java.lang.Object ref = transactionChainId_; + if (!(ref instanceof java.lang.String)) { + java.lang.String s = ((com.google.protobuf.ByteString) ref) + .toStringUtf8(); + transactionChainId_ = s; + return s; + } else { + return (java.lang.String) ref; + } + } + /** + * optional string transactionChainId = 3; + */ + public com.google.protobuf.ByteString + getTransactionChainIdBytes() { + java.lang.Object ref = transactionChainId_; + if (ref instanceof String) { + com.google.protobuf.ByteString b = + com.google.protobuf.ByteString.copyFromUtf8( + (java.lang.String) ref); + transactionChainId_ = b; + return b; + } else { + return (com.google.protobuf.ByteString) ref; + } + } + /** + * optional string transactionChainId = 3; + */ + public Builder setTransactionChainId( + java.lang.String value) { + if (value == null) { + throw new NullPointerException(); + } + bitField0_ |= 0x00000004; + transactionChainId_ = value; + onChanged(); + return this; + } + /** + * optional string transactionChainId = 3; + */ + public Builder clearTransactionChainId() { + bitField0_ = (bitField0_ & ~0x00000004); + transactionChainId_ = getDefaultInstance().getTransactionChainId(); + onChanged(); + return this; + } + /** + * optional string transactionChainId = 3; + */ + public Builder setTransactionChainIdBytes( + com.google.protobuf.ByteString value) { + if (value == null) { + throw new NullPointerException(); + } + bitField0_ |= 0x00000004; + transactionChainId_ = value; + onChanged(); + return this; + } + // @@protoc_insertion_point(builder_scope:org.opendaylight.controller.mdsal.CreateTransaction) } @@ -7597,36 +7753,37 @@ public final class ShardTransactionMessages { java.lang.String[] descriptorData = { "\n\026ShardTransaction.proto\022!org.opendaylig" + "ht.controller.mdsal\032\014Common.proto\"\022\n\020Clo" + - "seTransaction\"\027\n\025CloseTransactionReply\"C" + + "seTransaction\"\027\n\025CloseTransactionReply\"_" + "\n\021CreateTransaction\022\025\n\rtransactionId\030\001 \002" + - "(\t\022\027\n\017transactionType\030\002 \002(\005\"M\n\026CreateTra" + - "nsactionReply\022\034\n\024transactionActorPath\030\001 " + - "\002(\t\022\025\n\rtransactionId\030\002 \002(\t\"\022\n\020ReadyTrans" + - "action\"*\n\025ReadyTransactionReply\022\021\n\tactor" + - "Path\030\001 \002(\t\"l\n\nDeleteData\022^\n\037instanceIden" + - "tifierPathArguments\030\001 \002(\01325.org.opendayl", - "ight.controller.mdsal.InstanceIdentifier" + - "\"\021\n\017DeleteDataReply\"j\n\010ReadData\022^\n\037insta" + - "nceIdentifierPathArguments\030\001 \002(\01325.org.o" + - "pendaylight.controller.mdsal.InstanceIde" + - "ntifier\"P\n\rReadDataReply\022?\n\016normalizedNo" + - "de\030\001 \001(\0132\'.org.opendaylight.controller.m" + - "dsal.Node\"\254\001\n\tWriteData\022^\n\037instanceIdent" + - "ifierPathArguments\030\001 \002(\01325.org.opendayli" + - "ght.controller.mdsal.InstanceIdentifier\022" + - "?\n\016normalizedNode\030\002 \002(\0132\'.org.opendaylig", - "ht.controller.mdsal.Node\"\020\n\016WriteDataRep" + - "ly\"\254\001\n\tMergeData\022^\n\037instanceIdentifierPa" + - "thArguments\030\001 \002(\01325.org.opendaylight.con" + - "troller.mdsal.InstanceIdentifier\022?\n\016norm" + - "alizedNode\030\002 \002(\0132\'.org.opendaylight.cont" + - "roller.mdsal.Node\"\020\n\016MergeDataReply\"l\n\nD" + - "ataExists\022^\n\037instanceIdentifierPathArgum" + - "ents\030\001 \002(\01325.org.opendaylight.controller" + - ".mdsal.InstanceIdentifier\"!\n\017DataExistsR" + - "eply\022\016\n\006exists\030\001 \002(\010BV\n:org.opendaylight", - ".controller.protobuff.messages.transacti" + - "onB\030ShardTransactionMessages" + "(\t\022\027\n\017transactionType\030\002 \002(\005\022\032\n\022transacti" + + "onChainId\030\003 \001(\t\"M\n\026CreateTransactionRepl" + + "y\022\034\n\024transactionActorPath\030\001 \002(\t\022\025\n\rtrans" + + "actionId\030\002 \002(\t\"\022\n\020ReadyTransaction\"*\n\025Re" + + "adyTransactionReply\022\021\n\tactorPath\030\001 \002(\t\"l" + + "\n\nDeleteData\022^\n\037instanceIdentifierPathAr", + "guments\030\001 \002(\01325.org.opendaylight.control" + + "ler.mdsal.InstanceIdentifier\"\021\n\017DeleteDa" + + "taReply\"j\n\010ReadData\022^\n\037instanceIdentifie" + + "rPathArguments\030\001 \002(\01325.org.opendaylight." + + "controller.mdsal.InstanceIdentifier\"P\n\rR" + + "eadDataReply\022?\n\016normalizedNode\030\001 \001(\0132\'.o" + + "rg.opendaylight.controller.mdsal.Node\"\254\001" + + "\n\tWriteData\022^\n\037instanceIdentifierPathArg" + + "uments\030\001 \002(\01325.org.opendaylight.controll" + + "er.mdsal.InstanceIdentifier\022?\n\016normalize", + "dNode\030\002 \002(\0132\'.org.opendaylight.controlle" + + "r.mdsal.Node\"\020\n\016WriteDataReply\"\254\001\n\tMerge" + + "Data\022^\n\037instanceIdentifierPathArguments\030" + + "\001 \002(\01325.org.opendaylight.controller.mdsa" + + "l.InstanceIdentifier\022?\n\016normalizedNode\030\002" + + " \002(\0132\'.org.opendaylight.controller.mdsal" + + ".Node\"\020\n\016MergeDataReply\"l\n\nDataExists\022^\n" + + "\037instanceIdentifierPathArguments\030\001 \002(\01325" + + ".org.opendaylight.controller.mdsal.Insta" + + "nceIdentifier\"!\n\017DataExistsReply\022\016\n\006exis", + "ts\030\001 \002(\010BV\n:org.opendaylight.controller." + + "protobuff.messages.transactionB\030ShardTra" + + "nsactionMessages" }; com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner = new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() { @@ -7650,7 +7807,7 @@ public final class ShardTransactionMessages { internal_static_org_opendaylight_controller_mdsal_CreateTransaction_fieldAccessorTable = new com.google.protobuf.GeneratedMessage.FieldAccessorTable( internal_static_org_opendaylight_controller_mdsal_CreateTransaction_descriptor, - new java.lang.String[] { "TransactionId", "TransactionType", }); + new java.lang.String[] { "TransactionId", "TransactionType", "TransactionChainId", }); internal_static_org_opendaylight_controller_mdsal_CreateTransactionReply_descriptor = getDescriptor().getMessageTypes().get(3); internal_static_org_opendaylight_controller_mdsal_CreateTransactionReply_fieldAccessorTable = new diff --git a/opendaylight/md-sal/sal-clustering-commons/src/main/resources/ShardTransaction.proto b/opendaylight/md-sal/sal-clustering-commons/src/main/resources/ShardTransaction.proto index 63b75ac430..26581478d9 100644 --- a/opendaylight/md-sal/sal-clustering-commons/src/main/resources/ShardTransaction.proto +++ b/opendaylight/md-sal/sal-clustering-commons/src/main/resources/ShardTransaction.proto @@ -15,6 +15,7 @@ message CloseTransactionReply{ message CreateTransaction{ required string transactionId = 1; required int32 transactionType =2; + optional string transactionChainId = 3; } message CreateTransactionReply{ diff --git a/opendaylight/md-sal/sal-clustering-commons/src/main/resources/ShardTransactionChain.proto b/opendaylight/md-sal/sal-clustering-commons/src/main/resources/ShardTransactionChain.proto index 42f87cbda6..5dc67aa98f 100644 --- a/opendaylight/md-sal/sal-clustering-commons/src/main/resources/ShardTransactionChain.proto +++ b/opendaylight/md-sal/sal-clustering-commons/src/main/resources/ShardTransactionChain.proto @@ -4,20 +4,10 @@ option java_package = "org.opendaylight.controller.protobuff.messages.transactio option java_outer_classname = "ShardTransactionChainMessages"; message CloseTransactionChain { - + optional string transactionChainId = 1; } message CloseTransactionChainReply{ - -} - -message CreateTransactionChain { - -} - -message CreateTransactionChainReply{ -required string transactionChainPath = 1; - } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Configuration.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Configuration.java index 34239070a3..b264606053 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Configuration.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Configuration.java @@ -13,6 +13,7 @@ import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategy import java.util.List; import java.util.Map; +import java.util.Set; public interface Configuration { @@ -52,4 +53,10 @@ public interface Configuration { * @return */ List getMembersFromShardName(String shardName); + + /** + * + * @return + */ + Set getAllShardNames(); } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ConfigurationImpl.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ConfigurationImpl.java index 37b565d213..1a0a5dd659 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ConfigurationImpl.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ConfigurationImpl.java @@ -23,8 +23,10 @@ import java.io.File; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; +import java.util.LinkedHashSet; import java.util.List; import java.util.Map; +import java.util.Set; public class ConfigurationImpl implements Configuration { @@ -161,6 +163,16 @@ public class ConfigurationImpl implements Configuration { return Collections.EMPTY_LIST; } + @Override public Set getAllShardNames() { + Set shardNames = new LinkedHashSet<>(); + for(ModuleShard ms : moduleShards){ + for(Shard s : ms.getShards()) { + shardNames.add(s.getName()); + } + } + return shardNames; + } + private void readModules(Config modulesConfig) { diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java index a1858f5f91..e6ddd8fa19 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java @@ -32,9 +32,9 @@ import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier import org.opendaylight.controller.cluster.datastore.identifiers.ShardTransactionIdentifier; import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardMBeanFactory; import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats; +import org.opendaylight.controller.cluster.datastore.messages.CloseTransactionChain; import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply; import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction; -import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionChain; import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionChainReply; import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionReply; import org.opendaylight.controller.cluster.datastore.messages.EnableNotification; @@ -61,6 +61,7 @@ import org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessa import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction; import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort; import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain; +import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionFactory; import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction; import org.opendaylight.yangtools.concepts.ListenerRegistration; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; @@ -109,11 +110,12 @@ public class Shard extends RaftActor { private final DatastoreContext datastoreContext; - private SchemaContext schemaContext; private ActorRef createSnapshotTransaction; + private final Map transactionChains = new HashMap<>(); + private Shard(ShardIdentifier name, Map peerAddresses, DatastoreContext datastoreContext, SchemaContext schemaContext) { super(name.toString(), mapPeerAddresses(peerAddresses), Optional.of(configParams)); @@ -183,22 +185,19 @@ public class Shard extends RaftActor { LOG.debug("onReceiveCommand: Received message {} from {}", message.getClass().toString(), getSender()); - if (message.getClass() - .equals(CreateTransactionChain.SERIALIZABLE_CLASS)) { - if (isLeader()) { - createTransactionChain(); - } else if (getLeader() != null) { - getLeader().forward(message, getContext()); - } - } else if(message.getClass().equals(ReadDataReply.SERIALIZABLE_CLASS)) { + if(message.getClass().equals(ReadDataReply.SERIALIZABLE_CLASS)) { // This must be for install snapshot. Don't want to open this up and trigger // deSerialization - self().tell(new CaptureSnapshotReply(ReadDataReply.getNormalizedNodeByteString(message)), self()); + self() + .tell(new CaptureSnapshotReply(ReadDataReply.getNormalizedNodeByteString(message)), + self()); // Send a PoisonPill instead of sending close transaction because we do not really need // a response getSender().tell(PoisonPill.getInstance(), self()); + } else if (message.getClass().equals(CloseTransactionChain.SERIALIZABLE_CLASS)){ + closeTransactionChain(CloseTransactionChain.fromSerializable(message)); } else if (message instanceof RegisterChangeListener) { registerChangeListener((RegisterChangeListener) message); } else if (message instanceof UpdateSchemaContext) { @@ -221,9 +220,30 @@ public class Shard extends RaftActor { } } + private void closeTransactionChain(CloseTransactionChain closeTransactionChain) { + DOMStoreTransactionChain chain = + transactionChains.remove(closeTransactionChain.getTransactionChainId()); + + if(chain != null) { + chain.close(); + } + } + private ActorRef createTypedTransactionActor( int transactionType, - ShardTransactionIdentifier transactionId) { + ShardTransactionIdentifier transactionId, + String transactionChainId ) { + + DOMStoreTransactionFactory factory = store; + + if(!transactionChainId.isEmpty()) { + factory = transactionChains.get(transactionChainId); + if(factory == null){ + DOMStoreTransactionChain transactionChain = store.createTransactionChain(); + transactionChains.put(transactionChainId, transactionChain); + factory = transactionChain; + } + } if(this.schemaContext == null){ throw new NullPointerException("schemaContext should not be null"); @@ -235,7 +255,7 @@ public class Shard extends RaftActor { shardMBean.incrementReadOnlyTransactionCount(); return getContext().actorOf( - ShardTransaction.props(store.newReadOnlyTransaction(), getSelf(), + ShardTransaction.props(factory.newReadOnlyTransaction(), getSelf(), schemaContext,datastoreContext, shardMBean), transactionId.toString()); } else if (transactionType @@ -244,7 +264,7 @@ public class Shard extends RaftActor { shardMBean.incrementReadWriteTransactionCount(); return getContext().actorOf( - ShardTransaction.props(store.newReadWriteTransaction(), getSelf(), + ShardTransaction.props(factory.newReadWriteTransaction(), getSelf(), schemaContext, datastoreContext, shardMBean), transactionId.toString()); @@ -254,7 +274,7 @@ public class Shard extends RaftActor { shardMBean.incrementWriteOnlyTransactionCount(); return getContext().actorOf( - ShardTransaction.props(store.newWriteOnlyTransaction(), getSelf(), + ShardTransaction.props(factory.newWriteOnlyTransaction(), getSelf(), schemaContext, datastoreContext, shardMBean), transactionId.toString()); } else { throw new IllegalArgumentException( @@ -265,10 +285,10 @@ public class Shard extends RaftActor { private void createTransaction(CreateTransaction createTransaction) { createTransaction(createTransaction.getTransactionType(), - createTransaction.getTransactionId()); + createTransaction.getTransactionId(), createTransaction.getTransactionChainId()); } - private ActorRef createTransaction(int transactionType, String remoteTransactionId) { + private ActorRef createTransaction(int transactionType, String remoteTransactionId, String transactionChainId) { ShardTransactionIdentifier transactionId = ShardTransactionIdentifier.builder() @@ -276,7 +296,7 @@ public class Shard extends RaftActor { .build(); LOG.debug("Creating transaction : {} ", transactionId); ActorRef transactionActor = - createTypedTransactionActor(transactionType, transactionId); + createTypedTransactionActor(transactionType, transactionId, transactionChainId); getSender() .tell(new CreateTransactionReply( @@ -458,7 +478,7 @@ public class Shard extends RaftActor { // so that this actor does not get block building the snapshot createSnapshotTransaction = createTransaction( TransactionProxy.TransactionType.READ_ONLY.ordinal(), - "createSnapshot"); + "createSnapshot", ""); createSnapshotTransaction.tell( new ReadData(YangInstanceIdentifier.builder().build()).toSerializable(), self()); @@ -499,6 +519,16 @@ public class Shard extends RaftActor { shardMBean.setRaftState(getRaftState().name()); shardMBean.setCurrentTerm(getCurrentTerm()); + + // If this actor is no longer the leader close all the transaction chains + if(!isLeader()){ + for(Map.Entry entry : transactionChains.entrySet()){ + LOG.debug("onStateChanged: Closing transaction chain {} because shard {} is no longer the leader", entry.getKey(), getId()); + entry.getValue().close(); + } + + transactionChains.clear(); + } } @Override public String persistenceId() { diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionChainProxy.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionChainProxy.java index 9b4610a99c..b74c89d727 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionChainProxy.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionChainProxy.java @@ -8,43 +8,72 @@ package org.opendaylight.controller.cluster.datastore; +import akka.actor.ActorPath; +import akka.dispatch.Futures; +import org.opendaylight.controller.cluster.datastore.messages.CloseTransactionChain; import org.opendaylight.controller.cluster.datastore.utils.ActorContext; import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction; import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction; import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain; import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction; +import scala.concurrent.Await; +import scala.concurrent.Future; + +import java.util.Collections; +import java.util.List; /** * TransactionChainProxy acts as a proxy for a DOMStoreTransactionChain created on a remote shard */ public class TransactionChainProxy implements DOMStoreTransactionChain{ private final ActorContext actorContext; + private final String transactionChainId; + private volatile List> cohortPathFutures = Collections.emptyList(); public TransactionChainProxy(ActorContext actorContext) { this.actorContext = actorContext; + transactionChainId = actorContext.getCurrentMemberName() + "-" + System.currentTimeMillis(); } @Override public DOMStoreReadTransaction newReadOnlyTransaction() { return new TransactionProxy(actorContext, - TransactionProxy.TransactionType.READ_ONLY); + TransactionProxy.TransactionType.READ_ONLY, this); } @Override public DOMStoreReadWriteTransaction newReadWriteTransaction() { return new TransactionProxy(actorContext, - TransactionProxy.TransactionType.READ_WRITE); + TransactionProxy.TransactionType.READ_WRITE, this); } @Override public DOMStoreWriteTransaction newWriteOnlyTransaction() { return new TransactionProxy(actorContext, - TransactionProxy.TransactionType.WRITE_ONLY); + TransactionProxy.TransactionType.WRITE_ONLY, this); } @Override public void close() { - // FIXME : The problem here is don't know which shard the transaction chain is to be created on ??? - throw new UnsupportedOperationException("close - not sure what to do here?"); + // Send a close transaction chain request to each and every shard + actorContext.broadcast(new CloseTransactionChain(transactionChainId)); + } + + public String getTransactionChainId() { + return transactionChainId; + } + + public void onTransactionReady(List> cohortPathFutures){ + this.cohortPathFutures = cohortPathFutures; + } + + public void waitTillCurrentTransactionReady(){ + try { + Await.result(Futures + .sequence(this.cohortPathFutures, actorContext.getActorSystem().dispatcher()), + actorContext.getOperationDuration()); + } catch (Exception e) { + throw new IllegalStateException("Failed when waiting for transaction on a chain to become ready", e); + } } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java index a8b20c030e..97a9ff0bf3 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java @@ -71,6 +71,11 @@ import java.util.concurrent.atomic.AtomicLong; *

*/ public class TransactionProxy implements DOMStoreReadWriteTransaction { + + private final TransactionChainProxy transactionChainProxy; + + + public enum TransactionType { READ_ONLY, WRITE_ONLY, @@ -177,12 +182,27 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { private boolean inReadyState; public TransactionProxy(ActorContext actorContext, TransactionType transactionType) { + this(actorContext, transactionType, null); + } + + @VisibleForTesting + List> getRecordedOperationFutures() { + List> recordedOperationFutures = Lists.newArrayList(); + for(TransactionContext transactionContext : remoteTransactionPaths.values()) { + recordedOperationFutures.addAll(transactionContext.getRecordedOperationFutures()); + } + + return recordedOperationFutures; + } + + public TransactionProxy(ActorContext actorContext, TransactionType transactionType, TransactionChainProxy transactionChainProxy) { this.actorContext = Preconditions.checkNotNull(actorContext, - "actorContext should not be null"); + "actorContext should not be null"); this.transactionType = Preconditions.checkNotNull(transactionType, - "transactionType should not be null"); + "transactionType should not be null"); this.schemaContext = Preconditions.checkNotNull(actorContext.getSchemaContext(), - "schemaContext should not be null"); + "schemaContext should not be null"); + this.transactionChainProxy = transactionChainProxy; String memberName = actorContext.getCurrentMemberName(); if(memberName == null){ @@ -190,7 +210,7 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { } this.identifier = TransactionIdentifier.builder().memberName(memberName).counter( - counter.getAndIncrement()).build(); + counter.getAndIncrement()).build(); if(transactionType == TransactionType.READ_ONLY) { // Read-only Tx's aren't explicitly closed by the client so we create a PhantomReference @@ -201,23 +221,13 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { remoteTransactionActorsMB = new AtomicBoolean(); TransactionProxyCleanupPhantomReference cleanup = - new TransactionProxyCleanupPhantomReference(this); + new TransactionProxyCleanupPhantomReference(this); phantomReferenceCache.put(cleanup, cleanup); } LOG.debug("Created txn {} of type {}", identifier, transactionType); } - @VisibleForTesting - List> getRecordedOperationFutures() { - List> recordedOperationFutures = Lists.newArrayList(); - for(TransactionContext transactionContext : remoteTransactionPaths.values()) { - recordedOperationFutures.addAll(transactionContext.getRecordedOperationFutures()); - } - - return recordedOperationFutures; - } - @Override public CheckedFuture>, ReadFailedException> read( final YangInstanceIdentifier path) { @@ -308,6 +318,10 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { cohortPathFutures.add(transactionContext.readyTransaction()); } + if(transactionChainProxy != null){ + transactionChainProxy.onTransactionReady(cohortPathFutures); + } + return new ThreePhaseCommitCohortProxy(actorContext, cohortPathFutures, identifier.toString()); } @@ -340,20 +354,27 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { return ShardStrategyFactory.getStrategy(path).findShard(path); } - private void createTransactionIfMissing(ActorContext actorContext, YangInstanceIdentifier path) { + private void createTransactionIfMissing(ActorContext actorContext, + YangInstanceIdentifier path) { + + if(transactionChainProxy != null){ + transactionChainProxy.waitTillCurrentTransactionReady(); + } + String shardName = ShardStrategyFactory.getStrategy(path).findShard(path); TransactionContext transactionContext = remoteTransactionPaths.get(shardName); - if(transactionContext != null){ + if (transactionContext != null) { // A transaction already exists with that shard return; } try { Object response = actorContext.executeShardOperation(shardName, - new CreateTransaction(identifier.toString(),this.transactionType.ordinal() ).toSerializable()); + new CreateTransaction(identifier.toString(), this.transactionType.ordinal(), + getTransactionChainId()).toSerializable()); if (response.getClass().equals(CreateTransactionReply.SERIALIZABLE_CLASS)) { CreateTransactionReply reply = CreateTransactionReply.fromSerializable(response); @@ -364,7 +385,7 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { ActorSelection transactionActor = actorContext.actorSelection(transactionPath); - if(transactionType == TransactionType.READ_ONLY) { + if (transactionType == TransactionType.READ_ONLY) { // Add the actor to the remoteTransactionActors list for access by the // cleanup PhantonReference. remoteTransactionActors.add(transactionActor); @@ -375,19 +396,28 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { } transactionContext = new TransactionContextImpl(shardName, transactionPath, - transactionActor, identifier, actorContext, schemaContext); + transactionActor, identifier, actorContext, schemaContext); remoteTransactionPaths.put(shardName, transactionContext); } else { throw new IllegalArgumentException(String.format( - "Invalid reply type {} for CreateTransaction", response.getClass())); + "Invalid reply type {} for CreateTransaction", response.getClass())); } - } catch(Exception e){ + } catch (Exception e) { LOG.debug("Tx {} Creating NoOpTransaction because of : {}", identifier, e.getMessage()); - remoteTransactionPaths.put(shardName, new NoOpTransactionContext(shardName, e, identifier)); + remoteTransactionPaths + .put(shardName, new NoOpTransactionContext(shardName, e, identifier)); } } + public String getTransactionChainId() { + if(transactionChainProxy == null){ + return ""; + } + return transactionChainProxy.getTransactionChainId(); + } + + private interface TransactionContext { String getShardName(); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/CloseTransactionChain.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/CloseTransactionChain.java index efa51fde20..74de6c5aea 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/CloseTransactionChain.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/CloseTransactionChain.java @@ -10,10 +10,29 @@ package org.opendaylight.controller.cluster.datastore.messages; import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionChainMessages; -public class CloseTransactionChain implements SerializableMessage{ - public static final Class SERIALIZABLE_CLASS = ShardTransactionChainMessages.CloseTransactionChain.class; - @Override - public Object toSerializable() { - return ShardTransactionChainMessages.CloseTransactionChain.newBuilder().build(); - } +public class CloseTransactionChain implements SerializableMessage { + public static final Class SERIALIZABLE_CLASS = + ShardTransactionChainMessages.CloseTransactionChain.class; + private final String transactionChainId; + + public CloseTransactionChain(String transactionChainId){ + this.transactionChainId = transactionChainId; + } + + @Override + public Object toSerializable() { + return ShardTransactionChainMessages.CloseTransactionChain.newBuilder() + .setTransactionChainId(transactionChainId).build(); + } + + public static CloseTransactionChain fromSerializable(Object message){ + ShardTransactionChainMessages.CloseTransactionChain closeTransactionChain + = (ShardTransactionChainMessages.CloseTransactionChain) message; + + return new CloseTransactionChain(closeTransactionChain.getTransactionChainId()); + } + + public String getTransactionChainId() { + return transactionChainId; + } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/CreateTransaction.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/CreateTransaction.java index d5c9e21611..361d406ac8 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/CreateTransaction.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/CreateTransaction.java @@ -13,30 +13,48 @@ import org.opendaylight.controller.protobuff.messages.transaction.ShardTransacti public class CreateTransaction implements SerializableMessage { - public static final Class SERIALIZABLE_CLASS = ShardTransactionMessages.CreateTransaction.class; - private final String transactionId; - private final int transactionType; - - public CreateTransaction(String transactionId, int transactionType){ - - this.transactionId = transactionId; - this.transactionType = transactionType; - } - - public String getTransactionId() { - return transactionId; - } - - public int getTransactionType() { return transactionType;} - - @Override - public Object toSerializable() { - return ShardTransactionMessages.CreateTransaction.newBuilder().setTransactionId(transactionId).setTransactionType(transactionType).build(); - } - - public static CreateTransaction fromSerializable(Object message){ - ShardTransactionMessages.CreateTransaction createTransaction = (ShardTransactionMessages.CreateTransaction)message; - return new CreateTransaction(createTransaction.getTransactionId(),createTransaction.getTransactionType() ); - } - + public static final Class SERIALIZABLE_CLASS = ShardTransactionMessages.CreateTransaction.class; + private final String transactionId; + private final int transactionType; + private final String transactionChainId; + + public CreateTransaction(String transactionId, int transactionType) { + this(transactionId, transactionType, ""); + } + + public CreateTransaction(String transactionId, int transactionType, String transactionChainId) { + + this.transactionId = transactionId; + this.transactionType = transactionType; + this.transactionChainId = transactionChainId; + + } + + + public String getTransactionId() { + return transactionId; + } + + public int getTransactionType() { + return transactionType; + } + + @Override + public Object toSerializable() { + return ShardTransactionMessages.CreateTransaction.newBuilder() + .setTransactionId(transactionId) + .setTransactionType(transactionType) + .setTransactionChainId(transactionChainId).build(); + } + + public static CreateTransaction fromSerializable(Object message) { + ShardTransactionMessages.CreateTransaction createTransaction = + (ShardTransactionMessages.CreateTransaction) message; + return new CreateTransaction(createTransaction.getTransactionId(), + createTransaction.getTransactionType(), createTransaction.getTransactionChainId()); + } + + public String getTransactionChainId() { + return transactionChainId; + } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/ActorContext.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/ActorContext.java index b87dc4f608..c989b275df 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/ActorContext.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/ActorContext.java @@ -176,7 +176,8 @@ public class ActorContext { */ public Object executeRemoteOperation(ActorSelection actor, Object message) { - LOG.debug("Sending remote message {} to {}", message.getClass().toString(), actor.toString()); + LOG.debug("Sending remote message {} to {}", message.getClass().toString(), + actor.toString()); Future future = ask(actor, message, operationTimeout); @@ -213,6 +214,13 @@ public class ActorContext { actor.tell(message, ActorRef.noSender()); } + public void sendShardOperationAsync(String shardName, Object message) { + ActorSelection primary = findPrimary(shardName); + + primary.tell(message, ActorRef.noSender()); + } + + /** * Execute an operation on the primary for a given shard *

@@ -295,4 +303,22 @@ public class ActorContext { return clusterWrapper.getCurrentMemberName(); } + /** + * Send the message to each and every shard + * + * @param message + */ + public void broadcast(Object message){ + for(String shardName : configuration.getAllShardNames()){ + try { + sendShardOperationAsync(shardName, message); + } catch(Exception e){ + LOG.warn("broadcast failed to send message " + message.getClass().getSimpleName() + " to shard " + shardName, e); + } + } + } + + public FiniteDuration getOperationDuration() { + return operationDuration; + } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/BasicIntegrationTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/BasicIntegrationTest.java index 50367e66ce..7b826302f5 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/BasicIntegrationTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/BasicIntegrationTest.java @@ -14,13 +14,10 @@ import akka.actor.ActorSelection; import akka.actor.Props; import akka.event.Logging; import akka.testkit.JavaTestKit; - import org.junit.Test; import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier; import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction; import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction; -import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionChain; -import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionChainReply; import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionReply; import org.opendaylight.controller.cluster.datastore.messages.PreCommitTransaction; import org.opendaylight.controller.cluster.datastore.messages.PreCommitTransactionReply; @@ -32,7 +29,6 @@ import org.opendaylight.controller.cluster.datastore.messages.WriteDataReply; import org.opendaylight.controller.md.cluster.datastore.model.TestModel; import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes; import org.opendaylight.yangtools.yang.model.api.SchemaContext; - import scala.concurrent.Await; import scala.concurrent.Future; import scala.concurrent.duration.FiniteDuration; @@ -87,31 +83,8 @@ public class BasicIntegrationTest extends AbstractActorTest { assertEquals(true, result); - // 1. Create a TransactionChain - shard.tell(new CreateTransactionChain().toSerializable(), getRef()); - - final ActorSelection transactionChain = - new ExpectMsg(duration("3 seconds"), "CreateTransactionChainReply") { - @Override - protected ActorSelection match(Object in) { - if (in.getClass().equals(CreateTransactionChainReply.SERIALIZABLE_CLASS)) { - ActorPath transactionChainPath = - CreateTransactionChainReply.fromSerializable(getSystem(),in) - .getTransactionChainPath(); - return getSystem() - .actorSelection(transactionChainPath); - } else { - throw noMatch(); - } - } - }.get(); // this extracts the received message - - assertNotNull(transactionChain); - - System.out.println("Successfully created transaction chain"); - - // 2. Create a Transaction on the TransactionChain - transactionChain.tell(new CreateTransaction("txn-1", TransactionProxy.TransactionType.WRITE_ONLY.ordinal() ).toSerializable(), getRef()); + // Create a transaction on the shard + shard.tell(new CreateTransaction("txn-1", TransactionProxy.TransactionType.WRITE_ONLY.ordinal() ).toSerializable(), getRef()); final ActorSelection transaction = new ExpectMsg(duration("3 seconds"), "CreateTransactionReply") { diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ConfigurationImplTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ConfigurationImplTest.java index 17329611b0..8c253596b8 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ConfigurationImplTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ConfigurationImplTest.java @@ -7,6 +7,7 @@ import org.junit.Test; import java.io.File; import java.util.List; +import java.util.Set; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; @@ -83,4 +84,15 @@ public class ConfigurationImplTest { File f = new File("./module-shards.conf"); ConfigFactory.parseFile(f); } + + @Test + public void testGetAllShardNames(){ + Set allShardNames = configuration.getAllShardNames(); + + assertEquals(4, allShardNames.size()); + assertTrue(allShardNames.contains("default")); + assertTrue(allShardNames.contains("people-1")); + assertTrue(allShardNames.contains("cars-1")); + assertTrue(allShardNames.contains("test-1")); + } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreIntegrationTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreIntegrationTest.java index 8a7b50d20c..ec8aee2b09 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreIntegrationTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreIntegrationTest.java @@ -21,6 +21,7 @@ import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelpe import org.opendaylight.controller.md.cluster.datastore.model.TestModel; import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction; import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort; +import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes; @@ -144,6 +145,94 @@ public class DistributedDataStoreIntegrationTest { } + @Test + public void transactionChainIntegrationTest() throws Exception { + final Configuration configuration = new ConfigurationImpl("module-shards.conf", "modules.conf"); + ShardStrategyFactory.setConfiguration(configuration); + + + + new JavaTestKit(getSystem()) { + { + + new Within(duration("10 seconds")) { + @Override + protected void run() { + try { + final DistributedDataStore distributedDataStore = + new DistributedDataStore(getSystem(), "config", + new MockClusterWrapper(), configuration, + new DatastoreContext()); + + distributedDataStore.onGlobalContextUpdated(TestModel.createTestContext()); + + // Wait for a specific log message to show up + final boolean result = + new JavaTestKit.EventFilter(Logging.Info.class + ) { + @Override + protected Boolean run() { + return true; + } + }.from("akka://test/user/shardmanager-config/member-1-shard-test-1-config") + .message("Switching from state Candidate to Leader") + .occurrences(1).exec(); + + assertEquals(true, result); + + DOMStoreTransactionChain transactionChain = + distributedDataStore.createTransactionChain(); + + DOMStoreReadWriteTransaction transaction = + transactionChain.newReadWriteTransaction(); + + transaction + .write(TestModel.TEST_PATH, ImmutableNodes + .containerNode(TestModel.TEST_QNAME)); + + ListenableFuture>> + future = + transaction.read(TestModel.TEST_PATH); + + Optional> optional = + future.get(); + + Assert.assertTrue("Node not found", optional.isPresent()); + + NormalizedNode normalizedNode = + optional.get(); + + assertEquals(TestModel.TEST_QNAME, + normalizedNode.getNodeType()); + + DOMStoreThreePhaseCommitCohort ready = + transaction.ready(); + + ListenableFuture canCommit = + ready.canCommit(); + + assertTrue(canCommit.get(5, TimeUnit.SECONDS)); + + ListenableFuture preCommit = + ready.preCommit(); + + preCommit.get(5, TimeUnit.SECONDS); + + ListenableFuture commit = ready.commit(); + + commit.get(5, TimeUnit.SECONDS); + + transactionChain.close(); + } catch (ExecutionException | TimeoutException | InterruptedException e){ + fail(e.getMessage()); + } + } + }; + } + }; + + } + //FIXME : Disabling test because it's flaky //@Test diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java index 766dcb7268..06bcac8d78 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java @@ -14,8 +14,6 @@ import org.junit.Assert; import org.junit.Test; import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier; import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction; -import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionChain; -import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionChainReply; import org.opendaylight.controller.cluster.datastore.messages.EnableNotification; import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved; import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener; @@ -53,65 +51,6 @@ public class ShardTest extends AbstractActorTest { private static final DatastoreContext DATA_STORE_CONTEXT = new DatastoreContext(); - @Test - public void testOnReceiveCreateTransactionChain() throws Exception { - new JavaTestKit(getSystem()) {{ - final ShardIdentifier identifier = - ShardIdentifier.builder().memberName("member-1") - .shardName("inventory").type("config").build(); - - final Props props = Shard.props(identifier, Collections.EMPTY_MAP, DATA_STORE_CONTEXT, TestModel.createTestContext()); - final ActorRef subject = - getSystem().actorOf(props, "testCreateTransactionChain"); - - - // Wait for a specific log message to show up - final boolean result = - new JavaTestKit.EventFilter(Logging.Info.class - ) { - @Override - protected Boolean run() { - return true; - } - }.from(subject.path().toString()) - .message("Switching from state Candidate to Leader") - .occurrences(1).exec(); - - Assert.assertEquals(true, result); - - new Within(duration("3 seconds")) { - @Override - protected void run() { - - subject.tell(new CreateTransactionChain().toSerializable(), getRef()); - - final String out = new ExpectMsg(duration("3 seconds"), "match hint") { - // do not put code outside this method, will run afterwards - @Override - protected String match(Object in) { - if (in.getClass().equals(CreateTransactionChainReply.SERIALIZABLE_CLASS)){ - CreateTransactionChainReply reply = - CreateTransactionChainReply.fromSerializable(getSystem(),in); - return reply.getTransactionChainPath() - .toString(); - } else { - throw noMatch(); - } - } - }.get(); // this extracts the received message - - assertEquals("Unexpected transaction path " + out, - "akka://test/user/testCreateTransactionChain/$a", - out); - - expectNoMsg(); - } - - - }; - }}; - } - @Test public void testOnReceiveRegisterListener() throws Exception { new JavaTestKit(getSystem()) {{ @@ -233,6 +172,65 @@ public class ShardTest extends AbstractActorTest { }}; } + @Test + public void testCreateTransactionOnChain(){ + new JavaTestKit(getSystem()) {{ + final ShardIdentifier identifier = + ShardIdentifier.builder().memberName("member-1") + .shardName("inventory").type("config").build(); + + final Props props = Shard.props(identifier, Collections.EMPTY_MAP, DATA_STORE_CONTEXT, TestModel.createTestContext()); + final ActorRef subject = + getSystem().actorOf(props, "testCreateTransactionOnChain"); + + // Wait for a specific log message to show up + final boolean result = + new JavaTestKit.EventFilter(Logging.Info.class + ) { + @Override + protected Boolean run() { + return true; + } + }.from(subject.path().toString()) + .message("Switching from state Candidate to Leader") + .occurrences(1).exec(); + + Assert.assertEquals(true, result); + + new Within(duration("3 seconds")) { + @Override + protected void run() { + + subject.tell( + new UpdateSchemaContext(TestModel.createTestContext()), + getRef()); + + subject.tell(new CreateTransaction("txn-1", TransactionProxy.TransactionType.READ_ONLY.ordinal() , "foobar").toSerializable(), + getRef()); + + final String out = new ExpectMsg(duration("3 seconds"), "match hint") { + // do not put code outside this method, will run afterwards + @Override + protected String match(Object in) { + if (in instanceof CreateTransactionReply) { + CreateTransactionReply reply = + (CreateTransactionReply) in; + return reply.getTransactionActorPath() + .toString(); + } else { + throw noMatch(); + } + } + }.get(); // this extracts the received message + + assertTrue("Unexpected transaction path " + out, + out.contains("akka://test/user/testCreateTransactionOnChain/shard-txn-1")); + expectNoMsg(); + } + }; + }}; + } + @Test public void testPeerAddressResolved(){ new JavaTestKit(getSystem()) {{ diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionChainTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionChainTest.java deleted file mode 100644 index c5968c358f..0000000000 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionChainTest.java +++ /dev/null @@ -1,115 +0,0 @@ -package org.opendaylight.controller.cluster.datastore; - -import akka.actor.ActorRef; -import akka.actor.Props; -import akka.testkit.JavaTestKit; - -import com.google.common.util.concurrent.ListeningExecutorService; -import com.google.common.util.concurrent.MoreExecutors; - -import org.junit.BeforeClass; -import org.junit.Test; -import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats; -import org.opendaylight.controller.cluster.datastore.messages.CloseTransactionChain; -import org.opendaylight.controller.cluster.datastore.messages.CloseTransactionChainReply; -import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction; -import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionReply; -import org.opendaylight.controller.md.cluster.datastore.model.TestModel; -import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore; -import org.opendaylight.yangtools.yang.model.api.SchemaContext; - -import static org.junit.Assert.assertEquals; - -public class ShardTransactionChainTest extends AbstractActorTest { - - private static ListeningExecutorService storeExecutor = MoreExecutors.listeningDecorator(MoreExecutors.sameThreadExecutor()); - - private static final InMemoryDOMDataStore store = new InMemoryDOMDataStore("OPER", storeExecutor, - MoreExecutors.sameThreadExecutor()); - - private static final SchemaContext testSchemaContext = TestModel.createTestContext(); - - private static final DatastoreContext DATA_STORE_CONTEXT = new DatastoreContext(); - - private static final String mockShardName = "mockShardName"; - - private final ShardStats shardStats = new ShardStats(mockShardName, "DataStore"); - - @BeforeClass - public static void staticSetup() { - store.onGlobalContextUpdated(testSchemaContext); - } - - @Test - public void testOnReceiveCreateTransaction() throws Exception { - new JavaTestKit(getSystem()) {{ - final Props props = ShardTransactionChain.props(store.createTransactionChain(), - testSchemaContext, DATA_STORE_CONTEXT, shardStats); - final ActorRef subject = getSystem().actorOf(props, "testCreateTransaction"); - - new Within(duration("1 seconds")) { - @Override - protected void run() { - - subject.tell(new CreateTransaction("txn-1", TransactionProxy.TransactionType.READ_ONLY.ordinal() ).toSerializable(), getRef()); - - final String out = new ExpectMsg(duration("1 seconds"), "match hint") { - // do not put code outside this method, will run afterwards - @Override - protected String match(Object in) { - if (in.getClass().equals(CreateTransactionReply.SERIALIZABLE_CLASS)) { - return CreateTransactionReply.fromSerializable(in).getTransactionPath(); - }else{ - throw noMatch(); - } - } - }.get(); // this extracts the received message - - assertEquals("Unexpected transaction path " + out, - "akka://test/user/testCreateTransaction/shard-txn-1", - out); - - // Will wait for the rest of the 3 seconds - expectNoMsg(); - } - - - }; - }}; - } - - @Test - public void testOnReceiveCloseTransactionChain() throws Exception { - new JavaTestKit(getSystem()) {{ - final Props props = ShardTransactionChain.props(store.createTransactionChain(), - testSchemaContext, DATA_STORE_CONTEXT, shardStats ); - final ActorRef subject = getSystem().actorOf(props, "testCloseTransactionChain"); - - new Within(duration("1 seconds")) { - @Override - protected void run() { - - subject.tell(new CloseTransactionChain().toSerializable(), getRef()); - - final String out = new ExpectMsg(duration("1 seconds"), "match hint") { - // do not put code outside this method, will run afterwards - @Override - protected String match(Object in) { - if (in.getClass().equals(CloseTransactionChainReply.SERIALIZABLE_CLASS)) { - return "match"; - } else { - throw noMatch(); - } - } - }.get(); // this extracts the received message - - assertEquals("match", out); - // Will wait for the rest of the 3 seconds - expectNoMsg(); - } - - - }; - }}; - } -} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionChainProxyTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionChainProxyTest.java index 93145bdd6d..4cca1bf9ad 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionChainProxyTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionChainProxyTest.java @@ -10,12 +10,9 @@ package org.opendaylight.controller.cluster.datastore; -import static org.mockito.Mockito.doReturn; - import org.junit.Assert; import org.junit.Before; import org.junit.Test; -import org.mockito.Mockito; import org.opendaylight.controller.cluster.datastore.utils.ActorContext; import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction; import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction; @@ -23,9 +20,15 @@ import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransaction; import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction; import org.opendaylight.yangtools.yang.model.api.SchemaContext; +import static org.mockito.Matchers.anyObject; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + public class TransactionChainProxyTest { - ActorContext actorContext = Mockito.mock(ActorContext.class); - SchemaContext schemaContext = Mockito.mock(SchemaContext.class); + ActorContext actorContext = mock(ActorContext.class); + SchemaContext schemaContext = mock(SchemaContext.class); @Before public void setUp() { @@ -57,8 +60,12 @@ public class TransactionChainProxyTest { } - @Test(expected=UnsupportedOperationException.class) + @Test public void testClose() throws Exception { - new TransactionChainProxy(actorContext).close(); + ActorContext context = mock(ActorContext.class); + + new TransactionChainProxy(context).close(); + + verify(context, times(1)).broadcast(anyObject()); } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/MockConfiguration.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/MockConfiguration.java index 8d49c6fac3..06c5767bd0 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/MockConfiguration.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/MockConfiguration.java @@ -16,6 +16,7 @@ import java.util.Arrays; import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.Set; public class MockConfiguration implements Configuration{ @Override public List getMemberShardNames(String memberName) { @@ -46,4 +47,8 @@ public class MockConfiguration implements Configuration{ return Collections.EMPTY_LIST; } + + @Override public Set getAllShardNames() { + return Collections.emptySet(); + } }