From: Tony Tkacik Date: Fri, 7 Nov 2014 10:00:48 +0000 (+0000) Subject: Merge "Fix warnings in neconf-util" X-Git-Tag: release/lithium~876^2~27 X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=commitdiff_plain;h=c8e8271331ce7d3d8352f02e488ab767cbdd0631;hp=96fbbc3abbb929109b4026f0ff0bcdcc1ce8a4c2 Merge "Fix warnings in neconf-util" --- diff --git a/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/datastore/node/utils/serialization/ValueSerializer.java b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/datastore/node/utils/serialization/ValueSerializer.java index 6a843f57c7..aac45e18b5 100644 --- a/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/datastore/node/utils/serialization/ValueSerializer.java +++ b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/datastore/node/utils/serialization/ValueSerializer.java @@ -8,6 +8,7 @@ package org.opendaylight.controller.cluster.datastore.node.utils.serialization; +import com.google.protobuf.ByteString; import org.opendaylight.controller.cluster.datastore.node.utils.QNameFactory; import org.opendaylight.controller.cluster.datastore.util.InstanceIdentifierUtils; import org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages; @@ -28,16 +29,18 @@ public class ValueSerializer { InstanceIdentifierUtils.toSerializable((YangInstanceIdentifier) value, context)); } else if(value instanceof Set) { Set set = (Set) value; - if(!set.isEmpty()){ - for(Object o : set){ - if(o instanceof String){ + if (!set.isEmpty()) { + for (Object o : set) { + if (o instanceof String) { builder.addBitsValue(o.toString()); } else { throw new IllegalArgumentException("Expected value type to be Bits but was : " + - value.toString()); + value.toString()); } } } + } else if(value instanceof byte[]){ + builder.setBytesValue(ByteString.copyFrom((byte[]) value)); } else { builder.setValue(value.toString()); } @@ -57,6 +60,8 @@ public class ValueSerializer { node.getInstanceIdentifierValue(), context); } else if(node.getIntValueType() == ValueType.BITS_TYPE.ordinal()){ return new HashSet(node.getBitsValueList()); + } else if(node.getIntValueType() == ValueType.BINARY_TYPE.ordinal()){ + return node.getBytesValue().toByteArray(); } return deSerializeBasicTypes(node.getIntValueType(), node.getValue()); } diff --git a/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/datastore/node/utils/serialization/ValueType.java b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/datastore/node/utils/serialization/ValueType.java index 49db8967a6..6c884734e2 100644 --- a/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/datastore/node/utils/serialization/ValueType.java +++ b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/datastore/node/utils/serialization/ValueType.java @@ -29,7 +29,8 @@ public enum ValueType { YANG_IDENTIFIER_TYPE, STRING_TYPE, BIG_INTEGER_TYPE, - BIG_DECIMAL_TYPE; + BIG_DECIMAL_TYPE, + BINARY_TYPE; private static Map types = new HashMap<>(); @@ -45,6 +46,7 @@ public enum ValueType { types.put(Short.class,SHORT_TYPE); types.put(BigInteger.class, BIG_INTEGER_TYPE); types.put(BigDecimal.class, BIG_DECIMAL_TYPE); + types.put(byte[].class, BINARY_TYPE); } public static final ValueType getSerializableType(Object node){ diff --git a/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/protobuff/messages/common/NormalizedNodeMessages.java b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/protobuff/messages/common/NormalizedNodeMessages.java index e7f2c361ae..8285009d35 100644 --- a/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/protobuff/messages/common/NormalizedNodeMessages.java +++ b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/protobuff/messages/common/NormalizedNodeMessages.java @@ -5410,6 +5410,16 @@ public final class NormalizedNodeMessages { */ com.google.protobuf.ByteString getCodeBytes(int index); + + // optional bytes bytesValue = 13; + /** + * optional bytes bytesValue = 13; + */ + boolean hasBytesValue(); + /** + * optional bytes bytesValue = 13; + */ + com.google.protobuf.ByteString getBytesValue(); } /** * Protobuf type {@code org.opendaylight.controller.mdsal.Node} @@ -5550,6 +5560,11 @@ public final class NormalizedNodeMessages { code_.add(input.readBytes()); break; } + case 106: { + bitField0_ |= 0x00000100; + bytesValue_ = input.readBytes(); + break; + } } } } catch (com.google.protobuf.InvalidProtocolBufferException e) { @@ -6070,6 +6085,22 @@ public final class NormalizedNodeMessages { return code_.getByteString(index); } + // optional bytes bytesValue = 13; + public static final int BYTESVALUE_FIELD_NUMBER = 13; + private com.google.protobuf.ByteString bytesValue_; + /** + * optional bytes bytesValue = 13; + */ + public boolean hasBytesValue() { + return ((bitField0_ & 0x00000100) == 0x00000100); + } + /** + * optional bytes bytesValue = 13; + */ + public com.google.protobuf.ByteString getBytesValue() { + return bytesValue_; + } + private void initFields() { path_ = ""; type_ = ""; @@ -6083,6 +6114,7 @@ public final class NormalizedNodeMessages { instanceIdentifierValue_ = org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages.InstanceIdentifier.getDefaultInstance(); bitsValue_ = com.google.protobuf.LazyStringArrayList.EMPTY; code_ = com.google.protobuf.LazyStringArrayList.EMPTY; + bytesValue_ = com.google.protobuf.ByteString.EMPTY; } private byte memoizedIsInitialized = -1; public final boolean isInitialized() { @@ -6156,6 +6188,9 @@ public final class NormalizedNodeMessages { for (int i = 0; i < code_.size(); i++) { output.writeBytes(12, code_.getByteString(i)); } + if (((bitField0_ & 0x00000100) == 0x00000100)) { + output.writeBytes(13, bytesValue_); + } getUnknownFields().writeTo(output); } @@ -6223,6 +6258,10 @@ public final class NormalizedNodeMessages { size += dataSize; size += 1 * getCodeList().size(); } + if (((bitField0_ & 0x00000100) == 0x00000100)) { + size += com.google.protobuf.CodedOutputStream + .computeBytesSize(13, bytesValue_); + } size += getUnknownFields().getSerializedSize(); memoizedSerializedSize = size; return size; @@ -6383,6 +6422,8 @@ public final class NormalizedNodeMessages { bitField0_ = (bitField0_ & ~0x00000400); code_ = com.google.protobuf.LazyStringArrayList.EMPTY; bitField0_ = (bitField0_ & ~0x00000800); + bytesValue_ = com.google.protobuf.ByteString.EMPTY; + bitField0_ = (bitField0_ & ~0x00001000); return this; } @@ -6481,6 +6522,10 @@ public final class NormalizedNodeMessages { bitField0_ = (bitField0_ & ~0x00000800); } result.code_ = code_; + if (((from_bitField0_ & 0x00001000) == 0x00001000)) { + to_bitField0_ |= 0x00000100; + } + result.bytesValue_ = bytesValue_; result.bitField0_ = to_bitField0_; onBuilt(); return result; @@ -6601,6 +6646,9 @@ public final class NormalizedNodeMessages { } onChanged(); } + if (other.hasBytesValue()) { + setBytesValue(other.getBytesValue()); + } this.mergeUnknownFields(other.getUnknownFields()); return this; } @@ -8110,6 +8158,42 @@ public final class NormalizedNodeMessages { return this; } + // optional bytes bytesValue = 13; + private com.google.protobuf.ByteString bytesValue_ = com.google.protobuf.ByteString.EMPTY; + /** + * optional bytes bytesValue = 13; + */ + public boolean hasBytesValue() { + return ((bitField0_ & 0x00001000) == 0x00001000); + } + /** + * optional bytes bytesValue = 13; + */ + public com.google.protobuf.ByteString getBytesValue() { + return bytesValue_; + } + /** + * optional bytes bytesValue = 13; + */ + public Builder setBytesValue(com.google.protobuf.ByteString value) { + if (value == null) { + throw new NullPointerException(); + } + bitField0_ |= 0x00001000; + bytesValue_ = value; + onChanged(); + return this; + } + /** + * optional bytes bytesValue = 13; + */ + public Builder clearBytesValue() { + bitField0_ = (bitField0_ & ~0x00001000); + bytesValue_ = getDefaultInstance().getBytesValue(); + onChanged(); + return this; + } + // @@protoc_insertion_point(builder_scope:org.opendaylight.controller.mdsal.Node) } @@ -10287,7 +10371,7 @@ public final class NormalizedNodeMessages { "ntroller.mdsal.Attribute\022\017\n\007intType\030\006 \001(" + "\005\"f\n\022InstanceIdentifier\022B\n\targuments\030\001 \003" + "(\0132/.org.opendaylight.controller.mdsal.P" + - "athArgument\022\014\n\004code\030\002 \003(\t\"\245\003\n\004Node\022\014\n\004pa" + + "athArgument\022\014\n\004code\030\002 \003(\t\"\271\003\n\004Node\022\014\n\004pa" + "th\030\001 \001(\t\022\014\n\004type\030\002 \001(\t\022E\n\014pathArgument\030\003" + " \001(\0132/.org.opendaylight.controller.mdsal" + ".PathArgument\022\017\n\007intType\030\004 \001(\005\022@\n\nattrib", @@ -10298,17 +10382,18 @@ public final class NormalizedNodeMessages { "pe\030\t \001(\005\022V\n\027instanceIdentifierValue\030\n \001(" + "\01325.org.opendaylight.controller.mdsal.In" + "stanceIdentifier\022\021\n\tbitsValue\030\013 \003(\t\022\014\n\004c" + - "ode\030\014 \003(\t\"`\n\tContainer\022\022\n\nparentPath\030\001 \002" + - "(\t\022?\n\016normalizedNode\030\002 \001(\0132\'.org.openday" + - "light.controller.mdsal.Node\"\246\001\n\014NodeMapE", - "ntry\022U\n\026instanceIdentifierPath\030\001 \002(\01325.o" + - "rg.opendaylight.controller.mdsal.Instanc" + - "eIdentifier\022?\n\016normalizedNode\030\002 \001(\0132\'.or" + - "g.opendaylight.controller.mdsal.Node\"N\n\007" + - "NodeMap\022C\n\nmapEntries\030\001 \003(\0132/.org.openda" + - "ylight.controller.mdsal.NodeMapEntryBO\n5" + - "org.opendaylight.controller.protobuff.me" + - "ssages.commonB\026NormalizedNodeMessages" + "ode\030\014 \003(\t\022\022\n\nbytesValue\030\r \001(\014\"`\n\tContain" + + "er\022\022\n\nparentPath\030\001 \002(\t\022?\n\016normalizedNode" + + "\030\002 \001(\0132\'.org.opendaylight.controller.mds", + "al.Node\"\246\001\n\014NodeMapEntry\022U\n\026instanceIden" + + "tifierPath\030\001 \002(\01325.org.opendaylight.cont" + + "roller.mdsal.InstanceIdentifier\022?\n\016norma" + + "lizedNode\030\002 \001(\0132\'.org.opendaylight.contr" + + "oller.mdsal.Node\"N\n\007NodeMap\022C\n\nmapEntrie" + + "s\030\001 \003(\0132/.org.opendaylight.controller.md" + + "sal.NodeMapEntryBO\n5org.opendaylight.con" + + "troller.protobuff.messages.commonB\026Norma" + + "lizedNodeMessages" }; com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner = new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() { @@ -10350,7 +10435,7 @@ public final class NormalizedNodeMessages { internal_static_org_opendaylight_controller_mdsal_Node_fieldAccessorTable = new com.google.protobuf.GeneratedMessage.FieldAccessorTable( internal_static_org_opendaylight_controller_mdsal_Node_descriptor, - new java.lang.String[] { "Path", "Type", "PathArgument", "IntType", "Attributes", "Child", "Value", "ValueType", "IntValueType", "InstanceIdentifierValue", "BitsValue", "Code", }); + new java.lang.String[] { "Path", "Type", "PathArgument", "IntType", "Attributes", "Child", "Value", "ValueType", "IntValueType", "InstanceIdentifierValue", "BitsValue", "Code", "BytesValue", }); internal_static_org_opendaylight_controller_mdsal_Container_descriptor = getDescriptor().getMessageTypes().get(6); internal_static_org_opendaylight_controller_mdsal_Container_fieldAccessorTable = new diff --git a/opendaylight/md-sal/sal-clustering-commons/src/main/resources/Common.proto b/opendaylight/md-sal/sal-clustering-commons/src/main/resources/Common.proto index 356bfbf684..842a9725d1 100644 --- a/opendaylight/md-sal/sal-clustering-commons/src/main/resources/Common.proto +++ b/opendaylight/md-sal/sal-clustering-commons/src/main/resources/Common.proto @@ -63,6 +63,8 @@ message Node{ repeated string bitsValue = 11; // intValueType = Bits repeated string code = 12; // A list of string codes which can be used for any repeated strings in the NormalizedNode + + optional bytes bytesValue = 13; } message Container{ diff --git a/opendaylight/md-sal/sal-clustering-commons/src/test/java/org/opendaylight/controller/cluster/datastore/node/utils/serialization/NormalizedNodeSerializerTest.java b/opendaylight/md-sal/sal-clustering-commons/src/test/java/org/opendaylight/controller/cluster/datastore/node/utils/serialization/NormalizedNodeSerializerTest.java index cdb2e69e83..bcc0fd85d0 100644 --- a/opendaylight/md-sal/sal-clustering-commons/src/test/java/org/opendaylight/controller/cluster/datastore/node/utils/serialization/NormalizedNodeSerializerTest.java +++ b/opendaylight/md-sal/sal-clustering-commons/src/test/java/org/opendaylight/controller/cluster/datastore/node/utils/serialization/NormalizedNodeSerializerTest.java @@ -1,15 +1,20 @@ package org.opendaylight.controller.cluster.datastore.node.utils.serialization; +import com.google.common.base.Optional; import org.junit.Rule; import org.junit.Test; import org.junit.rules.ExpectedException; import org.opendaylight.controller.cluster.datastore.util.TestModel; import org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; +import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode; +import org.opendaylight.yangtools.yang.data.api.schema.DataContainerChild; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; +import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; public class NormalizedNodeSerializerTest { @@ -49,6 +54,37 @@ public class NormalizedNodeSerializerTest { // created by serializing the original node and deSerializing it back. assertEquals(expectedNode, actualNode); + byte[] binaryData = new byte[5]; + for(byte i=0;i<5;i++){ + binaryData[i] = i; + } + + ContainerNode node1 = TestModel.createBaseTestContainerBuilder() + .withChild(ImmutableNodes.leafNode(TestModel.SOME_BINARY_DATE_QNAME, binaryData)) + .build(); + + NormalizedNodeMessages.Node serializedNode1 = NormalizedNodeSerializer + .serialize(node1); + + ContainerNode node2 = + (ContainerNode) NormalizedNodeSerializer.deSerialize(serializedNode1); + + + // FIXME: This will not work due to BUG 2326. Once that is fixed we can uncomment this assertion + // assertEquals(node1, node2); + + Optional> child = node2.getChild(new YangInstanceIdentifier.NodeIdentifier(TestModel.SOME_BINARY_DATE_QNAME)); + + Object value = child.get().getValue(); + + assertTrue("value should be of type byte[]", value instanceof byte[]); + + byte[] bytesValue = (byte[]) value; + + for(byte i=0;i<5;i++){ + assertEquals(i, bytesValue[i]); + } + } @Test(expected = NullPointerException.class) diff --git a/opendaylight/md-sal/sal-clustering-commons/src/test/java/org/opendaylight/controller/cluster/datastore/node/utils/serialization/ValueTypeTest.java b/opendaylight/md-sal/sal-clustering-commons/src/test/java/org/opendaylight/controller/cluster/datastore/node/utils/serialization/ValueTypeTest.java new file mode 100644 index 0000000000..8fe0633b6e --- /dev/null +++ b/opendaylight/md-sal/sal-clustering-commons/src/test/java/org/opendaylight/controller/cluster/datastore/node/utils/serialization/ValueTypeTest.java @@ -0,0 +1,18 @@ +package org.opendaylight.controller.cluster.datastore.node.utils.serialization; + +import org.junit.Test; + +import static org.junit.Assert.assertEquals; + +public class ValueTypeTest { + + @Test + public void testGetSerializableType(){ + byte[] b = new byte[10]; + b[0] = 1; + b[2] = 2; + + ValueType serializableType = ValueType.getSerializableType(b); + assertEquals(ValueType.BINARY_TYPE, serializableType); + } +} \ No newline at end of file diff --git a/opendaylight/md-sal/sal-clustering-commons/src/test/java/org/opendaylight/controller/cluster/datastore/util/TestModel.java b/opendaylight/md-sal/sal-clustering-commons/src/test/java/org/opendaylight/controller/cluster/datastore/util/TestModel.java index cda4b06e29..99df01a32e 100644 --- a/opendaylight/md-sal/sal-clustering-commons/src/test/java/org/opendaylight/controller/cluster/datastore/util/TestModel.java +++ b/opendaylight/md-sal/sal-clustering-commons/src/test/java/org/opendaylight/controller/cluster/datastore/util/TestModel.java @@ -62,6 +62,7 @@ public class TestModel { public static final QName DESC_QNAME = QName.create(TEST_QNAME, "desc"); public static final QName POINTER_QNAME = QName.create(TEST_QNAME, "pointer"); + public static final QName SOME_BINARY_DATE_QNAME = QName.create(TEST_QNAME, "some-binary-data"); public static final QName SOME_REF_QNAME = QName.create(TEST_QNAME, "some-ref"); public static final QName MYIDENTITY_QNAME = QName.create(TEST_QNAME, @@ -282,114 +283,115 @@ public class TestModel { } - public static ContainerNode createTestContainer() { - - - // Create a list of shoes - // This is to test leaf list entry - final LeafSetEntryNode nike = - ImmutableLeafSetEntryNodeBuilder - .create() - .withNodeIdentifier( - new YangInstanceIdentifier.NodeWithValue(QName.create( - TEST_QNAME, "shoe"), "nike")).withValue("nike").build(); - - final LeafSetEntryNode puma = - ImmutableLeafSetEntryNodeBuilder - .create() - .withNodeIdentifier( - new YangInstanceIdentifier.NodeWithValue(QName.create( - TEST_QNAME, "shoe"), "puma")).withValue("puma").build(); - - final LeafSetNode shoes = - ImmutableLeafSetNodeBuilder - .create() - .withNodeIdentifier( - new YangInstanceIdentifier.NodeIdentifier(QName.create( - TEST_QNAME, "shoe"))).withChild(nike).withChild(puma) - .build(); - - - // Test a leaf-list where each entry contains an identity - final LeafSetEntryNode cap1 = - ImmutableLeafSetEntryNodeBuilder - .create() - .withNodeIdentifier( - new YangInstanceIdentifier.NodeWithValue(QName.create( - TEST_QNAME, "capability"), DESC_QNAME)) - .withValue(DESC_QNAME).build(); - - final LeafSetNode capabilities = - ImmutableLeafSetNodeBuilder - .create() - .withNodeIdentifier( - new YangInstanceIdentifier.NodeIdentifier(QName.create( - TEST_QNAME, "capability"))).withChild(cap1).build(); - - ContainerNode switchFeatures = - ImmutableContainerNodeBuilder - .create() - .withNodeIdentifier( - new YangInstanceIdentifier.NodeIdentifier(SWITCH_FEATURES_QNAME)) - .withChild(capabilities).build(); - - // Create a leaf list with numbers - final LeafSetEntryNode five = - ImmutableLeafSetEntryNodeBuilder - .create() - .withNodeIdentifier( - (new YangInstanceIdentifier.NodeWithValue(QName.create( - TEST_QNAME, "number"), 5))).withValue(5).build(); - final LeafSetEntryNode fifteen = - ImmutableLeafSetEntryNodeBuilder - .create() - .withNodeIdentifier( - (new YangInstanceIdentifier.NodeWithValue(QName.create( - TEST_QNAME, "number"), 15))).withValue(15).build(); - final LeafSetNode numbers = - ImmutableLeafSetNodeBuilder - .create() - .withNodeIdentifier( - new YangInstanceIdentifier.NodeIdentifier(QName.create( - TEST_QNAME, "number"))).withChild(five).withChild(fifteen) - .build(); - - - // Create augmentations - MapEntryNode mapEntry = createAugmentedListEntry(1, "First Test"); - - // Create a bits leaf + public static DataContainerNodeAttrBuilder createBaseTestContainerBuilder() { + // Create a list of shoes + // This is to test leaf list entry + final LeafSetEntryNode nike = + ImmutableLeafSetEntryNodeBuilder + .create() + .withNodeIdentifier( + new YangInstanceIdentifier.NodeWithValue(QName.create( + TEST_QNAME, "shoe"), "nike")).withValue("nike").build(); + + final LeafSetEntryNode puma = + ImmutableLeafSetEntryNodeBuilder + .create() + .withNodeIdentifier( + new YangInstanceIdentifier.NodeWithValue(QName.create( + TEST_QNAME, "shoe"), "puma")).withValue("puma").build(); + + final LeafSetNode shoes = + ImmutableLeafSetNodeBuilder + .create() + .withNodeIdentifier( + new YangInstanceIdentifier.NodeIdentifier(QName.create( + TEST_QNAME, "shoe"))).withChild(nike).withChild(puma) + .build(); + + + // Test a leaf-list where each entry contains an identity + final LeafSetEntryNode cap1 = + ImmutableLeafSetEntryNodeBuilder + .create() + .withNodeIdentifier( + new YangInstanceIdentifier.NodeWithValue(QName.create( + TEST_QNAME, "capability"), DESC_QNAME)) + .withValue(DESC_QNAME).build(); + + final LeafSetNode capabilities = + ImmutableLeafSetNodeBuilder + .create() + .withNodeIdentifier( + new YangInstanceIdentifier.NodeIdentifier(QName.create( + TEST_QNAME, "capability"))).withChild(cap1).build(); + + ContainerNode switchFeatures = + ImmutableContainerNodeBuilder + .create() + .withNodeIdentifier( + new YangInstanceIdentifier.NodeIdentifier(SWITCH_FEATURES_QNAME)) + .withChild(capabilities).build(); + + // Create a leaf list with numbers + final LeafSetEntryNode five = + ImmutableLeafSetEntryNodeBuilder + .create() + .withNodeIdentifier( + (new YangInstanceIdentifier.NodeWithValue(QName.create( + TEST_QNAME, "number"), 5))).withValue(5).build(); + final LeafSetEntryNode fifteen = + ImmutableLeafSetEntryNodeBuilder + .create() + .withNodeIdentifier( + (new YangInstanceIdentifier.NodeWithValue(QName.create( + TEST_QNAME, "number"), 15))).withValue(15).build(); + final LeafSetNode numbers = + ImmutableLeafSetNodeBuilder + .create() + .withNodeIdentifier( + new YangInstanceIdentifier.NodeIdentifier(QName.create( + TEST_QNAME, "number"))).withChild(five).withChild(fifteen) + .build(); + + + // Create augmentations + MapEntryNode mapEntry = createAugmentedListEntry(1, "First Test"); + + // Create a bits leaf NormalizedNodeAttrBuilder> - myBits = Builders.leafBuilder().withNodeIdentifier( - new YangInstanceIdentifier.NodeIdentifier( - QName.create(TEST_QNAME, "my-bits"))).withValue( - ImmutableSet.of("foo", "bar")); + myBits = Builders.leafBuilder().withNodeIdentifier( + new YangInstanceIdentifier.NodeIdentifier( + QName.create(TEST_QNAME, "my-bits"))).withValue( + ImmutableSet.of("foo", "bar")); // Create the document - return ImmutableContainerNodeBuilder - .create() - .withNodeIdentifier( - new YangInstanceIdentifier.NodeIdentifier(TEST_QNAME)) - .withChild(myBits.build()) - .withChild(ImmutableNodes.leafNode(DESC_QNAME, DESC)) - .withChild(ImmutableNodes.leafNode(POINTER_QNAME, "pointer")) - .withChild( - ImmutableNodes.leafNode(SOME_REF_QNAME, YangInstanceIdentifier - .builder().build())) - .withChild(ImmutableNodes.leafNode(MYIDENTITY_QNAME, DESC_QNAME)) - - // .withChild(augmentationNode) - .withChild(shoes) - .withChild(numbers) - .withChild(switchFeatures) - .withChild( - mapNodeBuilder(AUGMENTED_LIST_QNAME).withChild(mapEntry).build()) - .withChild( - mapNodeBuilder(OUTER_LIST_QNAME) - .withChild(mapEntry(OUTER_LIST_QNAME, ID_QNAME, ONE_ID)) - .withChild(BAR_NODE).build()).build(); + return ImmutableContainerNodeBuilder + .create() + .withNodeIdentifier( + new YangInstanceIdentifier.NodeIdentifier(TEST_QNAME)) + .withChild(myBits.build()) + .withChild(ImmutableNodes.leafNode(DESC_QNAME, DESC)) + .withChild(ImmutableNodes.leafNode(POINTER_QNAME, "pointer")) + .withChild( + ImmutableNodes.leafNode(SOME_REF_QNAME, YangInstanceIdentifier + .builder().build())) + .withChild(ImmutableNodes.leafNode(MYIDENTITY_QNAME, DESC_QNAME)) + + // .withChild(augmentationNode) + .withChild(shoes) + .withChild(numbers) + .withChild(switchFeatures) + .withChild( + mapNodeBuilder(AUGMENTED_LIST_QNAME).withChild(mapEntry).build()) + .withChild( + mapNodeBuilder(OUTER_LIST_QNAME) + .withChild(mapEntry(OUTER_LIST_QNAME, ID_QNAME, ONE_ID)) + .withChild(BAR_NODE).build()); + } + public static ContainerNode createTestContainer() { + return createBaseTestContainerBuilder().build(); } public static MapEntryNode createAugmentedListEntry(int id, String name) { diff --git a/opendaylight/md-sal/sal-clustering-commons/src/test/resources/odl-datastore-test.yang b/opendaylight/md-sal/sal-clustering-commons/src/test/resources/odl-datastore-test.yang index 246cc682cd..a1fbc1fdad 100644 --- a/opendaylight/md-sal/sal-clustering-commons/src/test/resources/odl-datastore-test.yang +++ b/opendaylight/md-sal/sal-clustering-commons/src/test/resources/odl-datastore-test.yang @@ -107,6 +107,10 @@ module odl-datastore-test { } } + leaf some-binary-data { + type binary; + } + } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionChainProxy.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionChainProxy.java index b467ee4ddb..93f9e6b7de 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionChainProxy.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionChainProxy.java @@ -9,18 +9,17 @@ package org.opendaylight.controller.cluster.datastore; import akka.actor.ActorSelection; -import akka.dispatch.Futures; +import akka.dispatch.OnComplete; +import java.util.AbstractMap.SimpleEntry; +import java.util.List; import org.opendaylight.controller.cluster.datastore.messages.CloseTransactionChain; import org.opendaylight.controller.cluster.datastore.utils.ActorContext; import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction; import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction; import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain; import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction; -import scala.concurrent.Await; import scala.concurrent.Future; - -import java.util.Collections; -import java.util.List; +import scala.concurrent.Promise; /** * TransactionChainProxy acts as a proxy for a DOMStoreTransactionChain created on a remote shard @@ -28,7 +27,7 @@ import java.util.List; public class TransactionChainProxy implements DOMStoreTransactionChain{ private final ActorContext actorContext; private final String transactionChainId; - private volatile List> cohortFutures = Collections.emptyList(); + private volatile SimpleEntry>> previousTxReadyFutures; public TransactionChainProxy(ActorContext actorContext) { this.actorContext = actorContext; @@ -37,20 +36,17 @@ public class TransactionChainProxy implements DOMStoreTransactionChain{ @Override public DOMStoreReadTransaction newReadOnlyTransaction() { - return new TransactionProxy(actorContext, - TransactionProxy.TransactionType.READ_ONLY, this); + return new ChainedTransactionProxy(actorContext, TransactionProxy.TransactionType.READ_ONLY); } @Override public DOMStoreReadWriteTransaction newReadWriteTransaction() { - return new TransactionProxy(actorContext, - TransactionProxy.TransactionType.READ_WRITE, this); + return new ChainedTransactionProxy(actorContext, TransactionProxy.TransactionType.READ_WRITE); } @Override public DOMStoreWriteTransaction newWriteOnlyTransaction() { - return new TransactionProxy(actorContext, - TransactionProxy.TransactionType.WRITE_ONLY, this); + return new ChainedTransactionProxy(actorContext, TransactionProxy.TransactionType.WRITE_ONLY); } @Override @@ -63,17 +59,62 @@ public class TransactionChainProxy implements DOMStoreTransactionChain{ return transactionChainId; } - public void onTransactionReady(List> cohortFutures){ - this.cohortFutures = cohortFutures; - } + private class ChainedTransactionProxy extends TransactionProxy { + + ChainedTransactionProxy(ActorContext actorContext, TransactionType transactionType) { + super(actorContext, transactionType, transactionChainId); + } + + @Override + protected void onTransactionReady(List> cohortFutures) { + if(!cohortFutures.isEmpty()) { + previousTxReadyFutures = new SimpleEntry<>(getIdentifier(), cohortFutures); + } else { + previousTxReadyFutures = null; + } + } + + /** + * This method is overridden to ensure the previous Tx's ready operations complete + * before we create the next shard Tx in the chain to avoid creation failures if the + * previous Tx's ready operations haven't completed yet. + */ + @Override + protected Future sendCreateTransaction(final ActorSelection shard, + final Object serializedCreateMessage) { + // Check if there are any previous ready Futures. Also make sure the previous ready + // Futures aren't for this Tx as deadlock would occur if tried to wait on our own + // Futures. This may happen b/c the shard Tx creates are done async so it's possible + // for the client to ready this Tx before we've even attempted to create a shard Tx. + if(previousTxReadyFutures == null || + previousTxReadyFutures.getKey().equals(getIdentifier())) { + return super.sendCreateTransaction(shard, serializedCreateMessage); + } + + // Combine the ready Futures into 1. + Future> combinedFutures = akka.dispatch.Futures.sequence( + previousTxReadyFutures.getValue(), actorContext.getActorSystem().dispatcher()); + + // Add a callback for completion of the combined Futures. + final Promise createTxPromise = akka.dispatch.Futures.promise(); + OnComplete> onComplete = new OnComplete>() { + @Override + public void onComplete(Throwable failure, Iterable notUsed) { + if(failure != null) { + // A Ready Future failed so fail the returned Promise. + createTxPromise.failure(failure); + } else { + // Send the CreateTx message and use the resulting Future to complete the + // returned Promise. + createTxPromise.completeWith(actorContext.executeOperationAsync(shard, + serializedCreateMessage)); + } + } + }; + + combinedFutures.onComplete(onComplete, actorContext.getActorSystem().dispatcher()); - public void waitTillCurrentTransactionReady(){ - try { - Await.result(Futures - .sequence(this.cohortFutures, actorContext.getActorSystem().dispatcher()), - actorContext.getOperationDuration()); - } catch (Exception e) { - throw new IllegalStateException("Failed when waiting for transaction on a chain to become ready", e); + return createTxPromise.future(); } } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java index ffb1ab7c55..d93bae22e0 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java @@ -21,6 +21,14 @@ import com.google.common.util.concurrent.CheckedFuture; import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.SettableFuture; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; +import javax.annotation.concurrent.GuardedBy; import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException; import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier; import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction; @@ -50,15 +58,6 @@ import scala.concurrent.Future; import scala.concurrent.Promise; import scala.concurrent.duration.FiniteDuration; -import javax.annotation.concurrent.GuardedBy; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicLong; - /** * TransactionProxy acts as a proxy for one or more transactions that were created on a remote shard *

@@ -182,23 +181,23 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { private final TransactionType transactionType; private final ActorContext actorContext; private final TransactionIdentifier identifier; - private final TransactionChainProxy transactionChainProxy; + private final String transactionChainId; private final SchemaContext schemaContext; private boolean inReadyState; public TransactionProxy(ActorContext actorContext, TransactionType transactionType) { - this(actorContext, transactionType, null); + this(actorContext, transactionType, ""); } public TransactionProxy(ActorContext actorContext, TransactionType transactionType, - TransactionChainProxy transactionChainProxy) { + String transactionChainId) { this.actorContext = Preconditions.checkNotNull(actorContext, "actorContext should not be null"); this.transactionType = Preconditions.checkNotNull(transactionType, "transactionType should not be null"); this.schemaContext = Preconditions.checkNotNull(actorContext.getSchemaContext(), "schemaContext should not be null"); - this.transactionChainProxy = transactionChainProxy; + this.transactionChainId = transactionChainId; String memberName = actorContext.getCurrentMemberName(); if(memberName == null){ @@ -237,6 +236,18 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { return recordedOperationFutures; } + @VisibleForTesting + boolean hasTransactionContext() { + for(TransactionFutureCallback txFutureCallback : txFutureCallbackMap.values()) { + TransactionContext transactionContext = txFutureCallback.getTransactionContext(); + if(transactionContext != null) { + return true; + } + } + + return false; + } + @Override public CheckedFuture>, ReadFailedException> read( final YangInstanceIdentifier path) { @@ -433,14 +444,32 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { } } - if(transactionChainProxy != null){ - transactionChainProxy.onTransactionReady(cohortFutures); - } + onTransactionReady(cohortFutures); return new ThreePhaseCommitCohortProxy(actorContext, cohortFutures, identifier.toString()); } + /** + * Method for derived classes to be notified when the transaction has been readied. + * + * @param cohortFutures the cohort Futures for each shard transaction. + */ + protected void onTransactionReady(List> cohortFutures) { + } + + /** + * Method called to send a CreateTransaction message to a shard. + * + * @param shard the shard actor to send to + * @param serializedCreateMessage the serialized message to send + * @return the response Future + */ + protected Future sendCreateTransaction(ActorSelection shard, + Object serializedCreateMessage) { + return actorContext.executeOperationAsync(shard, serializedCreateMessage); + } + @Override public Object getIdentifier() { return this.identifier; @@ -502,10 +531,7 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { } public String getTransactionChainId() { - if(transactionChainProxy == null){ - return ""; - } - return transactionChainProxy.getTransactionChainId(); + return transactionChainId; } /** @@ -591,7 +617,7 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { * Performs a CreateTransaction try async. */ private void tryCreateTransaction() { - Future createTxFuture = actorContext.executeOperationAsync(primaryShard, + Future createTxFuture = sendCreateTransaction(primaryShard, new CreateTransaction(identifier.toString(), TransactionProxy.this.transactionType.ordinal(), getTransactionChainId()).toSerializable()); @@ -626,29 +652,42 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { // respect to #addTxOperationOnComplete to handle timing issues and ensure no // TransactionOperation is missed and that they are processed in the order they occurred. synchronized(txOperationsOnComplete) { + // Store the new TransactionContext locally until we've completed invoking the + // TransactionOperations. This avoids thread timing issues which could cause + // out-of-order TransactionOperations. Eg, on a modification operation, if the + // TransactionContext is non-null, then we directly call the TransactionContext. + // However, at the same time, the code may be executing the cached + // TransactionOperations. So to avoid thus timing, we don't publish the + // TransactionContext until after we've executed all cached TransactionOperations. + TransactionContext localTransactionContext; if(failure != null) { LOG.debug("Tx {} Creating NoOpTransaction because of error: {}", identifier, failure.getMessage()); - transactionContext = new NoOpTransactionContext(failure, identifier); + localTransactionContext = new NoOpTransactionContext(failure, identifier); } else if (response.getClass().equals(CreateTransactionReply.SERIALIZABLE_CLASS)) { - createValidTransactionContext(CreateTransactionReply.fromSerializable(response)); + localTransactionContext = createValidTransactionContext( + CreateTransactionReply.fromSerializable(response)); } else { IllegalArgumentException exception = new IllegalArgumentException(String.format( "Invalid reply type %s for CreateTransaction", response.getClass())); - transactionContext = new NoOpTransactionContext(exception, identifier); + localTransactionContext = new NoOpTransactionContext(exception, identifier); } for(TransactionOperation oper: txOperationsOnComplete) { - oper.invoke(transactionContext); + oper.invoke(localTransactionContext); } txOperationsOnComplete.clear(); + + // We're done invoking the TransactionOperations so we can now publish the + // TransactionContext. + transactionContext = localTransactionContext; } } - private void createValidTransactionContext(CreateTransactionReply reply) { + private TransactionContext createValidTransactionContext(CreateTransactionReply reply) { String transactionPath = reply.getTransactionPath(); LOG.debug("Tx {} Received transaction actor path {}", identifier, transactionPath); @@ -669,7 +708,7 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { // Check if TxActor is created in the same node boolean isTxActorLocal = actorContext.isLocalPath(transactionPath); - transactionContext = new TransactionContextImpl(transactionPath, transactionActor, identifier, + return new TransactionContextImpl(transactionPath, transactionActor, identifier, actorContext, schemaContext, isTxActorLocal, reply.getVersion()); } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreIntegrationTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreIntegrationTest.java index cec7ce1e3f..12c566d33d 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreIntegrationTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreIntegrationTest.java @@ -1,11 +1,17 @@ package org.opendaylight.controller.cluster.datastore; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; import akka.actor.ActorRef; import akka.actor.ActorSystem; import akka.actor.PoisonPill; import com.google.common.base.Optional; import com.google.common.util.concurrent.CheckedFuture; import com.google.common.util.concurrent.Uninterruptibles; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; import org.junit.Test; import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException; import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException; @@ -26,16 +32,10 @@ import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain; import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction; import org.opendaylight.yangtools.concepts.ListenerRegistration; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; +import org.opendaylight.yangtools.yang.data.api.schema.MapNode; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes; import org.opendaylight.yangtools.yang.model.api.SchemaContext; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicReference; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; public class DistributedDataStoreIntegrationTest extends AbstractActorTest { @@ -77,10 +77,7 @@ public class DistributedDataStoreIntegrationTest extends AbstractActorTest { DOMStoreThreePhaseCommitCohort cohort = writeTx.ready(); - Boolean canCommit = cohort.canCommit().get(5, TimeUnit.SECONDS); - assertEquals("canCommit", true, canCommit); - cohort.preCommit().get(5, TimeUnit.SECONDS); - cohort.commit().get(5, TimeUnit.SECONDS); + doCommit(cohort); // Verify the data in the store @@ -131,10 +128,7 @@ public class DistributedDataStoreIntegrationTest extends AbstractActorTest { // 5. Commit the Tx - Boolean canCommit = cohort.canCommit().get(5, TimeUnit.SECONDS); - assertEquals("canCommit", true, canCommit); - cohort.preCommit().get(5, TimeUnit.SECONDS); - cohort.commit().get(5, TimeUnit.SECONDS); + doCommit(cohort); // 6. Verify the data in the store @@ -219,9 +213,7 @@ public class DistributedDataStoreIntegrationTest extends AbstractActorTest { // Wait for the Tx commit to complete. - assertEquals("canCommit", true, txCohort.get().canCommit().get(5, TimeUnit.SECONDS)); - txCohort.get().preCommit().get(5, TimeUnit.SECONDS); - txCohort.get().commit().get(5, TimeUnit.SECONDS); + doCommit(txCohort.get()); // Verify the data in the store @@ -552,10 +544,8 @@ public class DistributedDataStoreIntegrationTest extends AbstractActorTest { @Test public void testTransactionChain() throws Exception{ - System.setProperty("shard.persistent", "true"); new IntegrationTestKit(getSystem()) {{ - DistributedDataStore dataStore = - setupDistributedDataStore("transactionChainIntegrationTest", "test-1"); + DistributedDataStore dataStore = setupDistributedDataStore("testTransactionChain", "test-1"); // 1. Create a Tx chain and write-only Tx @@ -566,30 +556,76 @@ public class DistributedDataStoreIntegrationTest extends AbstractActorTest { // 2. Write some data - NormalizedNode containerNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME); - writeTx.write(TestModel.TEST_PATH, containerNode); + NormalizedNode testNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME); + writeTx.write(TestModel.TEST_PATH, testNode); // 3. Ready the Tx for commit - DOMStoreThreePhaseCommitCohort cohort = writeTx.ready(); + final DOMStoreThreePhaseCommitCohort cohort1 = writeTx.ready(); - // 4. Commit the Tx + // 4. Commit the Tx on another thread that first waits for the second read Tx. - Boolean canCommit = cohort.canCommit().get(5, TimeUnit.SECONDS); - assertEquals("canCommit", true, canCommit); - cohort.preCommit().get(5, TimeUnit.SECONDS); - cohort.commit().get(5, TimeUnit.SECONDS); + final CountDownLatch continueCommit1 = new CountDownLatch(1); + final CountDownLatch commit1Done = new CountDownLatch(1); + final AtomicReference commit1Error = new AtomicReference<>(); + new Thread() { + @Override + public void run() { + try { + continueCommit1.await(); + doCommit(cohort1); + } catch (Exception e) { + commit1Error.set(e); + } finally { + commit1Done.countDown(); + } + } + }.start(); - // 5. Verify the data in the store + // 5. Create a new read Tx from the chain to read and verify the data from the first + // Tx is visible after being readied. DOMStoreReadTransaction readTx = txChain.newReadOnlyTransaction(); - Optional> optional = readTx.read(TestModel.TEST_PATH).get(5, TimeUnit.SECONDS); assertEquals("isPresent", true, optional.isPresent()); - assertEquals("Data node", containerNode, optional.get()); + assertEquals("Data node", testNode, optional.get()); + + // 6. Create a new RW Tx from the chain, write more data, and ready it + + DOMStoreReadWriteTransaction rwTx = txChain.newReadWriteTransaction(); + MapNode outerNode = ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(); + rwTx.write(TestModel.OUTER_LIST_PATH, outerNode); + + DOMStoreThreePhaseCommitCohort cohort2 = rwTx.ready(); + + // 7. Create a new read Tx from the chain to read the data from the last RW Tx to + // verify it is visible. + + readTx = txChain.newReadOnlyTransaction(); + optional = readTx.read(TestModel.OUTER_LIST_PATH).get(5, TimeUnit.SECONDS); + assertEquals("isPresent", true, optional.isPresent()); + assertEquals("Data node", outerNode, optional.get()); + + // 8. Wait for the 2 commits to complete and close the chain. + + continueCommit1.countDown(); + Uninterruptibles.awaitUninterruptibly(commit1Done, 5, TimeUnit.SECONDS); + + if(commit1Error.get() != null) { + throw commit1Error.get(); + } + + doCommit(cohort2); txChain.close(); + // 9. Create a new read Tx from the data store and verify committed data. + + readTx = dataStore.newReadOnlyTransaction(); + optional = readTx.read(TestModel.OUTER_LIST_PATH).get(5, TimeUnit.SECONDS); + assertEquals("isPresent", true, optional.isPresent()); + assertEquals("Data node", outerNode, optional.get()); + cleanup(dataStore); }}; } @@ -600,7 +636,10 @@ public class DistributedDataStoreIntegrationTest extends AbstractActorTest { DistributedDataStore dataStore = setupDistributedDataStore("testChangeListenerRegistration", "test-1"); - MockDataChangeListener listener = new MockDataChangeListener(3); + testWriteTransaction(dataStore, TestModel.TEST_PATH, + ImmutableNodes.containerNode(TestModel.TEST_QNAME)); + + MockDataChangeListener listener = new MockDataChangeListener(1); ListenerRegistration listenerReg = dataStore.registerChangeListener(TestModel.TEST_PATH, listener, @@ -608,8 +647,13 @@ public class DistributedDataStoreIntegrationTest extends AbstractActorTest { assertNotNull("registerChangeListener returned null", listenerReg); - testWriteTransaction(dataStore, TestModel.TEST_PATH, - ImmutableNodes.containerNode(TestModel.TEST_QNAME)); + // Wait for the initial notification + + listener.waitForChangeEvents(TestModel.TEST_PATH); + + listener.reset(2); + + // Write 2 updates. testWriteTransaction(dataStore, TestModel.OUTER_LIST_PATH, ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build()); @@ -619,7 +663,9 @@ public class DistributedDataStoreIntegrationTest extends AbstractActorTest { testWriteTransaction(dataStore, listPath, ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1)); - listener.waitForChangeEvents(TestModel.TEST_PATH, TestModel.OUTER_LIST_PATH, listPath ); + // Wait for the 2 updates. + + listener.waitForChangeEvents(TestModel.OUTER_LIST_PATH, listPath); listenerReg.close(); @@ -694,10 +740,7 @@ public class DistributedDataStoreIntegrationTest extends AbstractActorTest { // 4. Commit the Tx - Boolean canCommit = cohort.canCommit().get(5, TimeUnit.SECONDS); - assertEquals("canCommit", true, canCommit); - cohort.preCommit().get(5, TimeUnit.SECONDS); - cohort.commit().get(5, TimeUnit.SECONDS); + doCommit(cohort); // 5. Verify the data in the store @@ -708,6 +751,13 @@ public class DistributedDataStoreIntegrationTest extends AbstractActorTest { assertEquals("Data node", nodeToWrite, optional.get()); } + void doCommit(final DOMStoreThreePhaseCommitCohort cohort) throws Exception { + Boolean canCommit = cohort.canCommit().get(5, TimeUnit.SECONDS); + assertEquals("canCommit", true, canCommit); + cohort.preCommit().get(5, TimeUnit.SECONDS); + cohort.commit().get(5, TimeUnit.SECONDS); + } + void cleanup(DistributedDataStore dataStore) { dataStore.getActorContext().getShardManager().tell(PoisonPill.getInstance(), null); } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/MockDataChangeListener.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/MockDataChangeListener.java index f2f49d1bf3..5bbdcae93c 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/MockDataChangeListener.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/MockDataChangeListener.java @@ -9,6 +9,10 @@ package org.opendaylight.controller.cluster.datastore.utils; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; +import com.google.common.collect.Lists; +import com.google.common.util.concurrent.Uninterruptibles; +import java.util.Collections; import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; @@ -16,8 +20,6 @@ import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent; import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; -import com.google.common.collect.Lists; -import com.google.common.util.concurrent.Uninterruptibles; /** * A mock DataChangeListener implementation. @@ -27,14 +29,21 @@ import com.google.common.util.concurrent.Uninterruptibles; public class MockDataChangeListener implements AsyncDataChangeListener> { - private final List>> - changeList = Lists.newArrayList(); - private final CountDownLatch changeLatch; - private final int expChangeEventCount; + private final List>> changeList = + Collections.synchronizedList(Lists.>>newArrayList()); + + private volatile CountDownLatch changeLatch; + private int expChangeEventCount; public MockDataChangeListener(int expChangeEventCount) { + reset(expChangeEventCount); + } + + public void reset(int expChangeEventCount) { changeLatch = new CountDownLatch(expChangeEventCount); this.expChangeEventCount = expChangeEventCount; + changeList.clear(); } @Override @@ -44,8 +53,13 @@ public class MockDataChangeListener implements } public void waitForChangeEvents(YangInstanceIdentifier... expPaths) { - assertEquals("Change notifications complete", true, - Uninterruptibles.awaitUninterruptibly(changeLatch, 5, TimeUnit.SECONDS)); + boolean done = Uninterruptibles.awaitUninterruptibly(changeLatch, 5, TimeUnit.SECONDS); + if(!done) { + fail(String.format("Missing change notifications. Expected: %d. Actual: %d", + expChangeEventCount, (expChangeEventCount - changeLatch.getCount()))); + } + + assertEquals("Change notifications complete", true, done); for(int i = 0; i < expPaths.length; i++) { assertTrue(String.format("Change %d does not contain %s", (i+1), expPaths[i]),