Merge "Fix warnings in clustering"
authorTony Tkacik <ttkacik@cisco.com>
Fri, 7 Nov 2014 10:03:58 +0000 (10:03 +0000)
committerGerrit Code Review <gerrit@opendaylight.org>
Fri, 7 Nov 2014 10:03:58 +0000 (10:03 +0000)
12 files changed:
opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/datastore/node/utils/serialization/ValueSerializer.java
opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/datastore/node/utils/serialization/ValueType.java
opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/protobuff/messages/common/NormalizedNodeMessages.java
opendaylight/md-sal/sal-clustering-commons/src/main/resources/Common.proto
opendaylight/md-sal/sal-clustering-commons/src/test/java/org/opendaylight/controller/cluster/datastore/node/utils/serialization/NormalizedNodeSerializerTest.java
opendaylight/md-sal/sal-clustering-commons/src/test/java/org/opendaylight/controller/cluster/datastore/node/utils/serialization/ValueTypeTest.java [new file with mode: 0644]
opendaylight/md-sal/sal-clustering-commons/src/test/java/org/opendaylight/controller/cluster/datastore/util/TestModel.java
opendaylight/md-sal/sal-clustering-commons/src/test/resources/odl-datastore-test.yang
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionChainProxy.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreIntegrationTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/MockDataChangeListener.java

index 6a843f57c7718fb76911e90649b7194e3b6db0bb..aac45e18b5d7c420eeae0f6d47b62fdee6cb3078 100644 (file)
@@ -8,6 +8,7 @@
 
 package org.opendaylight.controller.cluster.datastore.node.utils.serialization;
 
+import com.google.protobuf.ByteString;
 import org.opendaylight.controller.cluster.datastore.node.utils.QNameFactory;
 import org.opendaylight.controller.cluster.datastore.util.InstanceIdentifierUtils;
 import org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages;
@@ -28,16 +29,18 @@ public class ValueSerializer {
                 InstanceIdentifierUtils.toSerializable((YangInstanceIdentifier) value, context));
         } else if(value instanceof Set) {
             Set set = (Set) value;
-            if(!set.isEmpty()){
-                for(Object o : set){
-                    if(o instanceof String){
+            if (!set.isEmpty()) {
+                for (Object o : set) {
+                    if (o instanceof String) {
                         builder.addBitsValue(o.toString());
                     } else {
                         throw new IllegalArgumentException("Expected value type to be Bits but was : " +
-                            value.toString());
+                                value.toString());
                     }
                 }
             }
+        } else if(value instanceof byte[]){
+            builder.setBytesValue(ByteString.copyFrom((byte[]) value));
         } else {
             builder.setValue(value.toString());
         }
@@ -57,6 +60,8 @@ public class ValueSerializer {
                     node.getInstanceIdentifierValue(), context);
         } else if(node.getIntValueType() == ValueType.BITS_TYPE.ordinal()){
             return new HashSet(node.getBitsValueList());
+        } else if(node.getIntValueType() == ValueType.BINARY_TYPE.ordinal()){
+            return node.getBytesValue().toByteArray();
         }
         return deSerializeBasicTypes(node.getIntValueType(), node.getValue());
     }
index 49db8967a685914924eb4e90d5914ecdbc1b43a8..6c884734e298a54dea541fe4973404365db41573 100644 (file)
@@ -29,7 +29,8 @@ public enum ValueType {
     YANG_IDENTIFIER_TYPE,
     STRING_TYPE,
     BIG_INTEGER_TYPE,
-    BIG_DECIMAL_TYPE;
+    BIG_DECIMAL_TYPE,
+    BINARY_TYPE;
 
     private static Map<Class, ValueType> types = new HashMap<>();
 
@@ -45,6 +46,7 @@ public enum ValueType {
         types.put(Short.class,SHORT_TYPE);
         types.put(BigInteger.class, BIG_INTEGER_TYPE);
         types.put(BigDecimal.class, BIG_DECIMAL_TYPE);
+        types.put(byte[].class, BINARY_TYPE);
     }
 
     public static final ValueType getSerializableType(Object node){
index e7f2c361aeb0615c52ff44002ee1b35451396b46..8285009d359fd56ff6d6ed797d3473e7440b7d60 100644 (file)
@@ -5410,6 +5410,16 @@ public final class NormalizedNodeMessages {
      */
     com.google.protobuf.ByteString
         getCodeBytes(int index);
+
+    // optional bytes bytesValue = 13;
+    /**
+     * <code>optional bytes bytesValue = 13;</code>
+     */
+    boolean hasBytesValue();
+    /**
+     * <code>optional bytes bytesValue = 13;</code>
+     */
+    com.google.protobuf.ByteString getBytesValue();
   }
   /**
    * Protobuf type {@code org.opendaylight.controller.mdsal.Node}
@@ -5550,6 +5560,11 @@ public final class NormalizedNodeMessages {
               code_.add(input.readBytes());
               break;
             }
+            case 106: {
+              bitField0_ |= 0x00000100;
+              bytesValue_ = input.readBytes();
+              break;
+            }
           }
         }
       } catch (com.google.protobuf.InvalidProtocolBufferException e) {
@@ -6070,6 +6085,22 @@ public final class NormalizedNodeMessages {
       return code_.getByteString(index);
     }
 
+    // optional bytes bytesValue = 13;
+    public static final int BYTESVALUE_FIELD_NUMBER = 13;
+    private com.google.protobuf.ByteString bytesValue_;
+    /**
+     * <code>optional bytes bytesValue = 13;</code>
+     */
+    public boolean hasBytesValue() {
+      return ((bitField0_ & 0x00000100) == 0x00000100);
+    }
+    /**
+     * <code>optional bytes bytesValue = 13;</code>
+     */
+    public com.google.protobuf.ByteString getBytesValue() {
+      return bytesValue_;
+    }
+
     private void initFields() {
       path_ = "";
       type_ = "";
@@ -6083,6 +6114,7 @@ public final class NormalizedNodeMessages {
       instanceIdentifierValue_ = org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages.InstanceIdentifier.getDefaultInstance();
       bitsValue_ = com.google.protobuf.LazyStringArrayList.EMPTY;
       code_ = com.google.protobuf.LazyStringArrayList.EMPTY;
+      bytesValue_ = com.google.protobuf.ByteString.EMPTY;
     }
     private byte memoizedIsInitialized = -1;
     public final boolean isInitialized() {
@@ -6156,6 +6188,9 @@ public final class NormalizedNodeMessages {
       for (int i = 0; i < code_.size(); i++) {
         output.writeBytes(12, code_.getByteString(i));
       }
+      if (((bitField0_ & 0x00000100) == 0x00000100)) {
+        output.writeBytes(13, bytesValue_);
+      }
       getUnknownFields().writeTo(output);
     }
 
@@ -6223,6 +6258,10 @@ public final class NormalizedNodeMessages {
         size += dataSize;
         size += 1 * getCodeList().size();
       }
+      if (((bitField0_ & 0x00000100) == 0x00000100)) {
+        size += com.google.protobuf.CodedOutputStream
+          .computeBytesSize(13, bytesValue_);
+      }
       size += getUnknownFields().getSerializedSize();
       memoizedSerializedSize = size;
       return size;
@@ -6383,6 +6422,8 @@ public final class NormalizedNodeMessages {
         bitField0_ = (bitField0_ & ~0x00000400);
         code_ = com.google.protobuf.LazyStringArrayList.EMPTY;
         bitField0_ = (bitField0_ & ~0x00000800);
+        bytesValue_ = com.google.protobuf.ByteString.EMPTY;
+        bitField0_ = (bitField0_ & ~0x00001000);
         return this;
       }
 
@@ -6481,6 +6522,10 @@ public final class NormalizedNodeMessages {
           bitField0_ = (bitField0_ & ~0x00000800);
         }
         result.code_ = code_;
+        if (((from_bitField0_ & 0x00001000) == 0x00001000)) {
+          to_bitField0_ |= 0x00000100;
+        }
+        result.bytesValue_ = bytesValue_;
         result.bitField0_ = to_bitField0_;
         onBuilt();
         return result;
@@ -6601,6 +6646,9 @@ public final class NormalizedNodeMessages {
           }
           onChanged();
         }
+        if (other.hasBytesValue()) {
+          setBytesValue(other.getBytesValue());
+        }
         this.mergeUnknownFields(other.getUnknownFields());
         return this;
       }
@@ -8110,6 +8158,42 @@ public final class NormalizedNodeMessages {
         return this;
       }
 
+      // optional bytes bytesValue = 13;
+      private com.google.protobuf.ByteString bytesValue_ = com.google.protobuf.ByteString.EMPTY;
+      /**
+       * <code>optional bytes bytesValue = 13;</code>
+       */
+      public boolean hasBytesValue() {
+        return ((bitField0_ & 0x00001000) == 0x00001000);
+      }
+      /**
+       * <code>optional bytes bytesValue = 13;</code>
+       */
+      public com.google.protobuf.ByteString getBytesValue() {
+        return bytesValue_;
+      }
+      /**
+       * <code>optional bytes bytesValue = 13;</code>
+       */
+      public Builder setBytesValue(com.google.protobuf.ByteString value) {
+        if (value == null) {
+    throw new NullPointerException();
+  }
+  bitField0_ |= 0x00001000;
+        bytesValue_ = value;
+        onChanged();
+        return this;
+      }
+      /**
+       * <code>optional bytes bytesValue = 13;</code>
+       */
+      public Builder clearBytesValue() {
+        bitField0_ = (bitField0_ & ~0x00001000);
+        bytesValue_ = getDefaultInstance().getBytesValue();
+        onChanged();
+        return this;
+      }
+
       // @@protoc_insertion_point(builder_scope:org.opendaylight.controller.mdsal.Node)
     }
 
@@ -10287,7 +10371,7 @@ public final class NormalizedNodeMessages {
       "ntroller.mdsal.Attribute\022\017\n\007intType\030\006 \001(" +
       "\005\"f\n\022InstanceIdentifier\022B\n\targuments\030\001 \003" +
       "(\0132/.org.opendaylight.controller.mdsal.P" +
-      "athArgument\022\014\n\004code\030\002 \003(\t\"\245\003\n\004Node\022\014\n\004pa" +
+      "athArgument\022\014\n\004code\030\002 \003(\t\"\271\003\n\004Node\022\014\n\004pa" +
       "th\030\001 \001(\t\022\014\n\004type\030\002 \001(\t\022E\n\014pathArgument\030\003" +
       " \001(\0132/.org.opendaylight.controller.mdsal" +
       ".PathArgument\022\017\n\007intType\030\004 \001(\005\022@\n\nattrib",
@@ -10298,17 +10382,18 @@ public final class NormalizedNodeMessages {
       "pe\030\t \001(\005\022V\n\027instanceIdentifierValue\030\n \001(" +
       "\01325.org.opendaylight.controller.mdsal.In" +
       "stanceIdentifier\022\021\n\tbitsValue\030\013 \003(\t\022\014\n\004c" +
-      "ode\030\014 \003(\t\"`\n\tContainer\022\022\n\nparentPath\030\001 \002" +
-      "(\t\022?\n\016normalizedNode\030\002 \001(\0132\'.org.openday" +
-      "light.controller.mdsal.Node\"\246\001\n\014NodeMapE",
-      "ntry\022U\n\026instanceIdentifierPath\030\001 \002(\01325.o" +
-      "rg.opendaylight.controller.mdsal.Instanc" +
-      "eIdentifier\022?\n\016normalizedNode\030\002 \001(\0132\'.or" +
-      "g.opendaylight.controller.mdsal.Node\"N\n\007" +
-      "NodeMap\022C\n\nmapEntries\030\001 \003(\0132/.org.openda" +
-      "ylight.controller.mdsal.NodeMapEntryBO\n5" +
-      "org.opendaylight.controller.protobuff.me" +
-      "ssages.commonB\026NormalizedNodeMessages"
+      "ode\030\014 \003(\t\022\022\n\nbytesValue\030\r \001(\014\"`\n\tContain" +
+      "er\022\022\n\nparentPath\030\001 \002(\t\022?\n\016normalizedNode" +
+      "\030\002 \001(\0132\'.org.opendaylight.controller.mds",
+      "al.Node\"\246\001\n\014NodeMapEntry\022U\n\026instanceIden" +
+      "tifierPath\030\001 \002(\01325.org.opendaylight.cont" +
+      "roller.mdsal.InstanceIdentifier\022?\n\016norma" +
+      "lizedNode\030\002 \001(\0132\'.org.opendaylight.contr" +
+      "oller.mdsal.Node\"N\n\007NodeMap\022C\n\nmapEntrie" +
+      "s\030\001 \003(\0132/.org.opendaylight.controller.md" +
+      "sal.NodeMapEntryBO\n5org.opendaylight.con" +
+      "troller.protobuff.messages.commonB\026Norma" +
+      "lizedNodeMessages"
     };
     com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
       new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
@@ -10350,7 +10435,7 @@ public final class NormalizedNodeMessages {
           internal_static_org_opendaylight_controller_mdsal_Node_fieldAccessorTable = new
             com.google.protobuf.GeneratedMessage.FieldAccessorTable(
               internal_static_org_opendaylight_controller_mdsal_Node_descriptor,
-              new java.lang.String[] { "Path", "Type", "PathArgument", "IntType", "Attributes", "Child", "Value", "ValueType", "IntValueType", "InstanceIdentifierValue", "BitsValue", "Code", });
+              new java.lang.String[] { "Path", "Type", "PathArgument", "IntType", "Attributes", "Child", "Value", "ValueType", "IntValueType", "InstanceIdentifierValue", "BitsValue", "Code", "BytesValue", });
           internal_static_org_opendaylight_controller_mdsal_Container_descriptor =
             getDescriptor().getMessageTypes().get(6);
           internal_static_org_opendaylight_controller_mdsal_Container_fieldAccessorTable = new
index 356bfbf684295a0392198dd0b02d92fcf12b4d25..842a9725d166452a229625c36c51ee73c31d73d8 100644 (file)
@@ -63,6 +63,8 @@ message Node{
   repeated string bitsValue = 11; // intValueType = Bits
 
   repeated string code = 12; // A list of string codes which can be used for any repeated strings in the NormalizedNode
+
+  optional bytes bytesValue = 13;
 }
 
 message Container{
index cdb2e69e831eb67cab28311919f9d30e0698d7ec..bcc0fd85d0f343f7b1e9ec02ed6c4a6aec1f4027 100644 (file)
@@ -1,15 +1,20 @@
 package org.opendaylight.controller.cluster.datastore.node.utils.serialization;
 
+import com.google.common.base.Optional;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.ExpectedException;
 import org.opendaylight.controller.cluster.datastore.util.TestModel;
 import org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
+import org.opendaylight.yangtools.yang.data.api.schema.DataContainerChild;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
 
 public class NormalizedNodeSerializerTest {
 
@@ -49,6 +54,37 @@ public class NormalizedNodeSerializerTest {
         // created by serializing the original node and deSerializing it back.
         assertEquals(expectedNode, actualNode);
 
+        byte[] binaryData = new byte[5];
+        for(byte i=0;i<5;i++){
+            binaryData[i] = i;
+        }
+
+        ContainerNode node1 = TestModel.createBaseTestContainerBuilder()
+                .withChild(ImmutableNodes.leafNode(TestModel.SOME_BINARY_DATE_QNAME, binaryData))
+                .build();
+
+        NormalizedNodeMessages.Node serializedNode1 = NormalizedNodeSerializer
+                .serialize(node1);
+
+        ContainerNode node2 =
+                (ContainerNode) NormalizedNodeSerializer.deSerialize(serializedNode1);
+
+
+        // FIXME: This will not work due to BUG 2326. Once that is fixed we can uncomment this assertion
+        // assertEquals(node1, node2);
+
+        Optional<DataContainerChild<? extends YangInstanceIdentifier.PathArgument, ?>> child = node2.getChild(new YangInstanceIdentifier.NodeIdentifier(TestModel.SOME_BINARY_DATE_QNAME));
+
+        Object value = child.get().getValue();
+
+        assertTrue("value should be of type byte[]", value instanceof byte[]);
+
+        byte[] bytesValue = (byte[]) value;
+
+        for(byte i=0;i<5;i++){
+            assertEquals(i, bytesValue[i]);
+        }
+
     }
 
     @Test(expected = NullPointerException.class)
diff --git a/opendaylight/md-sal/sal-clustering-commons/src/test/java/org/opendaylight/controller/cluster/datastore/node/utils/serialization/ValueTypeTest.java b/opendaylight/md-sal/sal-clustering-commons/src/test/java/org/opendaylight/controller/cluster/datastore/node/utils/serialization/ValueTypeTest.java
new file mode 100644 (file)
index 0000000..8fe0633
--- /dev/null
@@ -0,0 +1,18 @@
+package org.opendaylight.controller.cluster.datastore.node.utils.serialization;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+public class ValueTypeTest {
+
+    @Test
+    public void testGetSerializableType(){
+        byte[] b = new byte[10];
+        b[0] = 1;
+        b[2] = 2;
+
+        ValueType serializableType = ValueType.getSerializableType(b);
+        assertEquals(ValueType.BINARY_TYPE, serializableType);
+    }
+}
\ No newline at end of file
index cda4b06e2926cf42bb8c975c8e4b7fd79a89b781..99df01a32e17c9c10c0e4ada9268e52e15008162 100644 (file)
@@ -62,6 +62,7 @@ public class TestModel {
 
   public static final QName DESC_QNAME = QName.create(TEST_QNAME, "desc");
   public static final QName POINTER_QNAME = QName.create(TEST_QNAME, "pointer");
+  public static final QName SOME_BINARY_DATE_QNAME = QName.create(TEST_QNAME, "some-binary-data");
   public static final QName SOME_REF_QNAME = QName.create(TEST_QNAME,
       "some-ref");
   public static final QName MYIDENTITY_QNAME = QName.create(TEST_QNAME,
@@ -282,114 +283,115 @@ public class TestModel {
 
   }
 
-  public static ContainerNode createTestContainer() {
-
-
-    // Create a list of shoes
-    // This is to test leaf list entry
-    final LeafSetEntryNode<Object> nike =
-        ImmutableLeafSetEntryNodeBuilder
-            .create()
-            .withNodeIdentifier(
-                new YangInstanceIdentifier.NodeWithValue(QName.create(
-                    TEST_QNAME, "shoe"), "nike")).withValue("nike").build();
-
-    final LeafSetEntryNode<Object> puma =
-        ImmutableLeafSetEntryNodeBuilder
-            .create()
-            .withNodeIdentifier(
-                new YangInstanceIdentifier.NodeWithValue(QName.create(
-                    TEST_QNAME, "shoe"), "puma")).withValue("puma").build();
-
-    final LeafSetNode<Object> shoes =
-        ImmutableLeafSetNodeBuilder
-            .create()
-            .withNodeIdentifier(
-                new YangInstanceIdentifier.NodeIdentifier(QName.create(
-                    TEST_QNAME, "shoe"))).withChild(nike).withChild(puma)
-            .build();
-
-
-    // Test a leaf-list where each entry contains an identity
-    final LeafSetEntryNode<Object> cap1 =
-        ImmutableLeafSetEntryNodeBuilder
-            .create()
-            .withNodeIdentifier(
-                new YangInstanceIdentifier.NodeWithValue(QName.create(
-                    TEST_QNAME, "capability"), DESC_QNAME))
-            .withValue(DESC_QNAME).build();
-
-    final LeafSetNode<Object> capabilities =
-        ImmutableLeafSetNodeBuilder
-            .create()
-            .withNodeIdentifier(
-                new YangInstanceIdentifier.NodeIdentifier(QName.create(
-                    TEST_QNAME, "capability"))).withChild(cap1).build();
-
-    ContainerNode switchFeatures =
-        ImmutableContainerNodeBuilder
-            .create()
-            .withNodeIdentifier(
-                new YangInstanceIdentifier.NodeIdentifier(SWITCH_FEATURES_QNAME))
-            .withChild(capabilities).build();
-
-    // Create a leaf list with numbers
-    final LeafSetEntryNode<Object> five =
-        ImmutableLeafSetEntryNodeBuilder
-            .create()
-            .withNodeIdentifier(
-                (new YangInstanceIdentifier.NodeWithValue(QName.create(
-                    TEST_QNAME, "number"), 5))).withValue(5).build();
-    final LeafSetEntryNode<Object> fifteen =
-        ImmutableLeafSetEntryNodeBuilder
-            .create()
-            .withNodeIdentifier(
-                (new YangInstanceIdentifier.NodeWithValue(QName.create(
-                    TEST_QNAME, "number"), 15))).withValue(15).build();
-    final LeafSetNode<Object> numbers =
-        ImmutableLeafSetNodeBuilder
-            .create()
-            .withNodeIdentifier(
-                new YangInstanceIdentifier.NodeIdentifier(QName.create(
-                    TEST_QNAME, "number"))).withChild(five).withChild(fifteen)
-            .build();
-
-
-    // Create augmentations
-    MapEntryNode mapEntry = createAugmentedListEntry(1, "First Test");
-
-    // Create a bits leaf
+  public static DataContainerNodeAttrBuilder<YangInstanceIdentifier.NodeIdentifier, ContainerNode> createBaseTestContainerBuilder() {
+      // Create a list of shoes
+      // This is to test leaf list entry
+      final LeafSetEntryNode<Object> nike =
+              ImmutableLeafSetEntryNodeBuilder
+                      .create()
+                      .withNodeIdentifier(
+                              new YangInstanceIdentifier.NodeWithValue(QName.create(
+                                      TEST_QNAME, "shoe"), "nike")).withValue("nike").build();
+
+      final LeafSetEntryNode<Object> puma =
+              ImmutableLeafSetEntryNodeBuilder
+                      .create()
+                      .withNodeIdentifier(
+                              new YangInstanceIdentifier.NodeWithValue(QName.create(
+                                      TEST_QNAME, "shoe"), "puma")).withValue("puma").build();
+
+      final LeafSetNode<Object> shoes =
+              ImmutableLeafSetNodeBuilder
+                      .create()
+                      .withNodeIdentifier(
+                              new YangInstanceIdentifier.NodeIdentifier(QName.create(
+                                      TEST_QNAME, "shoe"))).withChild(nike).withChild(puma)
+                      .build();
+
+
+      // Test a leaf-list where each entry contains an identity
+      final LeafSetEntryNode<Object> cap1 =
+              ImmutableLeafSetEntryNodeBuilder
+                      .create()
+                      .withNodeIdentifier(
+                              new YangInstanceIdentifier.NodeWithValue(QName.create(
+                                      TEST_QNAME, "capability"), DESC_QNAME))
+                      .withValue(DESC_QNAME).build();
+
+      final LeafSetNode<Object> capabilities =
+              ImmutableLeafSetNodeBuilder
+                      .create()
+                      .withNodeIdentifier(
+                              new YangInstanceIdentifier.NodeIdentifier(QName.create(
+                                      TEST_QNAME, "capability"))).withChild(cap1).build();
+
+      ContainerNode switchFeatures =
+              ImmutableContainerNodeBuilder
+                      .create()
+                      .withNodeIdentifier(
+                              new YangInstanceIdentifier.NodeIdentifier(SWITCH_FEATURES_QNAME))
+                      .withChild(capabilities).build();
+
+      // Create a leaf list with numbers
+      final LeafSetEntryNode<Object> five =
+              ImmutableLeafSetEntryNodeBuilder
+                      .create()
+                      .withNodeIdentifier(
+                              (new YangInstanceIdentifier.NodeWithValue(QName.create(
+                                      TEST_QNAME, "number"), 5))).withValue(5).build();
+      final LeafSetEntryNode<Object> fifteen =
+              ImmutableLeafSetEntryNodeBuilder
+                      .create()
+                      .withNodeIdentifier(
+                              (new YangInstanceIdentifier.NodeWithValue(QName.create(
+                                      TEST_QNAME, "number"), 15))).withValue(15).build();
+      final LeafSetNode<Object> numbers =
+              ImmutableLeafSetNodeBuilder
+                      .create()
+                      .withNodeIdentifier(
+                              new YangInstanceIdentifier.NodeIdentifier(QName.create(
+                                      TEST_QNAME, "number"))).withChild(five).withChild(fifteen)
+                      .build();
+
+
+      // Create augmentations
+      MapEntryNode mapEntry = createAugmentedListEntry(1, "First Test");
+
+      // Create a bits leaf
       NormalizedNodeAttrBuilder<YangInstanceIdentifier.NodeIdentifier, Object, LeafNode<Object>>
-          myBits = Builders.leafBuilder().withNodeIdentifier(
-          new YangInstanceIdentifier.NodeIdentifier(
-              QName.create(TEST_QNAME, "my-bits"))).withValue(
-          ImmutableSet.of("foo", "bar"));
+              myBits = Builders.leafBuilder().withNodeIdentifier(
+              new YangInstanceIdentifier.NodeIdentifier(
+                      QName.create(TEST_QNAME, "my-bits"))).withValue(
+              ImmutableSet.of("foo", "bar"));
 
 
       // Create the document
-    return ImmutableContainerNodeBuilder
-        .create()
-        .withNodeIdentifier(
-            new YangInstanceIdentifier.NodeIdentifier(TEST_QNAME))
-        .withChild(myBits.build())
-        .withChild(ImmutableNodes.leafNode(DESC_QNAME, DESC))
-        .withChild(ImmutableNodes.leafNode(POINTER_QNAME, "pointer"))
-        .withChild(
-            ImmutableNodes.leafNode(SOME_REF_QNAME, YangInstanceIdentifier
-                .builder().build()))
-        .withChild(ImmutableNodes.leafNode(MYIDENTITY_QNAME, DESC_QNAME))
-
-        // .withChild(augmentationNode)
-        .withChild(shoes)
-        .withChild(numbers)
-        .withChild(switchFeatures)
-        .withChild(
-            mapNodeBuilder(AUGMENTED_LIST_QNAME).withChild(mapEntry).build())
-        .withChild(
-            mapNodeBuilder(OUTER_LIST_QNAME)
-                .withChild(mapEntry(OUTER_LIST_QNAME, ID_QNAME, ONE_ID))
-                .withChild(BAR_NODE).build()).build();
+      return ImmutableContainerNodeBuilder
+              .create()
+              .withNodeIdentifier(
+                      new YangInstanceIdentifier.NodeIdentifier(TEST_QNAME))
+              .withChild(myBits.build())
+              .withChild(ImmutableNodes.leafNode(DESC_QNAME, DESC))
+              .withChild(ImmutableNodes.leafNode(POINTER_QNAME, "pointer"))
+              .withChild(
+                      ImmutableNodes.leafNode(SOME_REF_QNAME, YangInstanceIdentifier
+                              .builder().build()))
+              .withChild(ImmutableNodes.leafNode(MYIDENTITY_QNAME, DESC_QNAME))
+
+                      // .withChild(augmentationNode)
+              .withChild(shoes)
+              .withChild(numbers)
+              .withChild(switchFeatures)
+              .withChild(
+                      mapNodeBuilder(AUGMENTED_LIST_QNAME).withChild(mapEntry).build())
+              .withChild(
+                      mapNodeBuilder(OUTER_LIST_QNAME)
+                              .withChild(mapEntry(OUTER_LIST_QNAME, ID_QNAME, ONE_ID))
+                              .withChild(BAR_NODE).build());
+  }
 
+  public static ContainerNode createTestContainer() {
+      return createBaseTestContainerBuilder().build();
   }
 
   public static MapEntryNode createAugmentedListEntry(int id, String name) {
index 246cc682cd7ae277f36e152392209c8577c3785d..a1fbc1fdad1f897d265bbd6813d7a36962b70a3f 100644 (file)
@@ -107,6 +107,10 @@ module odl-datastore-test {
             }
         }
 
+        leaf some-binary-data {
+            type binary;
+        }
+
 
     }
 }
index b467ee4ddbf56c456c2e9f0f62381eefa173e31d..93f9e6b7de1e2085f01d36c74a37b84ff7ecb4d2 100644 (file)
@@ -9,18 +9,17 @@
 package org.opendaylight.controller.cluster.datastore;
 
 import akka.actor.ActorSelection;
-import akka.dispatch.Futures;
+import akka.dispatch.OnComplete;
+import java.util.AbstractMap.SimpleEntry;
+import java.util.List;
 import org.opendaylight.controller.cluster.datastore.messages.CloseTransactionChain;
 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
-import scala.concurrent.Await;
 import scala.concurrent.Future;
-
-import java.util.Collections;
-import java.util.List;
+import scala.concurrent.Promise;
 
 /**
  * TransactionChainProxy acts as a proxy for a DOMStoreTransactionChain created on a remote shard
@@ -28,7 +27,7 @@ import java.util.List;
 public class TransactionChainProxy implements DOMStoreTransactionChain{
     private final ActorContext actorContext;
     private final String transactionChainId;
-    private volatile List<Future<ActorSelection>> cohortFutures = Collections.emptyList();
+    private volatile SimpleEntry<Object, List<Future<ActorSelection>>> previousTxReadyFutures;
 
     public TransactionChainProxy(ActorContext actorContext) {
         this.actorContext = actorContext;
@@ -37,20 +36,17 @@ public class TransactionChainProxy implements DOMStoreTransactionChain{
 
     @Override
     public DOMStoreReadTransaction newReadOnlyTransaction() {
-        return new TransactionProxy(actorContext,
-            TransactionProxy.TransactionType.READ_ONLY, this);
+        return new ChainedTransactionProxy(actorContext, TransactionProxy.TransactionType.READ_ONLY);
     }
 
     @Override
     public DOMStoreReadWriteTransaction newReadWriteTransaction() {
-        return new TransactionProxy(actorContext,
-            TransactionProxy.TransactionType.READ_WRITE, this);
+        return new ChainedTransactionProxy(actorContext, TransactionProxy.TransactionType.READ_WRITE);
     }
 
     @Override
     public DOMStoreWriteTransaction newWriteOnlyTransaction() {
-        return new TransactionProxy(actorContext,
-            TransactionProxy.TransactionType.WRITE_ONLY, this);
+        return new ChainedTransactionProxy(actorContext, TransactionProxy.TransactionType.WRITE_ONLY);
     }
 
     @Override
@@ -63,17 +59,62 @@ public class TransactionChainProxy implements DOMStoreTransactionChain{
         return transactionChainId;
     }
 
-    public void onTransactionReady(List<Future<ActorSelection>> cohortFutures){
-        this.cohortFutures = cohortFutures;
-    }
+    private class ChainedTransactionProxy extends TransactionProxy {
+
+        ChainedTransactionProxy(ActorContext actorContext, TransactionType transactionType) {
+            super(actorContext, transactionType, transactionChainId);
+        }
+
+        @Override
+        protected void onTransactionReady(List<Future<ActorSelection>> cohortFutures) {
+            if(!cohortFutures.isEmpty()) {
+                previousTxReadyFutures = new SimpleEntry<>(getIdentifier(), cohortFutures);
+            } else {
+                previousTxReadyFutures = null;
+            }
+        }
+
+        /**
+         * This method is overridden to ensure the previous Tx's ready operations complete
+         * before we create the next shard Tx in the chain to avoid creation failures if the
+         * previous Tx's ready operations haven't completed yet.
+         */
+        @Override
+        protected Future<Object> sendCreateTransaction(final ActorSelection shard,
+                final Object serializedCreateMessage) {
+            // Check if there are any previous ready Futures. Also make sure the previous ready
+            // Futures aren't for this Tx as deadlock would occur if tried to wait on our own
+            // Futures. This may happen b/c the shard Tx creates are done async so it's possible
+            // for the client to ready this Tx before we've even attempted to create a shard Tx.
+            if(previousTxReadyFutures == null ||
+                    previousTxReadyFutures.getKey().equals(getIdentifier())) {
+                return super.sendCreateTransaction(shard, serializedCreateMessage);
+            }
+
+            // Combine the ready Futures into 1.
+            Future<Iterable<ActorSelection>> combinedFutures = akka.dispatch.Futures.sequence(
+                    previousTxReadyFutures.getValue(), actorContext.getActorSystem().dispatcher());
+
+            // Add a callback for completion of the combined Futures.
+            final Promise<Object> createTxPromise = akka.dispatch.Futures.promise();
+            OnComplete<Iterable<ActorSelection>> onComplete = new OnComplete<Iterable<ActorSelection>>() {
+                @Override
+                public void onComplete(Throwable failure, Iterable<ActorSelection> notUsed) {
+                    if(failure != null) {
+                        // A Ready Future failed so fail the returned Promise.
+                        createTxPromise.failure(failure);
+                    } else {
+                        // Send the CreateTx message and use the resulting Future to complete the
+                        // returned Promise.
+                        createTxPromise.completeWith(actorContext.executeOperationAsync(shard,
+                                serializedCreateMessage));
+                    }
+                }
+            };
+
+            combinedFutures.onComplete(onComplete, actorContext.getActorSystem().dispatcher());
 
-    public void waitTillCurrentTransactionReady(){
-        try {
-            Await.result(Futures
-                .sequence(this.cohortFutures, actorContext.getActorSystem().dispatcher()),
-                actorContext.getOperationDuration());
-        } catch (Exception e) {
-            throw new IllegalStateException("Failed when waiting for transaction on a chain to become ready", e);
+            return createTxPromise.future();
         }
     }
 }
index ffb1ab7c55064ecf7683e1857800b256c9c82a1d..d93bae22e08d9fddb3f1ac7d9c12aa4a278d0ff6 100644 (file)
@@ -21,6 +21,14 @@ import com.google.common.util.concurrent.CheckedFuture;
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.SettableFuture;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+import javax.annotation.concurrent.GuardedBy;
 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
 import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
 import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
@@ -50,15 +58,6 @@ import scala.concurrent.Future;
 import scala.concurrent.Promise;
 import scala.concurrent.duration.FiniteDuration;
 
-import javax.annotation.concurrent.GuardedBy;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
-
 /**
  * TransactionProxy acts as a proxy for one or more transactions that were created on a remote shard
  * <p>
@@ -182,23 +181,23 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
     private final TransactionType transactionType;
     private final ActorContext actorContext;
     private final TransactionIdentifier identifier;
-    private final TransactionChainProxy transactionChainProxy;
+    private final String transactionChainId;
     private final SchemaContext schemaContext;
     private boolean inReadyState;
 
     public TransactionProxy(ActorContext actorContext, TransactionType transactionType) {
-        this(actorContext, transactionType, null);
+        this(actorContext, transactionType, "");
     }
 
     public TransactionProxy(ActorContext actorContext, TransactionType transactionType,
-            TransactionChainProxy transactionChainProxy) {
+            String transactionChainId) {
         this.actorContext = Preconditions.checkNotNull(actorContext,
             "actorContext should not be null");
         this.transactionType = Preconditions.checkNotNull(transactionType,
             "transactionType should not be null");
         this.schemaContext = Preconditions.checkNotNull(actorContext.getSchemaContext(),
             "schemaContext should not be null");
-        this.transactionChainProxy = transactionChainProxy;
+        this.transactionChainId = transactionChainId;
 
         String memberName = actorContext.getCurrentMemberName();
         if(memberName == null){
@@ -237,6 +236,18 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
         return recordedOperationFutures;
     }
 
+    @VisibleForTesting
+    boolean hasTransactionContext() {
+        for(TransactionFutureCallback txFutureCallback : txFutureCallbackMap.values()) {
+            TransactionContext transactionContext = txFutureCallback.getTransactionContext();
+            if(transactionContext != null) {
+                return true;
+            }
+        }
+
+        return false;
+    }
+
     @Override
     public CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> read(
             final YangInstanceIdentifier path) {
@@ -433,14 +444,32 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
             }
         }
 
-        if(transactionChainProxy != null){
-            transactionChainProxy.onTransactionReady(cohortFutures);
-        }
+        onTransactionReady(cohortFutures);
 
         return new ThreePhaseCommitCohortProxy(actorContext, cohortFutures,
                 identifier.toString());
     }
 
+    /**
+     * Method for derived classes to be notified when the transaction has been readied.
+     *
+     * @param cohortFutures the cohort Futures for each shard transaction.
+     */
+    protected void onTransactionReady(List<Future<ActorSelection>> cohortFutures) {
+    }
+
+    /**
+     * Method called to send a CreateTransaction message to a shard.
+     *
+     * @param shard the shard actor to send to
+     * @param serializedCreateMessage the serialized message to send
+     * @return the response Future
+     */
+    protected Future<Object> sendCreateTransaction(ActorSelection shard,
+            Object serializedCreateMessage) {
+        return actorContext.executeOperationAsync(shard, serializedCreateMessage);
+    }
+
     @Override
     public Object getIdentifier() {
         return this.identifier;
@@ -502,10 +531,7 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
     }
 
     public String getTransactionChainId() {
-        if(transactionChainProxy == null){
-            return "";
-        }
-        return transactionChainProxy.getTransactionChainId();
+        return transactionChainId;
     }
 
     /**
@@ -591,7 +617,7 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
          * Performs a CreateTransaction try async.
          */
         private void tryCreateTransaction() {
-            Future<Object> createTxFuture = actorContext.executeOperationAsync(primaryShard,
+            Future<Object> createTxFuture = sendCreateTransaction(primaryShard,
                     new CreateTransaction(identifier.toString(),
                             TransactionProxy.this.transactionType.ordinal(),
                             getTransactionChainId()).toSerializable());
@@ -626,29 +652,42 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
             // respect to #addTxOperationOnComplete to handle timing issues and ensure no
             // TransactionOperation is missed and that they are processed in the order they occurred.
             synchronized(txOperationsOnComplete) {
+                // Store the new TransactionContext locally until we've completed invoking the
+                // TransactionOperations. This avoids thread timing issues which could cause
+                // out-of-order TransactionOperations. Eg, on a modification operation, if the
+                // TransactionContext is non-null, then we directly call the TransactionContext.
+                // However, at the same time, the code may be executing the cached
+                // TransactionOperations. So to avoid thus timing, we don't publish the
+                // TransactionContext until after we've executed all cached TransactionOperations.
+                TransactionContext localTransactionContext;
                 if(failure != null) {
                     LOG.debug("Tx {} Creating NoOpTransaction because of error: {}", identifier,
                             failure.getMessage());
 
-                    transactionContext = new NoOpTransactionContext(failure, identifier);
+                    localTransactionContext = new NoOpTransactionContext(failure, identifier);
                 } else if (response.getClass().equals(CreateTransactionReply.SERIALIZABLE_CLASS)) {
-                    createValidTransactionContext(CreateTransactionReply.fromSerializable(response));
+                    localTransactionContext = createValidTransactionContext(
+                            CreateTransactionReply.fromSerializable(response));
                 } else {
                     IllegalArgumentException exception = new IllegalArgumentException(String.format(
                         "Invalid reply type %s for CreateTransaction", response.getClass()));
 
-                    transactionContext = new NoOpTransactionContext(exception, identifier);
+                    localTransactionContext = new NoOpTransactionContext(exception, identifier);
                 }
 
                 for(TransactionOperation oper: txOperationsOnComplete) {
-                    oper.invoke(transactionContext);
+                    oper.invoke(localTransactionContext);
                 }
 
                 txOperationsOnComplete.clear();
+
+                // We're done invoking the TransactionOperations so we can now publish the
+                // TransactionContext.
+                transactionContext = localTransactionContext;
             }
         }
 
-        private void createValidTransactionContext(CreateTransactionReply reply) {
+        private TransactionContext createValidTransactionContext(CreateTransactionReply reply) {
             String transactionPath = reply.getTransactionPath();
 
             LOG.debug("Tx {} Received transaction actor path {}", identifier, transactionPath);
@@ -669,7 +708,7 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
             // Check if TxActor is created in the same node
             boolean isTxActorLocal = actorContext.isLocalPath(transactionPath);
 
-            transactionContext = new TransactionContextImpl(transactionPath, transactionActor, identifier,
+            return new TransactionContextImpl(transactionPath, transactionActor, identifier,
                 actorContext, schemaContext, isTxActorLocal, reply.getVersion());
         }
     }
index cec7ce1e3fc250a84231f84e83d97996bbe2e55a..12c566d33de786db62bfad3ec86a56939edbe1c2 100644 (file)
@@ -1,11 +1,17 @@
 package org.opendaylight.controller.cluster.datastore;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
 import akka.actor.ActorRef;
 import akka.actor.ActorSystem;
 import akka.actor.PoisonPill;
 import com.google.common.base.Optional;
 import com.google.common.util.concurrent.CheckedFuture;
 import com.google.common.util.concurrent.Uninterruptibles;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
 import org.junit.Test;
 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
 import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
@@ -26,16 +32,10 @@ import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
 import org.opendaylight.yangtools.concepts.ListenerRegistration;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.MapNode;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicReference;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
 
 public class DistributedDataStoreIntegrationTest extends AbstractActorTest {
 
@@ -77,10 +77,7 @@ public class DistributedDataStoreIntegrationTest extends AbstractActorTest {
 
             DOMStoreThreePhaseCommitCohort cohort = writeTx.ready();
 
-            Boolean canCommit = cohort.canCommit().get(5, TimeUnit.SECONDS);
-            assertEquals("canCommit", true, canCommit);
-            cohort.preCommit().get(5, TimeUnit.SECONDS);
-            cohort.commit().get(5, TimeUnit.SECONDS);
+            doCommit(cohort);
 
             // Verify the data in the store
 
@@ -131,10 +128,7 @@ public class DistributedDataStoreIntegrationTest extends AbstractActorTest {
 
             // 5. Commit the Tx
 
-            Boolean canCommit = cohort.canCommit().get(5, TimeUnit.SECONDS);
-            assertEquals("canCommit", true, canCommit);
-            cohort.preCommit().get(5, TimeUnit.SECONDS);
-            cohort.commit().get(5, TimeUnit.SECONDS);
+            doCommit(cohort);
 
             // 6. Verify the data in the store
 
@@ -219,9 +213,7 @@ public class DistributedDataStoreIntegrationTest extends AbstractActorTest {
 
             // Wait for the Tx commit to complete.
 
-            assertEquals("canCommit", true, txCohort.get().canCommit().get(5, TimeUnit.SECONDS));
-            txCohort.get().preCommit().get(5, TimeUnit.SECONDS);
-            txCohort.get().commit().get(5, TimeUnit.SECONDS);
+            doCommit(txCohort.get());
 
             // Verify the data in the store
 
@@ -552,10 +544,8 @@ public class DistributedDataStoreIntegrationTest extends AbstractActorTest {
 
     @Test
     public void testTransactionChain() throws Exception{
-        System.setProperty("shard.persistent", "true");
         new IntegrationTestKit(getSystem()) {{
-            DistributedDataStore dataStore =
-                    setupDistributedDataStore("transactionChainIntegrationTest", "test-1");
+            DistributedDataStore dataStore = setupDistributedDataStore("testTransactionChain", "test-1");
 
             // 1. Create a Tx chain and write-only Tx
 
@@ -566,30 +556,76 @@ public class DistributedDataStoreIntegrationTest extends AbstractActorTest {
 
             // 2. Write some data
 
-            NormalizedNode<?, ?> containerNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
-            writeTx.write(TestModel.TEST_PATH, containerNode);
+            NormalizedNode<?, ?> testNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
+            writeTx.write(TestModel.TEST_PATH, testNode);
 
             // 3. Ready the Tx for commit
 
-            DOMStoreThreePhaseCommitCohort cohort = writeTx.ready();
+            final DOMStoreThreePhaseCommitCohort cohort1 = writeTx.ready();
 
-            // 4. Commit the Tx
+            // 4. Commit the Tx on another thread that first waits for the second read Tx.
 
-            Boolean canCommit = cohort.canCommit().get(5, TimeUnit.SECONDS);
-            assertEquals("canCommit", true, canCommit);
-            cohort.preCommit().get(5, TimeUnit.SECONDS);
-            cohort.commit().get(5, TimeUnit.SECONDS);
+            final CountDownLatch continueCommit1 = new CountDownLatch(1);
+            final CountDownLatch commit1Done = new CountDownLatch(1);
+            final AtomicReference<Exception> commit1Error = new AtomicReference<>();
+            new Thread() {
+                @Override
+                public void run() {
+                    try {
+                        continueCommit1.await();
+                        doCommit(cohort1);
+                    } catch (Exception e) {
+                        commit1Error.set(e);
+                    } finally {
+                        commit1Done.countDown();
+                    }
+                }
+            }.start();
 
-            // 5. Verify the data in the store
+            // 5. Create a new read Tx from the chain to read and verify the data from the first
+            // Tx is visible after being readied.
 
             DOMStoreReadTransaction readTx = txChain.newReadOnlyTransaction();
-
             Optional<NormalizedNode<?, ?>> optional = readTx.read(TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
             assertEquals("isPresent", true, optional.isPresent());
-            assertEquals("Data node", containerNode, optional.get());
+            assertEquals("Data node", testNode, optional.get());
+
+            // 6. Create a new RW Tx from the chain, write more data, and ready it
+
+            DOMStoreReadWriteTransaction rwTx = txChain.newReadWriteTransaction();
+            MapNode outerNode = ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build();
+            rwTx.write(TestModel.OUTER_LIST_PATH, outerNode);
+
+            DOMStoreThreePhaseCommitCohort cohort2 = rwTx.ready();
+
+            // 7. Create a new read Tx from the chain to read the data from the last RW Tx to
+            // verify it is visible.
+
+            readTx = txChain.newReadOnlyTransaction();
+            optional = readTx.read(TestModel.OUTER_LIST_PATH).get(5, TimeUnit.SECONDS);
+            assertEquals("isPresent", true, optional.isPresent());
+            assertEquals("Data node", outerNode, optional.get());
+
+            // 8. Wait for the 2 commits to complete and close the chain.
+
+            continueCommit1.countDown();
+            Uninterruptibles.awaitUninterruptibly(commit1Done, 5, TimeUnit.SECONDS);
+
+            if(commit1Error.get() != null) {
+                throw commit1Error.get();
+            }
+
+            doCommit(cohort2);
 
             txChain.close();
 
+            // 9. Create a new read Tx from the data store and verify committed data.
+
+            readTx = dataStore.newReadOnlyTransaction();
+            optional = readTx.read(TestModel.OUTER_LIST_PATH).get(5, TimeUnit.SECONDS);
+            assertEquals("isPresent", true, optional.isPresent());
+            assertEquals("Data node", outerNode, optional.get());
+
             cleanup(dataStore);
         }};
     }
@@ -600,7 +636,10 @@ public class DistributedDataStoreIntegrationTest extends AbstractActorTest {
             DistributedDataStore dataStore =
                     setupDistributedDataStore("testChangeListenerRegistration", "test-1");
 
-            MockDataChangeListener listener = new MockDataChangeListener(3);
+            testWriteTransaction(dataStore, TestModel.TEST_PATH,
+                    ImmutableNodes.containerNode(TestModel.TEST_QNAME));
+
+            MockDataChangeListener listener = new MockDataChangeListener(1);
 
             ListenerRegistration<MockDataChangeListener>
                     listenerReg = dataStore.registerChangeListener(TestModel.TEST_PATH, listener,
@@ -608,8 +647,13 @@ public class DistributedDataStoreIntegrationTest extends AbstractActorTest {
 
             assertNotNull("registerChangeListener returned null", listenerReg);
 
-            testWriteTransaction(dataStore, TestModel.TEST_PATH,
-                    ImmutableNodes.containerNode(TestModel.TEST_QNAME));
+            // Wait for the initial notification
+
+            listener.waitForChangeEvents(TestModel.TEST_PATH);
+
+            listener.reset(2);
+
+            // Write 2 updates.
 
             testWriteTransaction(dataStore, TestModel.OUTER_LIST_PATH,
                     ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build());
@@ -619,7 +663,9 @@ public class DistributedDataStoreIntegrationTest extends AbstractActorTest {
             testWriteTransaction(dataStore, listPath,
                     ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1));
 
-            listener.waitForChangeEvents(TestModel.TEST_PATH, TestModel.OUTER_LIST_PATH, listPath );
+            // Wait for the 2 updates.
+
+            listener.waitForChangeEvents(TestModel.OUTER_LIST_PATH, listPath);
 
             listenerReg.close();
 
@@ -694,10 +740,7 @@ public class DistributedDataStoreIntegrationTest extends AbstractActorTest {
 
             // 4. Commit the Tx
 
-            Boolean canCommit = cohort.canCommit().get(5, TimeUnit.SECONDS);
-            assertEquals("canCommit", true, canCommit);
-            cohort.preCommit().get(5, TimeUnit.SECONDS);
-            cohort.commit().get(5, TimeUnit.SECONDS);
+            doCommit(cohort);
 
             // 5. Verify the data in the store
 
@@ -708,6 +751,13 @@ public class DistributedDataStoreIntegrationTest extends AbstractActorTest {
             assertEquals("Data node", nodeToWrite, optional.get());
         }
 
+        void doCommit(final DOMStoreThreePhaseCommitCohort cohort) throws Exception {
+            Boolean canCommit = cohort.canCommit().get(5, TimeUnit.SECONDS);
+            assertEquals("canCommit", true, canCommit);
+            cohort.preCommit().get(5, TimeUnit.SECONDS);
+            cohort.commit().get(5, TimeUnit.SECONDS);
+        }
+
         void cleanup(DistributedDataStore dataStore) {
             dataStore.getActorContext().getShardManager().tell(PoisonPill.getInstance(), null);
         }
index f2f49d1bf3e21f048b393d5f5c14a6e9595e05d6..5bbdcae93c2241cd8b9e7a87613735b69841973d 100644 (file)
@@ -9,6 +9,10 @@ package org.opendaylight.controller.cluster.datastore.utils;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.Uninterruptibles;
+import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
@@ -16,8 +20,6 @@ import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
-import com.google.common.collect.Lists;
-import com.google.common.util.concurrent.Uninterruptibles;
 
 /**
  * A mock DataChangeListener implementation.
@@ -27,14 +29,21 @@ import com.google.common.util.concurrent.Uninterruptibles;
 public class MockDataChangeListener implements
                          AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>> {
 
-    private final List<AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>>>
-                                                               changeList = Lists.newArrayList();
-    private final CountDownLatch changeLatch;
-    private final int expChangeEventCount;
+    private final List<AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>>> changeList =
+            Collections.synchronizedList(Lists.<AsyncDataChangeEvent<YangInstanceIdentifier,
+                                                NormalizedNode<?, ?>>>newArrayList());
+
+    private volatile CountDownLatch changeLatch;
+    private int expChangeEventCount;
 
     public MockDataChangeListener(int expChangeEventCount) {
+        reset(expChangeEventCount);
+    }
+
+    public void reset(int expChangeEventCount) {
         changeLatch = new CountDownLatch(expChangeEventCount);
         this.expChangeEventCount = expChangeEventCount;
+        changeList.clear();
     }
 
     @Override
@@ -44,8 +53,13 @@ public class MockDataChangeListener implements
     }
 
     public void waitForChangeEvents(YangInstanceIdentifier... expPaths) {
-        assertEquals("Change notifications complete", true,
-                Uninterruptibles.awaitUninterruptibly(changeLatch, 5, TimeUnit.SECONDS));
+        boolean done = Uninterruptibles.awaitUninterruptibly(changeLatch, 5, TimeUnit.SECONDS);
+        if(!done) {
+            fail(String.format("Missing change notifications. Expected: %d. Actual: %d",
+                    expChangeEventCount, (expChangeEventCount - changeLatch.getCount())));
+        }
+
+        assertEquals("Change notifications complete", true, done);
 
         for(int i = 0; i < expPaths.length; i++) {
             assertTrue(String.format("Change %d does not contain %s", (i+1), expPaths[i]),