Bug 2265: Use new NormalizedNode streaming in messages 48/12448/19
authortpantelis <tpanteli@brocade.com>
Mon, 19 Jan 2015 03:21:46 +0000 (22:21 -0500)
committertpantelis <tpanteli@brocade.com>
Wed, 21 Jan 2015 04:50:48 +0000 (23:50 -0500)
    Utilized the new NormalizedNode streaming classes for the WriteData,
    MergeData, ReadDataReply and DataChanged messages.

    One solution was to add a bytes field to the protobuff messages and
    make the previous fields optional. For backwards compatibility, in
    the wrapper message's fromSerializable method, check whether or not
    the protobuff message has the bytes field and act accordingly.

    While this works, it results in an undesirable inefficiency.
    Protobuff translates the bytes field to a ByteString type. So when
    streaming, we need to create a ByteArrayOutputStream and pass that to
    the NormalizedNode stream writer. Then call
    ByteString.copyFrom(bos.toByteArray) to get the resulting ByteString.
    However this results in 2 copies of the serialized byte[]. The
    byte[] cannot be passed to ByteString as is as it always copies it to
    maintain immutability. I looked into subclassing ByteString and
    lazily streaming the data on demand but the ByteString ctor is
    package scoped so they don’t allow subclassing.

    So I went with an approach to make each message Externalizable
    instead of using protobuff. The classes writes
    the version number to enable us to handle compatibility in the future.
    So in this manner, we can get the efficient direct streaming we
    want and easily handle backwards and forwards compatibility.

    I added a VersionedSerializableMessage interface whose
    toSerializable method takes a version number. The version # is passed
    from the remote version # obtained from the CreateTransactionReply.
    This allows the message classes to handle backwards compatibility. So
    if the version is Helium-1 or less, send the protobuff message
    otherwise send the Externalizable instance.

    In the fromSerializable method, it checks if the passed Object is an
    instance of the Externalizable class or the protbuff message type.

Change-Id: I5ebb968e70ac8ff92c29183c52e6c3fe5362ae34
Signed-off-by: tpantelis <tpanteli@brocade.com>
48 files changed:
opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/protobuff/messages/transaction/ShardTransactionMessages.java
opendaylight/md-sal/sal-clustering-commons/src/main/resources/ShardTransaction.proto
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataChangeListener.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerProxy.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataStoreVersions.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardCommitCoordinator.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardReadTransaction.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardReadWriteTransaction.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardTransaction.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardTransactionChain.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardWriteTransaction.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/AbortTransactionReply.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/CanCommitTransactionReply.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/CloseTransaction.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/CloseTransactionChainReply.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/CloseTransactionReply.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/CommitTransactionReply.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/CreateTransaction.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/CreateTransactionReply.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/DataChanged.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/DataChangedReply.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/DeleteDataReply.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/EmptyExternalizable.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/EmptyReply.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/MergeData.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/MergeDataReply.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/ModifyData.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/ReadDataReply.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/ReadyTransaction.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/VersionedSerializableMessage.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/WriteData.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/WriteDataReply.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/SerializationUtils.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerProxyTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionFailureTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionHeliumBackwardsCompatibilityTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohortProxyTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionProxyTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/messages/DataChangedTest.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/messages/MergeDataTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/messages/ReadDataReplyTest.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/messages/WriteDataTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/md/cluster/datastore/model/TestModel.java

index 3a1cfaa..8be6ad1 100644 (file)
@@ -4823,14 +4823,26 @@ public final class ShardTransactionMessages {
     // required .org.opendaylight.controller.mdsal.InstanceIdentifier instanceIdentifierPathArguments = 1;
     /**
      * <code>required .org.opendaylight.controller.mdsal.InstanceIdentifier instanceIdentifierPathArguments = 1;</code>
+     *
+     * <pre>
+     * base Helium version
+     * </pre>
      */
     boolean hasInstanceIdentifierPathArguments();
     /**
      * <code>required .org.opendaylight.controller.mdsal.InstanceIdentifier instanceIdentifierPathArguments = 1;</code>
+     *
+     * <pre>
+     * base Helium version
+     * </pre>
      */
     org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages.InstanceIdentifier getInstanceIdentifierPathArguments();
     /**
      * <code>required .org.opendaylight.controller.mdsal.InstanceIdentifier instanceIdentifierPathArguments = 1;</code>
+     *
+     * <pre>
+     * base Helium version
+     * </pre>
      */
     org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages.InstanceIdentifierOrBuilder getInstanceIdentifierPathArgumentsOrBuilder();
 
@@ -4970,18 +4982,30 @@ public final class ShardTransactionMessages {
     private org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages.InstanceIdentifier instanceIdentifierPathArguments_;
     /**
      * <code>required .org.opendaylight.controller.mdsal.InstanceIdentifier instanceIdentifierPathArguments = 1;</code>
+     *
+     * <pre>
+     * base Helium version
+     * </pre>
      */
     public boolean hasInstanceIdentifierPathArguments() {
       return ((bitField0_ & 0x00000001) == 0x00000001);
     }
     /**
      * <code>required .org.opendaylight.controller.mdsal.InstanceIdentifier instanceIdentifierPathArguments = 1;</code>
+     *
+     * <pre>
+     * base Helium version
+     * </pre>
      */
     public org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages.InstanceIdentifier getInstanceIdentifierPathArguments() {
       return instanceIdentifierPathArguments_;
     }
     /**
      * <code>required .org.opendaylight.controller.mdsal.InstanceIdentifier instanceIdentifierPathArguments = 1;</code>
+     *
+     * <pre>
+     * base Helium version
+     * </pre>
      */
     public org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages.InstanceIdentifierOrBuilder getInstanceIdentifierPathArgumentsOrBuilder() {
       return instanceIdentifierPathArguments_;
@@ -5309,12 +5333,20 @@ public final class ShardTransactionMessages {
           org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages.InstanceIdentifier, org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages.InstanceIdentifier.Builder, org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages.InstanceIdentifierOrBuilder> instanceIdentifierPathArgumentsBuilder_;
       /**
        * <code>required .org.opendaylight.controller.mdsal.InstanceIdentifier instanceIdentifierPathArguments = 1;</code>
+       *
+       * <pre>
+       * base Helium version
+       * </pre>
        */
       public boolean hasInstanceIdentifierPathArguments() {
         return ((bitField0_ & 0x00000001) == 0x00000001);
       }
       /**
        * <code>required .org.opendaylight.controller.mdsal.InstanceIdentifier instanceIdentifierPathArguments = 1;</code>
+       *
+       * <pre>
+       * base Helium version
+       * </pre>
        */
       public org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages.InstanceIdentifier getInstanceIdentifierPathArguments() {
         if (instanceIdentifierPathArgumentsBuilder_ == null) {
@@ -5325,6 +5357,10 @@ public final class ShardTransactionMessages {
       }
       /**
        * <code>required .org.opendaylight.controller.mdsal.InstanceIdentifier instanceIdentifierPathArguments = 1;</code>
+       *
+       * <pre>
+       * base Helium version
+       * </pre>
        */
       public Builder setInstanceIdentifierPathArguments(org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages.InstanceIdentifier value) {
         if (instanceIdentifierPathArgumentsBuilder_ == null) {
@@ -5341,6 +5377,10 @@ public final class ShardTransactionMessages {
       }
       /**
        * <code>required .org.opendaylight.controller.mdsal.InstanceIdentifier instanceIdentifierPathArguments = 1;</code>
+       *
+       * <pre>
+       * base Helium version
+       * </pre>
        */
       public Builder setInstanceIdentifierPathArguments(
           org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages.InstanceIdentifier.Builder builderForValue) {
@@ -5355,6 +5395,10 @@ public final class ShardTransactionMessages {
       }
       /**
        * <code>required .org.opendaylight.controller.mdsal.InstanceIdentifier instanceIdentifierPathArguments = 1;</code>
+       *
+       * <pre>
+       * base Helium version
+       * </pre>
        */
       public Builder mergeInstanceIdentifierPathArguments(org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages.InstanceIdentifier value) {
         if (instanceIdentifierPathArgumentsBuilder_ == null) {
@@ -5374,6 +5418,10 @@ public final class ShardTransactionMessages {
       }
       /**
        * <code>required .org.opendaylight.controller.mdsal.InstanceIdentifier instanceIdentifierPathArguments = 1;</code>
+       *
+       * <pre>
+       * base Helium version
+       * </pre>
        */
       public Builder clearInstanceIdentifierPathArguments() {
         if (instanceIdentifierPathArgumentsBuilder_ == null) {
@@ -5387,6 +5435,10 @@ public final class ShardTransactionMessages {
       }
       /**
        * <code>required .org.opendaylight.controller.mdsal.InstanceIdentifier instanceIdentifierPathArguments = 1;</code>
+       *
+       * <pre>
+       * base Helium version
+       * </pre>
        */
       public org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages.InstanceIdentifier.Builder getInstanceIdentifierPathArgumentsBuilder() {
         bitField0_ |= 0x00000001;
@@ -5395,6 +5447,10 @@ public final class ShardTransactionMessages {
       }
       /**
        * <code>required .org.opendaylight.controller.mdsal.InstanceIdentifier instanceIdentifierPathArguments = 1;</code>
+       *
+       * <pre>
+       * base Helium version
+       * </pre>
        */
       public org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages.InstanceIdentifierOrBuilder getInstanceIdentifierPathArgumentsOrBuilder() {
         if (instanceIdentifierPathArgumentsBuilder_ != null) {
@@ -5405,6 +5461,10 @@ public final class ShardTransactionMessages {
       }
       /**
        * <code>required .org.opendaylight.controller.mdsal.InstanceIdentifier instanceIdentifierPathArguments = 1;</code>
+       *
+       * <pre>
+       * base Helium version
+       * </pre>
        */
       private com.google.protobuf.SingleFieldBuilder<
           org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages.InstanceIdentifier, org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages.InstanceIdentifier.Builder, org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages.InstanceIdentifierOrBuilder>
@@ -5863,14 +5923,26 @@ public final class ShardTransactionMessages {
     // required .org.opendaylight.controller.mdsal.InstanceIdentifier instanceIdentifierPathArguments = 1;
     /**
      * <code>required .org.opendaylight.controller.mdsal.InstanceIdentifier instanceIdentifierPathArguments = 1;</code>
+     *
+     * <pre>
+     * base Helium version
+     * </pre>
      */
     boolean hasInstanceIdentifierPathArguments();
     /**
      * <code>required .org.opendaylight.controller.mdsal.InstanceIdentifier instanceIdentifierPathArguments = 1;</code>
+     *
+     * <pre>
+     * base Helium version
+     * </pre>
      */
     org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages.InstanceIdentifier getInstanceIdentifierPathArguments();
     /**
      * <code>required .org.opendaylight.controller.mdsal.InstanceIdentifier instanceIdentifierPathArguments = 1;</code>
+     *
+     * <pre>
+     * base Helium version
+     * </pre>
      */
     org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages.InstanceIdentifierOrBuilder getInstanceIdentifierPathArgumentsOrBuilder();
 
@@ -6010,18 +6082,30 @@ public final class ShardTransactionMessages {
     private org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages.InstanceIdentifier instanceIdentifierPathArguments_;
     /**
      * <code>required .org.opendaylight.controller.mdsal.InstanceIdentifier instanceIdentifierPathArguments = 1;</code>
+     *
+     * <pre>
+     * base Helium version
+     * </pre>
      */
     public boolean hasInstanceIdentifierPathArguments() {
       return ((bitField0_ & 0x00000001) == 0x00000001);
     }
     /**
      * <code>required .org.opendaylight.controller.mdsal.InstanceIdentifier instanceIdentifierPathArguments = 1;</code>
+     *
+     * <pre>
+     * base Helium version
+     * </pre>
      */
     public org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages.InstanceIdentifier getInstanceIdentifierPathArguments() {
       return instanceIdentifierPathArguments_;
     }
     /**
      * <code>required .org.opendaylight.controller.mdsal.InstanceIdentifier instanceIdentifierPathArguments = 1;</code>
+     *
+     * <pre>
+     * base Helium version
+     * </pre>
      */
     public org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages.InstanceIdentifierOrBuilder getInstanceIdentifierPathArgumentsOrBuilder() {
       return instanceIdentifierPathArguments_;
@@ -6349,12 +6433,20 @@ public final class ShardTransactionMessages {
           org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages.InstanceIdentifier, org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages.InstanceIdentifier.Builder, org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages.InstanceIdentifierOrBuilder> instanceIdentifierPathArgumentsBuilder_;
       /**
        * <code>required .org.opendaylight.controller.mdsal.InstanceIdentifier instanceIdentifierPathArguments = 1;</code>
+       *
+       * <pre>
+       * base Helium version
+       * </pre>
        */
       public boolean hasInstanceIdentifierPathArguments() {
         return ((bitField0_ & 0x00000001) == 0x00000001);
       }
       /**
        * <code>required .org.opendaylight.controller.mdsal.InstanceIdentifier instanceIdentifierPathArguments = 1;</code>
+       *
+       * <pre>
+       * base Helium version
+       * </pre>
        */
       public org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages.InstanceIdentifier getInstanceIdentifierPathArguments() {
         if (instanceIdentifierPathArgumentsBuilder_ == null) {
@@ -6365,6 +6457,10 @@ public final class ShardTransactionMessages {
       }
       /**
        * <code>required .org.opendaylight.controller.mdsal.InstanceIdentifier instanceIdentifierPathArguments = 1;</code>
+       *
+       * <pre>
+       * base Helium version
+       * </pre>
        */
       public Builder setInstanceIdentifierPathArguments(org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages.InstanceIdentifier value) {
         if (instanceIdentifierPathArgumentsBuilder_ == null) {
@@ -6381,6 +6477,10 @@ public final class ShardTransactionMessages {
       }
       /**
        * <code>required .org.opendaylight.controller.mdsal.InstanceIdentifier instanceIdentifierPathArguments = 1;</code>
+       *
+       * <pre>
+       * base Helium version
+       * </pre>
        */
       public Builder setInstanceIdentifierPathArguments(
           org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages.InstanceIdentifier.Builder builderForValue) {
@@ -6395,6 +6495,10 @@ public final class ShardTransactionMessages {
       }
       /**
        * <code>required .org.opendaylight.controller.mdsal.InstanceIdentifier instanceIdentifierPathArguments = 1;</code>
+       *
+       * <pre>
+       * base Helium version
+       * </pre>
        */
       public Builder mergeInstanceIdentifierPathArguments(org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages.InstanceIdentifier value) {
         if (instanceIdentifierPathArgumentsBuilder_ == null) {
@@ -6414,6 +6518,10 @@ public final class ShardTransactionMessages {
       }
       /**
        * <code>required .org.opendaylight.controller.mdsal.InstanceIdentifier instanceIdentifierPathArguments = 1;</code>
+       *
+       * <pre>
+       * base Helium version
+       * </pre>
        */
       public Builder clearInstanceIdentifierPathArguments() {
         if (instanceIdentifierPathArgumentsBuilder_ == null) {
@@ -6427,6 +6535,10 @@ public final class ShardTransactionMessages {
       }
       /**
        * <code>required .org.opendaylight.controller.mdsal.InstanceIdentifier instanceIdentifierPathArguments = 1;</code>
+       *
+       * <pre>
+       * base Helium version
+       * </pre>
        */
       public org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages.InstanceIdentifier.Builder getInstanceIdentifierPathArgumentsBuilder() {
         bitField0_ |= 0x00000001;
@@ -6435,6 +6547,10 @@ public final class ShardTransactionMessages {
       }
       /**
        * <code>required .org.opendaylight.controller.mdsal.InstanceIdentifier instanceIdentifierPathArguments = 1;</code>
+       *
+       * <pre>
+       * base Helium version
+       * </pre>
        */
       public org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages.InstanceIdentifierOrBuilder getInstanceIdentifierPathArgumentsOrBuilder() {
         if (instanceIdentifierPathArgumentsBuilder_ != null) {
@@ -6445,6 +6561,10 @@ public final class ShardTransactionMessages {
       }
       /**
        * <code>required .org.opendaylight.controller.mdsal.InstanceIdentifier instanceIdentifierPathArguments = 1;</code>
+       *
+       * <pre>
+       * base Helium version
+       * </pre>
        */
       private com.google.protobuf.SingleFieldBuilder<
           org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages.InstanceIdentifier, org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages.InstanceIdentifier.Builder, org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages.InstanceIdentifierOrBuilder>
index c5e4ee4..ac9cb22 100644 (file)
@@ -49,9 +49,9 @@ message ReadDataReply{
 }
 
 message WriteData {
- required InstanceIdentifier instanceIdentifierPathArguments = 1;
-required Node normalizedNode =2;
-
+  // base Helium version
+  required InstanceIdentifier instanceIdentifierPathArguments = 1;
+  required Node normalizedNode = 2;
 }
 
 message WriteDataReply{
@@ -59,9 +59,9 @@ message WriteDataReply{
 }
 
 message MergeData {
- required InstanceIdentifier instanceIdentifierPathArguments = 1;
-required Node normalizedNode =2;
-
+  // base Helium version
+  required InstanceIdentifier instanceIdentifierPathArguments = 1;
+  required Node normalizedNode = 2;
 }
 
 message MergeDataReply{
index 6f14af3..1bc835f 100644 (file)
@@ -71,7 +71,7 @@ public class DataChangeListener extends AbstractUntypedActor {
         // It seems the sender is never null but it doesn't hurt to check. If the caller passes in
         // a null sender (ActorRef.noSender()), akka translates that to the deadLetters actor.
         if(getSender() != null && !getContext().system().deadLetters().equals(getSender())) {
-            getSender().tell(new DataChangedReply(), getSelf());
+            getSender().tell(DataChangedReply.INSTANCE, getSelf());
         }
     }
 
index 1a0ee8c..afec1a0 100644 (file)
@@ -16,22 +16,21 @@ import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
-import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 
 /**
  * DataChangeListenerProxy represents a single remote DataChangeListener
  */
 public class DataChangeListenerProxy implements AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>{
     private final ActorSelection dataChangeListenerActor;
-    private final SchemaContext schemaContext;
 
-    public DataChangeListenerProxy(SchemaContext schemaContext, ActorSelection dataChangeListenerActor) {
-        this.dataChangeListenerActor = Preconditions.checkNotNull(dataChangeListenerActor, "dataChangeListenerActor should not be null");
-        this.schemaContext = schemaContext;
+    public DataChangeListenerProxy(ActorSelection dataChangeListenerActor) {
+        this.dataChangeListenerActor = Preconditions.checkNotNull(dataChangeListenerActor,
+                "dataChangeListenerActor should not be null");
     }
 
-    @Override public void onDataChanged(
+    @Override
+    public void onDataChanged(
         AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change) {
-        dataChangeListenerActor.tell(new DataChanged(schemaContext, change), ActorRef.noSender());
+        dataChangeListenerActor.tell(new DataChanged(change), ActorRef.noSender());
     }
 }
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataStoreVersions.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataStoreVersions.java
new file mode 100644 (file)
index 0000000..1f22222
--- /dev/null
@@ -0,0 +1,21 @@
+/*
+ * Copyright (c) 2014 Brocade Communications Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore;
+
+/**
+ * Defines version numbers.
+ *
+ * @author Thomas Pantelis
+ */
+public interface DataStoreVersions {
+    short BASE_HELIUM_VERSION = 0;
+    short HELIUM_1_VERSION = 1;
+    short HELIUM_2_VERSION = 2;
+    short LITHIUM_VERSION = 3;
+    short CURRENT_VERSION = LITHIUM_VERSION;
+}
index 7ef6e04..1661bb4 100644 (file)
@@ -95,8 +95,6 @@ import scala.concurrent.duration.FiniteDuration;
  */
 public class Shard extends RaftActor {
 
-    private static final Object COMMIT_TRANSACTION_REPLY = new CommitTransactionReply().toSerializable();
-
     private static final Object TX_COMMIT_TIMEOUT_CHECK_MESSAGE = "txCommitTimeoutCheck";
 
     public static final String DEFAULT_NAME = "default";
@@ -354,7 +352,7 @@ public class Shard extends RaftActor {
             cohortEntry = commitCoordinator.getAndRemoveCohortEntry(transactionID);
             if(cohortEntry != null) {
                 commitWithNewTransaction(cohortEntry.getModification());
-                sender.tell(COMMIT_TRANSACTION_REPLY, getSelf());
+                sender.tell(CommitTransactionReply.INSTANCE.toSerializable(), getSelf());
             } else {
                 // This really shouldn't happen - it likely means that persistence or replication
                 // took so long to complete such that the cohort entry was expired from the cache.
@@ -376,7 +374,7 @@ public class Shard extends RaftActor {
             // currently uses a same thread executor anyway.
             cohortEntry.getCohort().commit().get();
 
-            sender.tell(COMMIT_TRANSACTION_REPLY, getSelf());
+            sender.tell(CommitTransactionReply.INSTANCE.toSerializable(), getSelf());
 
             shardMBean.incrementCommittedTransactionCount();
             shardMBean.setLastCommittedTransactionTime(System.currentTimeMillis());
@@ -412,7 +410,7 @@ public class Shard extends RaftActor {
         // transactionId so to maintain backwards compatibility, we create a separate cohort actor
         // to provide the compatible behavior.
         ActorRef replyActorPath = self();
-        if(ready.getTxnClientVersion() < CreateTransaction.HELIUM_1_VERSION) {
+        if(ready.getTxnClientVersion() < DataStoreVersions.HELIUM_1_VERSION) {
             LOG.debug("Creating BackwardsCompatibleThreePhaseCommitCohort");
             replyActorPath = getContext().actorOf(BackwardsCompatibleThreePhaseCommitCohort.props(
                     ready.getTransactionID()));
@@ -447,7 +445,7 @@ public class Shard extends RaftActor {
                     shardMBean.incrementAbortTransactionsCount();
 
                     if(sender != null) {
-                        sender.tell(new AbortTransactionReply().toSerializable(), self);
+                        sender.tell(AbortTransactionReply.INSTANCE.toSerializable(), self);
                     }
                 }
 
@@ -480,7 +478,7 @@ public class Shard extends RaftActor {
         // This must be for install snapshot. Don't want to open this up and trigger
         // deSerialization
 
-        self().tell(new CaptureSnapshotReply(ReadDataReply.getNormalizedNodeByteString(message)),
+        self().tell(new CaptureSnapshotReply(ReadDataReply.fromSerializableAsByteString(message)),
                 self());
 
         createSnapshotTransaction = null;
@@ -500,7 +498,8 @@ public class Shard extends RaftActor {
     }
 
     private ActorRef createTypedTransactionActor(int transactionType,
-            ShardTransactionIdentifier transactionId, String transactionChainId, int clientVersion ) {
+            ShardTransactionIdentifier transactionId, String transactionChainId,
+            short clientVersion ) {
 
         DOMStoreTransactionFactory factory = store;
 
@@ -568,7 +567,7 @@ public class Shard extends RaftActor {
     }
 
     private ActorRef createTransaction(int transactionType, String remoteTransactionId,
-            String transactionChainId, int clientVersion) {
+            String transactionChainId, short clientVersion) {
 
         ShardTransactionIdentifier transactionId =
             ShardTransactionIdentifier.builder()
@@ -659,7 +658,7 @@ public class Shard extends RaftActor {
         dataChangeListeners.add(dataChangeListenerPath);
 
         AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>> listener =
-                new DataChangeListenerProxy(schemaContext, dataChangeListenerPath);
+                new DataChangeListenerProxy(dataChangeListenerPath);
 
         LOG.debug("Registering for path {}", registerChangeListener.getPath());
 
@@ -818,7 +817,7 @@ public class Shard extends RaftActor {
             createSnapshotTransaction = createTransaction(
                 TransactionProxy.TransactionType.READ_ONLY.ordinal(),
                 "createSnapshot" + ++createSnapshotTransactionCounter, "",
-                CreateTransaction.CURRENT_VERSION);
+                DataStoreVersions.CURRENT_VERSION);
 
             createSnapshotTransaction.tell(
                 new ReadData(YangInstanceIdentifier.builder().build()).toSerializable(), self());
index f3b4e41..19fa266 100644 (file)
@@ -7,6 +7,10 @@
  */
 package org.opendaylight.controller.cluster.datastore;
 
+import akka.actor.ActorRef;
+import akka.actor.Status;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
 import java.util.LinkedList;
 import java.util.Queue;
 import java.util.concurrent.ExecutionException;
@@ -17,10 +21,6 @@ import org.opendaylight.controller.cluster.datastore.modification.Modification;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import akka.actor.ActorRef;
-import akka.actor.Status;
-import com.google.common.cache.Cache;
-import com.google.common.cache.CacheBuilder;
 
 /**
  * Coordinates commits for a shard ensuring only one concurrent 3-phase commit.
@@ -31,12 +31,6 @@ public class ShardCommitCoordinator {
 
     private static final Logger LOG = LoggerFactory.getLogger(ShardCommitCoordinator.class);
 
-    private static final Object CAN_COMMIT_REPLY_TRUE =
-            new CanCommitTransactionReply(Boolean.TRUE).toSerializable();
-
-    private static final Object CAN_COMMIT_REPLY_FALSE =
-            new CanCommitTransactionReply(Boolean.FALSE).toSerializable();
-
     private final Cache<String, CohortEntry> cohortCache;
 
     private CohortEntry currentCohortEntry;
@@ -138,7 +132,8 @@ public class ShardCommitCoordinator {
             Boolean canCommit = cohortEntry.getCohort().canCommit().get();
 
             cohortEntry.getCanCommitSender().tell(
-                    canCommit ? CAN_COMMIT_REPLY_TRUE : CAN_COMMIT_REPLY_FALSE, cohortEntry.getShard());
+                    canCommit ? CanCommitTransactionReply.YES.toSerializable() :
+                        CanCommitTransactionReply.NO.toSerializable(), cohortEntry.getShard());
 
             if(!canCommit) {
                 // Remove the entry from the cache now since the Tx will be aborted.
index 9d8f572..be9c4d8 100644 (file)
@@ -27,8 +27,8 @@ public class ShardReadTransaction extends ShardTransaction {
 
     public ShardReadTransaction(DOMStoreReadTransaction transaction, ActorRef shardActor,
             SchemaContext schemaContext, ShardStats shardStats, String transactionID,
-            int txnClientVersion) {
-        super(shardActor, schemaContext, shardStats, transactionID, txnClientVersion);
+            short clientTxVersion) {
+        super(shardActor, schemaContext, shardStats, transactionID, clientTxVersion);
         this.transaction = transaction;
     }
 
index e558677..b394da8 100644 (file)
@@ -11,7 +11,6 @@
 package org.opendaylight.controller.cluster.datastore;
 
 import akka.actor.ActorRef;
-
 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
 import org.opendaylight.controller.cluster.datastore.messages.DataExists;
 import org.opendaylight.controller.cluster.datastore.messages.ReadData;
@@ -27,8 +26,8 @@ public class ShardReadWriteTransaction extends ShardWriteTransaction {
 
     public ShardReadWriteTransaction(DOMStoreReadWriteTransaction transaction, ActorRef shardActor,
             SchemaContext schemaContext, ShardStats shardStats, String transactionID,
-            int txnClientVersion) {
-        super(transaction, shardActor, schemaContext, shardStats, transactionID, txnClientVersion);
+            short clientTxVersion) {
+        super(transaction, shardActor, schemaContext, shardStats, transactionID, clientTxVersion);
         this.transaction = transaction;
     }
 
index 59bb4bf..678b781 100644 (file)
@@ -63,21 +63,21 @@ public abstract class ShardTransaction extends AbstractUntypedActorWithMetering
     private final SchemaContext schemaContext;
     private final ShardStats shardStats;
     private final String transactionID;
-    private final int txnClientVersion;
+    private final short clientTxVersion;
 
     protected ShardTransaction(ActorRef shardActor, SchemaContext schemaContext,
-            ShardStats shardStats, String transactionID, int txnClientVersion) {
+            ShardStats shardStats, String transactionID, short clientTxVersion) {
         super("shard-tx"); //actor name override used for metering. This does not change the "real" actor name
         this.shardActor = shardActor;
         this.schemaContext = schemaContext;
         this.shardStats = shardStats;
         this.transactionID = transactionID;
-        this.txnClientVersion = txnClientVersion;
+        this.clientTxVersion = clientTxVersion;
     }
 
     public static Props props(DOMStoreTransaction transaction, ActorRef shardActor,
             SchemaContext schemaContext,DatastoreContext datastoreContext, ShardStats shardStats,
-            String transactionID, int txnClientVersion) {
+            String transactionID, short txnClientVersion) {
         return Props.create(new ShardTransactionCreator(transaction, shardActor, schemaContext,
            datastoreContext, shardStats, transactionID, txnClientVersion));
     }
@@ -96,8 +96,8 @@ public abstract class ShardTransaction extends AbstractUntypedActorWithMetering
         return schemaContext;
     }
 
-    protected int getTxnClientVersion() {
-        return txnClientVersion;
+    protected short getClientTxVersion() {
+        return clientTxVersion;
     }
 
     @Override
@@ -118,28 +118,28 @@ public abstract class ShardTransaction extends AbstractUntypedActorWithMetering
         getDOMStoreTransaction().close();
 
         if(sendReply) {
-            getSender().tell(new CloseTransactionReply().toSerializable(), getSelf());
+            getSender().tell(CloseTransactionReply.INSTANCE.toSerializable(), getSelf());
         }
 
         getSelf().tell(PoisonPill.getInstance(), getSelf());
     }
 
-    protected void readData(DOMStoreReadTransaction transaction, ReadData message, final boolean returnSerialized) {
+    protected void readData(DOMStoreReadTransaction transaction, ReadData message,
+            final boolean returnSerialized) {
         final ActorRef sender = getSender();
         final ActorRef self = getSelf();
         final YangInstanceIdentifier path = message.getPath();
         final CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> future =
                 transaction.read(path);
 
-
         future.addListener(new Runnable() {
             @Override
             public void run() {
                 try {
                     Optional<NormalizedNode<?, ?>> optional = future.checkedGet();
-                    ReadDataReply readDataReply = new ReadDataReply(schemaContext, optional.orNull());
+                    ReadDataReply readDataReply = new ReadDataReply(optional.orNull());
 
-                    sender.tell((returnSerialized ? readDataReply.toSerializable():
+                    sender.tell((returnSerialized ? readDataReply.toSerializable(clientTxVersion):
                         readDataReply), self);
 
                 } catch (Exception e) {
@@ -176,11 +176,11 @@ public abstract class ShardTransaction extends AbstractUntypedActorWithMetering
         final DatastoreContext datastoreContext;
         final ShardStats shardStats;
         final String transactionID;
-        final int txnClientVersion;
+        final short txnClientVersion;
 
         ShardTransactionCreator(DOMStoreTransaction transaction, ActorRef shardActor,
                 SchemaContext schemaContext, DatastoreContext datastoreContext,
-                ShardStats shardStats, String transactionID, int txnClientVersion) {
+                ShardStats shardStats, String transactionID, short txnClientVersion) {
             this.transaction = transaction;
             this.shardActor = shardActor;
             this.shardStats = shardStats;
index 78c6a55..8ba6139 100644 (file)
@@ -12,7 +12,6 @@ import akka.actor.ActorRef;
 import akka.actor.Props;
 import akka.japi.Creator;
 import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActor;
-
 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
 import org.opendaylight.controller.cluster.datastore.messages.CloseTransactionChain;
 import org.opendaylight.controller.cluster.datastore.messages.CloseTransactionChainReply;
@@ -46,7 +45,7 @@ public class ShardTransactionChain extends AbstractUntypedActor {
             createTransaction(createTransaction);
         } else if (message.getClass().equals(CloseTransactionChain.SERIALIZABLE_CLASS)) {
             chain.close();
-            getSender().tell(new CloseTransactionChainReply().toSerializable(), getSelf());
+            getSender().tell(CloseTransactionChainReply.INSTANCE.toSerializable(), getSelf());
         }else{
             unknownMessage(message);
         }
index 44f2c7b..2e43219 100644 (file)
@@ -43,8 +43,8 @@ public class ShardWriteTransaction extends ShardTransaction {
 
     public ShardWriteTransaction(DOMStoreWriteTransaction transaction, ActorRef shardActor,
             SchemaContext schemaContext, ShardStats shardStats, String transactionID,
-            int txnClientVersion) {
-        super(shardActor, schemaContext, shardStats, transactionID, txnClientVersion);
+            short clientTxVersion) {
+        super(shardActor, schemaContext, shardStats, transactionID, clientTxVersion);
         this.transaction = transaction;
     }
 
@@ -66,19 +66,19 @@ public class ShardWriteTransaction extends ShardTransaction {
             deleteData(transaction, (DeleteData) message, !SERIALIZED_REPLY);
 
         } else if (message instanceof ReadyTransaction) {
-            readyTransaction(transaction, new ReadyTransaction(), !SERIALIZED_REPLY);
+            readyTransaction(transaction, !SERIALIZED_REPLY);
 
-        } else if(WriteData.SERIALIZABLE_CLASS.equals(message.getClass())) {
-            writeData(transaction, WriteData.fromSerializable(message, getSchemaContext()), SERIALIZED_REPLY);
+        } else if(WriteData.isSerializedType(message)) {
+            writeData(transaction, WriteData.fromSerializable(message), SERIALIZED_REPLY);
 
-        } else if(MergeData.SERIALIZABLE_CLASS.equals(message.getClass())) {
-            mergeData(transaction, MergeData.fromSerializable(message, getSchemaContext()), SERIALIZED_REPLY);
+        } else if(MergeData.isSerializedType(message)) {
+            mergeData(transaction, MergeData.fromSerializable(message), SERIALIZED_REPLY);
 
         } else if(DeleteData.SERIALIZABLE_CLASS.equals(message.getClass())) {
             deleteData(transaction, DeleteData.fromSerializable(message), SERIALIZED_REPLY);
 
         } else if(ReadyTransaction.SERIALIZABLE_CLASS.equals(message.getClass())) {
-            readyTransaction(transaction, new ReadyTransaction(), SERIALIZED_REPLY);
+            readyTransaction(transaction, SERIALIZED_REPLY);
 
         } else if (message instanceof GetCompositedModification) {
             // This is here for testing only
@@ -97,9 +97,9 @@ public class ShardWriteTransaction extends ShardTransaction {
                 new WriteModification(message.getPath(), message.getData(), getSchemaContext()));
         try {
             transaction.write(message.getPath(), message.getData());
-            WriteDataReply writeDataReply = new WriteDataReply();
-            getSender().tell(returnSerialized ? writeDataReply.toSerializable() : writeDataReply,
-                getSelf());
+            WriteDataReply writeDataReply = WriteDataReply.INSTANCE;
+            getSender().tell(returnSerialized ? writeDataReply.toSerializable(message.getVersion()) :
+                writeDataReply, getSelf());
         }catch(Exception e){
             getSender().tell(new akka.actor.Status.Failure(e), getSelf());
         }
@@ -114,9 +114,9 @@ public class ShardWriteTransaction extends ShardTransaction {
 
         try {
             transaction.merge(message.getPath(), message.getData());
-            MergeDataReply mergeDataReply = new MergeDataReply();
-            getSender().tell(returnSerialized ? mergeDataReply.toSerializable() : mergeDataReply ,
-                getSelf());
+            MergeDataReply mergeDataReply = MergeDataReply.INSTANCE;
+            getSender().tell(returnSerialized ? mergeDataReply.toSerializable(message.getVersion()) :
+                mergeDataReply, getSelf());
         }catch(Exception e){
             getSender().tell(new akka.actor.Status.Failure(e), getSelf());
         }
@@ -137,15 +137,14 @@ public class ShardWriteTransaction extends ShardTransaction {
         }
     }
 
-    private void readyTransaction(DOMStoreWriteTransaction transaction, ReadyTransaction message,
-            boolean returnSerialized) {
+    private void readyTransaction(DOMStoreWriteTransaction transaction, boolean returnSerialized) {
         String transactionID = getTransactionID();
 
         LOG.debug("readyTransaction : {}", transactionID);
 
         DOMStoreThreePhaseCommitCohort cohort =  transaction.ready();
 
-        getShardActor().forward(new ForwardedReadyTransaction(transactionID, getTxnClientVersion(),
+        getShardActor().forward(new ForwardedReadyTransaction(transactionID, getClientTxVersion(),
                 cohort, modification, returnSerialized), getContext());
 
         // The shard will handle the commit from here so we're no longer needed - self-destruct.
index 7703f48..f34e88f 100644 (file)
@@ -43,6 +43,7 @@ import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply;
 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
 import org.opendaylight.controller.cluster.datastore.messages.SerializableMessage;
+import org.opendaylight.controller.cluster.datastore.messages.VersionedSerializableMessage;
 import org.opendaylight.controller.cluster.datastore.messages.WriteData;
 import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategyFactory;
 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
@@ -157,8 +158,7 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
             if(remoteTransactionActorsMB.get()) {
                 for(ActorSelection actor : remoteTransactionActors) {
                     LOG.trace("Sending CloseTransaction to {}", actor);
-                    actorContext.sendOperationAsync(actor,
-                            new CloseTransaction().toSerializable());
+                    actorContext.sendOperationAsync(actor, CloseTransaction.INSTANCE.toSerializable());
                 }
             }
         }
@@ -617,10 +617,6 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
             }
         }
 
-
-
-
-
         /**
          * Performs a CreateTransaction try async.
          */
@@ -763,11 +759,11 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
         private final String transactionPath;
         private final ActorSelection actor;
         private final boolean isTxActorLocal;
-        private final int remoteTransactionVersion;
+        private final short remoteTransactionVersion;
 
         private TransactionContextImpl(String transactionPath, ActorSelection actor, TransactionIdentifier identifier,
                 ActorContext actorContext, SchemaContext schemaContext,
-                boolean isTxActorLocal, int remoteTransactionVersion) {
+                boolean isTxActorLocal, short remoteTransactionVersion) {
             super(identifier);
             this.transactionPath = transactionPath;
             this.actor = actor;
@@ -785,11 +781,16 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
             return actorContext.executeOperationAsync(getActor(), isTxActorLocal ? msg : msg.toSerializable());
         }
 
+        private Future<Object> executeOperationAsync(VersionedSerializableMessage msg) {
+            return actorContext.executeOperationAsync(getActor(), isTxActorLocal ? msg :
+                msg.toSerializable(remoteTransactionVersion));
+        }
+
         @Override
         public void closeTransaction() {
             LOG.debug("Tx {} closeTransaction called", identifier);
 
-            actorContext.sendOperationAsync(getActor(), new CloseTransaction().toSerializable());
+            actorContext.sendOperationAsync(getActor(), CloseTransaction.INSTANCE.toSerializable());
         }
 
         @Override
@@ -799,7 +800,7 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
 
             // Send the ReadyTransaction message to the Tx actor.
 
-            final Future<Object> replyFuture = executeOperationAsync(new ReadyTransaction());
+            final Future<Object> replyFuture = executeOperationAsync(ReadyTransaction.INSTANCE);
 
             // Combine all the previously recorded put/merge/delete operation reply Futures and the
             // ReadyTransactionReply Future into one Future. If any one fails then the combined
@@ -846,7 +847,8 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
                         // At some point in the future when upgrades from Helium are not supported
                         // we could remove this code to resolvePath and just use the cohortPath as the
                         // resolved cohortPath
-                        if(TransactionContextImpl.this.remoteTransactionVersion < CreateTransaction.HELIUM_1_VERSION) {
+                        if(TransactionContextImpl.this.remoteTransactionVersion <
+                                DataStoreVersions.HELIUM_1_VERSION) {
                             cohortPath = actorContext.resolvePath(transactionPath, cohortPath);
                         }
 
@@ -872,14 +874,14 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
         public void mergeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
             LOG.debug("Tx {} mergeData called path = {}", identifier, path);
 
-            recordedOperationFutures.add(executeOperationAsync(new MergeData(path, data, schemaContext)));
+            recordedOperationFutures.add(executeOperationAsync(new MergeData(path, data)));
         }
 
         @Override
         public void writeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
             LOG.debug("Tx {} writeData called path = {}", identifier, path);
 
-            recordedOperationFutures.add(executeOperationAsync(new WriteData(path, data, schemaContext)));
+            recordedOperationFutures.add(executeOperationAsync(new WriteData(path, data)));
         }
 
         @Override
@@ -950,8 +952,8 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
                             ReadDataReply reply = (ReadDataReply) readResponse;
                             returnFuture.set(Optional.<NormalizedNode<?, ?>>fromNullable(reply.getNormalizedNode()));
 
-                        } else if (readResponse.getClass().equals(ReadDataReply.SERIALIZABLE_CLASS)) {
-                            ReadDataReply reply = ReadDataReply.fromSerializable(schemaContext, path, readResponse);
+                        } else if (ReadDataReply.isSerializedType(readResponse)) {
+                            ReadDataReply reply = ReadDataReply.fromSerializable(readResponse);
                             returnFuture.set(Optional.<NormalizedNode<?, ?>>fromNullable(reply.getNormalizedNode()));
 
                         } else {
index 79c6b03..3680aec 100644 (file)
@@ -14,8 +14,13 @@ public class AbortTransactionReply implements SerializableMessage {
     public static final Class<ThreePhaseCommitCohortMessages.AbortTransactionReply> SERIALIZABLE_CLASS =
             ThreePhaseCommitCohortMessages.AbortTransactionReply.class;
 
+    private static final Object SERIALIZED_INSTANCE =
+            ThreePhaseCommitCohortMessages.AbortTransactionReply.newBuilder().build();
+
+    public static final AbortTransactionReply INSTANCE = new AbortTransactionReply();
+
     @Override
     public Object toSerializable() {
-        return ThreePhaseCommitCohortMessages.AbortTransactionReply.newBuilder().build();
+        return SERIALIZED_INSTANCE;
     }
 }
index 4d121ba..7db4846 100644 (file)
@@ -14,23 +14,30 @@ public class CanCommitTransactionReply implements SerializableMessage {
     public static final Class<ThreePhaseCommitCohortMessages.CanCommitTransactionReply> SERIALIZABLE_CLASS =
             ThreePhaseCommitCohortMessages.CanCommitTransactionReply.class;
 
-    private final Boolean canCommit;
+    public static final CanCommitTransactionReply YES = new CanCommitTransactionReply(true);
+    public static final CanCommitTransactionReply NO = new CanCommitTransactionReply(false);
 
-    public CanCommitTransactionReply(final Boolean canCommit) {
+    private final boolean canCommit;
+    private final Object serializedMessage;
+
+    private CanCommitTransactionReply(final boolean canCommit) {
         this.canCommit = canCommit;
+        this.serializedMessage = ThreePhaseCommitCohortMessages.CanCommitTransactionReply.newBuilder().
+                setCanCommit(canCommit).build();
     }
 
-    public Boolean getCanCommit() {
+    public boolean getCanCommit() {
         return canCommit;
     }
 
     @Override
     public Object toSerializable() {
-        return ThreePhaseCommitCohortMessages.CanCommitTransactionReply.newBuilder().setCanCommit(canCommit).build();
+        return serializedMessage;
     }
 
     public static CanCommitTransactionReply fromSerializable(final Object message) {
-        return new CanCommitTransactionReply(
-                ((ThreePhaseCommitCohortMessages.CanCommitTransactionReply) message).getCanCommit());
+        ThreePhaseCommitCohortMessages.CanCommitTransactionReply serialized =
+                (ThreePhaseCommitCohortMessages.CanCommitTransactionReply) message;
+        return serialized.getCanCommit() ? YES : NO;
     }
 }
index c73111f..ef1aac8 100644 (file)
@@ -11,10 +11,16 @@ package org.opendaylight.controller.cluster.datastore.messages;
 import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages;
 
 public class CloseTransaction implements SerializableMessage{
-  public static final Class<ShardTransactionMessages.CloseTransaction> SERIALIZABLE_CLASS =
-          ShardTransactionMessages.CloseTransaction.class;
-  @Override
-  public Object toSerializable() {
-    return ShardTransactionMessages.CloseTransaction.newBuilder().build();
-  }
+    public static final Class<ShardTransactionMessages.CloseTransaction> SERIALIZABLE_CLASS =
+            ShardTransactionMessages.CloseTransaction.class;
+
+    private static final Object SERIALIZED_INSTANCE =
+            ShardTransactionMessages.CloseTransaction.newBuilder().build();
+
+    public static final CloseTransaction INSTANCE = new CloseTransaction();
+
+    @Override
+    public Object toSerializable() {
+        return SERIALIZED_INSTANCE;
+    }
 }
index c001ae1..b4673e8 100644 (file)
@@ -11,11 +11,16 @@ package org.opendaylight.controller.cluster.datastore.messages;
 import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionChainMessages;
 
 public class CloseTransactionChainReply implements SerializableMessage {
-  public static final Class<ShardTransactionChainMessages.CloseTransactionChainReply> SERIALIZABLE_CLASS =
-          ShardTransactionChainMessages.CloseTransactionChainReply.class;
-  @Override
-  public Object toSerializable() {
-    return ShardTransactionChainMessages.CloseTransactionChainReply.newBuilder().build();
-  }
+    public static final Class<ShardTransactionChainMessages.CloseTransactionChainReply> SERIALIZABLE_CLASS =
+            ShardTransactionChainMessages.CloseTransactionChainReply.class;
 
+    private static final Object SERIALIZED_INSTANCE =
+            ShardTransactionChainMessages.CloseTransactionChainReply.newBuilder().build();
+
+    public static final CloseTransactionChainReply INSTANCE = new CloseTransactionChainReply();
+
+    @Override
+    public Object toSerializable() {
+        return SERIALIZED_INSTANCE;
+    }
 }
index 124eeb2..1c47a18 100644 (file)
@@ -11,10 +11,16 @@ package org.opendaylight.controller.cluster.datastore.messages;
 import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages;
 
 public class CloseTransactionReply implements SerializableMessage {
-  public static final Class<ShardTransactionMessages.CloseTransactionReply> SERIALIZABLE_CLASS =
-          ShardTransactionMessages.CloseTransactionReply.class;
-  @Override
-  public Object toSerializable() {
-    return ShardTransactionMessages.CloseTransactionReply.newBuilder().build();
-  }
+    public static final Class<ShardTransactionMessages.CloseTransactionReply> SERIALIZABLE_CLASS =
+            ShardTransactionMessages.CloseTransactionReply.class;
+
+    private static final Object SERIALIZED_INSTANCE =
+            ShardTransactionMessages.CloseTransactionReply.newBuilder().build();
+
+    public static final CloseTransactionReply INSTANCE = new CloseTransactionReply();
+
+    @Override
+    public Object toSerializable() {
+        return SERIALIZED_INSTANCE;
+    }
 }
index 3d4a168..47adea5 100644 (file)
@@ -14,8 +14,13 @@ public class CommitTransactionReply implements SerializableMessage {
     public static final Class<ThreePhaseCommitCohortMessages.CommitTransactionReply> SERIALIZABLE_CLASS =
             ThreePhaseCommitCohortMessages.CommitTransactionReply.class;
 
+    private static final Object SERIALIZED_INSTANCE =
+            ThreePhaseCommitCohortMessages.CommitTransactionReply.newBuilder().build();
+
+    public static final CommitTransactionReply INSTANCE = new CommitTransactionReply();
+
     @Override
     public Object toSerializable() {
-        return ThreePhaseCommitCohortMessages.CommitTransactionReply.newBuilder().build();
+        return SERIALIZED_INSTANCE;
     }
 }
index bf82e66..ea3caef 100644 (file)
@@ -9,6 +9,7 @@
 package org.opendaylight.controller.cluster.datastore.messages;
 
 
+import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
 import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages;
 
 
@@ -16,24 +17,21 @@ public class CreateTransaction implements SerializableMessage {
     public static final Class<ShardTransactionMessages.CreateTransaction> SERIALIZABLE_CLASS =
             ShardTransactionMessages.CreateTransaction.class;
 
-    public static final int HELIUM_1_VERSION = 1;
-    public static final int CURRENT_VERSION = HELIUM_1_VERSION;
-
     private final String transactionId;
     private final int transactionType;
     private final String transactionChainId;
-    private final int version;
+    private final short version;
 
     public CreateTransaction(String transactionId, int transactionType) {
         this(transactionId, transactionType, "");
     }
 
     public CreateTransaction(String transactionId, int transactionType, String transactionChainId) {
-        this(transactionId, transactionType, transactionChainId, CURRENT_VERSION);
+        this(transactionId, transactionType, transactionChainId, DataStoreVersions.CURRENT_VERSION);
     }
 
     private CreateTransaction(String transactionId, int transactionType, String transactionChainId,
-            int version) {
+            short version) {
         this.transactionId = transactionId;
         this.transactionType = transactionType;
         this.transactionChainId = transactionChainId;
@@ -48,7 +46,7 @@ public class CreateTransaction implements SerializableMessage {
         return transactionType;
     }
 
-    public int getVersion() {
+    public short getVersion() {
         return version;
     }
 
@@ -66,7 +64,7 @@ public class CreateTransaction implements SerializableMessage {
             (ShardTransactionMessages.CreateTransaction) message;
         return new CreateTransaction(createTransaction.getTransactionId(),
             createTransaction.getTransactionType(), createTransaction.getTransactionChainId(),
-            createTransaction.getMessageVersion());
+            (short)createTransaction.getMessageVersion());
     }
 
     public String getTransactionChainId() {
index 83e68c9..3fde6cc 100644 (file)
@@ -8,23 +8,22 @@
 
 package org.opendaylight.controller.cluster.datastore.messages;
 
+import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
 import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages;
 
 public class CreateTransactionReply implements SerializableMessage {
 
-    public static final Class<ShardTransactionMessages.CreateTransactionReply> SERIALIZABLE_CLASS =
-            ShardTransactionMessages.CreateTransactionReply.class;
+    public static final Class SERIALIZABLE_CLASS = ShardTransactionMessages.CreateTransactionReply.class;
     private final String transactionPath;
     private final String transactionId;
-    private final int version;
+    private final short version;
 
-    public CreateTransactionReply(final String transactionPath,
-        final String transactionId) {
-        this(transactionPath, transactionId, CreateTransaction.CURRENT_VERSION);
+    public CreateTransactionReply(String transactionPath, String transactionId) {
+        this(transactionPath, transactionId, DataStoreVersions.CURRENT_VERSION);
     }
 
     public CreateTransactionReply(final String transactionPath,
-                                  final String transactionId, final int version) {
+                                  final String transactionId, final short version) {
         this.transactionPath = transactionPath;
         this.transactionId = transactionId;
         this.version = version;
@@ -39,7 +38,7 @@ public class CreateTransactionReply implements SerializableMessage {
         return transactionId;
     }
 
-    public int getVersion() {
+    public short getVersion() {
         return version;
     }
 
@@ -52,9 +51,10 @@ public class CreateTransactionReply implements SerializableMessage {
             .build();
     }
 
-    public static CreateTransactionReply fromSerializable(final Object serializable){
+    public static CreateTransactionReply fromSerializable(Object serializable){
         ShardTransactionMessages.CreateTransactionReply o = (ShardTransactionMessages.CreateTransactionReply) serializable;
-        return new CreateTransactionReply(o.getTransactionActorPath(), o.getTransactionId(), o.getMessageVersion());
+        return new CreateTransactionReply(o.getTransactionActorPath(), o.getTransactionId(),
+                (short)o.getMessageVersion());
     }
 
 }
index 5b5f076..fe81e27 100644 (file)
 
 package org.opendaylight.controller.cluster.datastore.messages;
 
-import org.opendaylight.controller.cluster.datastore.node.NormalizedNodeToNodeCodec;
-import org.opendaylight.controller.cluster.datastore.util.InstanceIdentifierUtils;
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.util.Map;
+import java.util.Set;
+import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
+import org.opendaylight.controller.cluster.datastore.node.utils.stream.NormalizedNodeInputStreamReader;
+import org.opendaylight.controller.cluster.datastore.node.utils.stream.NormalizedNodeOutputStreamWriter;
+import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
-import org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages;
-import org.opendaylight.controller.protobuff.messages.datachange.notification.DataChangeListenerMessages;
+import org.opendaylight.controller.md.sal.dom.store.impl.DOMImmutableDataChangeEvent;
+import org.opendaylight.controller.md.sal.dom.store.impl.DOMImmutableDataChangeEvent.Builder;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
-import org.opendaylight.yangtools.yang.model.api.SchemaContext;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-public class DataChanged implements SerializableMessage {
-    public static final Class<DataChangeListenerMessages.DataChanged> SERIALIZABLE_CLASS =
-        DataChangeListenerMessages.DataChanged.class;
+import org.opendaylight.yangtools.yang.data.api.schema.stream.NormalizedNodeWriter;
 
-    final private SchemaContext schemaContext;
-    private final AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>>
-        change;
+public class DataChanged implements Externalizable {
+    private static final long serialVersionUID = 1L;
 
+    private AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change;
 
+    public DataChanged() {
+    }
 
-    public DataChanged(SchemaContext schemaContext,
-        AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change) {
+    public DataChanged(AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change) {
         this.change = change;
-        this.schemaContext = schemaContext;
     }
 
-
     public AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> getChange() {
         return change;
     }
 
+    @Override
+    public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+        in.readShort(); // Read the version
 
-    private NormalizedNodeMessages.Node convertToNodeTree(
-        NormalizedNode<?, ?> normalizedNode) {
+        NormalizedNodeInputStreamReader streamReader = new NormalizedNodeInputStreamReader(in);
 
-        return new NormalizedNodeToNodeCodec(schemaContext)
-            .encode(normalizedNode)
-            .getNormalizedNode();
+        // Note: the scope passed to builder is not actually used.
+        Builder builder = DOMImmutableDataChangeEvent.builder(DataChangeScope.SUBTREE);
 
-    }
+        // Read created data
 
-    private Iterable<NormalizedNodeMessages.InstanceIdentifier> convertToRemovePaths(
-        Set<YangInstanceIdentifier> removedPaths) {
-        final Set<NormalizedNodeMessages.InstanceIdentifier> removedPathInstanceIds = new HashSet<>();
-        for (YangInstanceIdentifier id : removedPaths) {
-            removedPathInstanceIds.add(InstanceIdentifierUtils.toSerializable(id));
+        int size = in.readInt();
+        for(int i = 0; i < size; i++) {
+            YangInstanceIdentifier path = streamReader.readYangInstanceIdentifier();
+            NormalizedNode<?, ?> node = streamReader.readNormalizedNode();
+            builder.addCreated(path, node);
         }
-        return new Iterable<NormalizedNodeMessages.InstanceIdentifier>() {
-            @Override
-            public Iterator<NormalizedNodeMessages.InstanceIdentifier> iterator() {
-                return removedPathInstanceIds.iterator();
-            }
-        };
 
-    }
+        // Read updated data
 
-    private NormalizedNodeMessages.NodeMap convertToNodeMap(
-        Map<YangInstanceIdentifier, NormalizedNode<?, ?>> data) {
-        NormalizedNodeToNodeCodec normalizedNodeToNodeCodec =
-            new NormalizedNodeToNodeCodec(schemaContext);
-        NormalizedNodeMessages.NodeMap.Builder nodeMapBuilder =
-            NormalizedNodeMessages.NodeMap.newBuilder();
-        NormalizedNodeMessages.NodeMapEntry.Builder builder =
-            NormalizedNodeMessages.NodeMapEntry.newBuilder();
-        for (Map.Entry<YangInstanceIdentifier, NormalizedNode<?, ?>> entry : data
-            .entrySet()) {
-
-
-            NormalizedNodeMessages.InstanceIdentifier instanceIdentifier =
-                InstanceIdentifierUtils.toSerializable(entry.getKey());
-
-            builder.setInstanceIdentifierPath(instanceIdentifier)
-                .setNormalizedNode(normalizedNodeToNodeCodec
-                    .encode(entry.getValue())
-                    .getNormalizedNode());
-            nodeMapBuilder.addMapEntries(builder.build());
+        size = in.readInt();
+        for(int i = 0; i < size; i++) {
+            YangInstanceIdentifier path = streamReader.readYangInstanceIdentifier();
+            NormalizedNode<?, ?> before = streamReader.readNormalizedNode();
+            NormalizedNode<?, ?> after = streamReader.readNormalizedNode();
+            builder.addUpdated(path, before, after);
         }
-        return nodeMapBuilder.build();
-    }
-
 
-    @Override
-    public Object toSerializable() {
-        return DataChangeListenerMessages.DataChanged.newBuilder()
-            .addAllRemovedPaths(convertToRemovePaths(change.getRemovedPaths()))
-            .setCreatedData(convertToNodeMap(change.getCreatedData()))
-            .setOriginalData(convertToNodeMap(change.getOriginalData()))
-            .setUpdatedData(convertToNodeMap(change.getUpdatedData()))
-            .setOriginalSubTree(convertToNodeTree(change.getOriginalSubtree()))
-            .setUpdatedSubTree(convertToNodeTree(change.getUpdatedSubtree()))
-            .build();
-    }
+        // Read removed data
 
-    public static DataChanged fromSerialize(SchemaContext sc, Object message,
-        YangInstanceIdentifier pathId) {
-        DataChangeListenerMessages.DataChanged dataChanged =
-            (DataChangeListenerMessages.DataChanged) message;
-        DataChangedEvent event = new DataChangedEvent(sc);
-        if (dataChanged.getCreatedData() != null && dataChanged.getCreatedData()
-            .isInitialized()) {
-            event.setCreatedData(dataChanged.getCreatedData());
-        }
-        if (dataChanged.getOriginalData() != null && dataChanged
-            .getOriginalData().isInitialized()) {
-            event.setOriginalData(dataChanged.getOriginalData());
+        size = in.readInt();
+        for(int i = 0; i < size; i++) {
+            YangInstanceIdentifier path = streamReader.readYangInstanceIdentifier();
+            NormalizedNode<?, ?> node = streamReader.readNormalizedNode();
+            builder.addRemoved(path, node);
         }
 
-        if (dataChanged.getUpdatedData() != null && dataChanged.getUpdatedData()
-            .isInitialized()) {
-            event.setUpdateData(dataChanged.getUpdatedData());
-        }
+        // Read original subtree
 
-        if (dataChanged.getOriginalSubTree() != null && dataChanged
-            .getOriginalSubTree().isInitialized()) {
-            event.setOriginalSubtree(dataChanged.getOriginalSubTree(), pathId);
+        boolean present = in.readBoolean();
+        if(present) {
+            builder.setBefore(streamReader.readNormalizedNode());
         }
 
-        if (dataChanged.getUpdatedSubTree() != null && dataChanged
-            .getUpdatedSubTree().isInitialized()) {
-            event.setUpdatedSubtree(dataChanged.getOriginalSubTree(), pathId);
-        }
+        // Read updated subtree
 
-        if (dataChanged.getRemovedPathsList() != null && !dataChanged
-            .getRemovedPathsList().isEmpty()) {
-            event.setRemovedPaths(dataChanged.getRemovedPathsList());
+        present = in.readBoolean();
+        if(present) {
+            builder.setAfter(streamReader.readNormalizedNode());
         }
 
-        return new DataChanged(sc, event);
-
+        change = builder.build();
     }
 
-    static class DataChangedEvent implements
-        AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> {
-        private Map<YangInstanceIdentifier, NormalizedNode<?, ?>> createdData;
-        private final NormalizedNodeToNodeCodec nodeCodec;
-        private Map<YangInstanceIdentifier, NormalizedNode<?, ?>> updatedData;
-        private Map<YangInstanceIdentifier, NormalizedNode<?, ?>> originalData;
-        private NormalizedNode<?, ?> originalSubTree;
-        private NormalizedNode<?, ?> updatedSubTree;
-        private Set<YangInstanceIdentifier> removedPathIds;
-
-        DataChangedEvent(SchemaContext schemaContext) {
-            nodeCodec = new NormalizedNodeToNodeCodec(schemaContext);
-        }
-
-        @Override
-        public Map<YangInstanceIdentifier, NormalizedNode<?, ?>> getCreatedData() {
-            if(createdData == null){
-                return Collections.emptyMap();
-            }
-            return createdData;
-        }
-
-        DataChangedEvent setCreatedData(
-            NormalizedNodeMessages.NodeMap nodeMap) {
-            this.createdData = convertNodeMapToMap(nodeMap);
-            return this;
-        }
-
-        private Map<YangInstanceIdentifier, NormalizedNode<?, ?>> convertNodeMapToMap(
-            NormalizedNodeMessages.NodeMap nodeMap) {
-            Map<YangInstanceIdentifier, NormalizedNode<?, ?>> mapEntries =
-                new HashMap<YangInstanceIdentifier, NormalizedNode<?, ?>>();
-            for (NormalizedNodeMessages.NodeMapEntry nodeMapEntry : nodeMap
-                .getMapEntriesList()) {
-                YangInstanceIdentifier id = InstanceIdentifierUtils
-                    .fromSerializable(nodeMapEntry.getInstanceIdentifierPath());
-                mapEntries.put(id,
-                    nodeCodec.decode(nodeMapEntry.getNormalizedNode()));
-            }
-            return mapEntries;
-        }
+    @Override
+    public void writeExternal(ObjectOutput out) throws IOException {
+        out.writeShort(DataStoreVersions.CURRENT_VERSION);
 
+        NormalizedNodeOutputStreamWriter streamWriter = new NormalizedNodeOutputStreamWriter(out);
+        NormalizedNodeWriter nodeWriter = NormalizedNodeWriter.forStreamWriter(streamWriter);
 
-        @Override
-        public Map<YangInstanceIdentifier, NormalizedNode<?, ?>> getUpdatedData() {
-            if(updatedData == null){
-                return Collections.emptyMap();
-            }
-            return updatedData;
-        }
+        // Write created data
 
-        DataChangedEvent setUpdateData(NormalizedNodeMessages.NodeMap nodeMap) {
-            this.updatedData = convertNodeMapToMap(nodeMap);
-            return this;
+        Map<YangInstanceIdentifier, NormalizedNode<?, ?>> createdData = change.getCreatedData();
+        out.writeInt(createdData.size());
+        for(Map.Entry<YangInstanceIdentifier, NormalizedNode<?, ?>> e: createdData.entrySet()) {
+            streamWriter.writeYangInstanceIdentifier(e.getKey());
+            nodeWriter.write(e.getValue());
         }
 
-        @Override
-        public Set<YangInstanceIdentifier> getRemovedPaths() {
-            if (removedPathIds == null) {
-                return Collections.emptySet();
-            }
-            return removedPathIds;
-        }
+        // Write updated data
 
-        public DataChangedEvent setRemovedPaths(List<NormalizedNodeMessages.InstanceIdentifier> removedPaths) {
-            Set<YangInstanceIdentifier> removedIds = new HashSet<>();
-            for (NormalizedNodeMessages.InstanceIdentifier path : removedPaths) {
-                removedIds.add(InstanceIdentifierUtils.fromSerializable(path));
-            }
-            this.removedPathIds = removedIds;
-            return this;
+        Map<YangInstanceIdentifier, NormalizedNode<?, ?>> originalData = change.getOriginalData();
+        Map<YangInstanceIdentifier, NormalizedNode<?, ?>> updatedData = change.getUpdatedData();
+        out.writeInt(updatedData.size());
+        for(Map.Entry<YangInstanceIdentifier, NormalizedNode<?, ?>> e: updatedData.entrySet()) {
+            streamWriter.writeYangInstanceIdentifier(e.getKey());
+            nodeWriter.write(originalData.get(e.getKey()));
+            nodeWriter.write(e.getValue());
         }
 
-        @Override
-        public Map<YangInstanceIdentifier, NormalizedNode<?, ?>> getOriginalData() {
-            if (originalData == null) {
-                Collections.emptyMap();
-            }
-            return originalData;
-        }
+        // Write removed data
 
-        DataChangedEvent setOriginalData(
-            NormalizedNodeMessages.NodeMap nodeMap) {
-            this.originalData = convertNodeMapToMap(nodeMap);
-            return this;
+        Set<YangInstanceIdentifier> removed = change.getRemovedPaths();
+        out.writeInt(removed.size());
+        for(YangInstanceIdentifier path: removed) {
+            streamWriter.writeYangInstanceIdentifier(path);
+            nodeWriter.write(originalData.get(path));
         }
 
-        @Override
-        public NormalizedNode<?, ?> getOriginalSubtree() {
-            return originalSubTree;
-        }
+        // Write original subtree
 
-        DataChangedEvent setOriginalSubtree(NormalizedNodeMessages.Node node,
-            YangInstanceIdentifier instanceIdentifierPath) {
-            originalSubTree = nodeCodec.decode(node);
-            return this;
+        NormalizedNode<?, ?> originalSubtree = change.getOriginalSubtree();
+        out.writeBoolean(originalSubtree != null);
+        if(originalSubtree != null) {
+            nodeWriter.write(originalSubtree);
         }
 
-        @Override
-        public NormalizedNode<?, ?> getUpdatedSubtree() {
-            return updatedSubTree;
-        }
+        // Write original subtree
 
-        DataChangedEvent setUpdatedSubtree(NormalizedNodeMessages.Node node,
-            YangInstanceIdentifier instanceIdentifierPath) {
-            updatedSubTree = nodeCodec.decode(node);
-            return this;
+        NormalizedNode<?, ?> updatedSubtree = change.getUpdatedSubtree();
+        out.writeBoolean(updatedSubtree != null);
+        if(updatedSubtree != null) {
+            nodeWriter.write(updatedSubtree);
         }
-
-
     }
-
-
-
 }
index e10a407..2db0344 100644 (file)
@@ -11,10 +11,16 @@ package org.opendaylight.controller.cluster.datastore.messages;
 import org.opendaylight.controller.protobuff.messages.datachange.notification.DataChangeListenerMessages;
 
 public class DataChangedReply implements SerializableMessage {
-  public static final Class<DataChangeListenerMessages.DataChangedReply> SERIALIZABLE_CLASS =
-          DataChangeListenerMessages.DataChangedReply.class;
-  @Override
-  public Object toSerializable() {
-    return DataChangeListenerMessages.DataChangedReply.newBuilder().build();
-  }
+    public static final Class<DataChangeListenerMessages.DataChangedReply> SERIALIZABLE_CLASS =
+            DataChangeListenerMessages.DataChangedReply.class;
+
+    private static final Object SERIALIZED_INSTANCE =
+            DataChangeListenerMessages.DataChangedReply.newBuilder().build();
+
+    public static final DataChangedReply INSTANCE = new DataChangedReply();
+
+    @Override
+    public Object toSerializable() {
+        return SERIALIZED_INSTANCE;
+    }
 }
index 2e02664..eba9c39 100644 (file)
@@ -11,10 +11,15 @@ package org.opendaylight.controller.cluster.datastore.messages;
 import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages;
 
 public class DeleteDataReply implements SerializableMessage{
-  public static final Class<ShardTransactionMessages.DeleteDataReply> SERIALIZABLE_CLASS =
-          ShardTransactionMessages.DeleteDataReply.class;
-  @Override
-  public Object toSerializable() {
-    return ShardTransactionMessages.DeleteDataReply.newBuilder().build();
-  }
+    public static final Class<ShardTransactionMessages.DeleteDataReply> SERIALIZABLE_CLASS =
+            ShardTransactionMessages.DeleteDataReply.class;
+
+    private static final Object SERIALIZED_INSTANCE = ShardTransactionMessages.DeleteDataReply.newBuilder().build();
+
+    public static final DeleteDataReply INSTANCE = new DeleteDataReply();
+
+    @Override
+    public Object toSerializable() {
+        return SERIALIZED_INSTANCE;
+    }
 }
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/EmptyExternalizable.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/EmptyExternalizable.java
new file mode 100644 (file)
index 0000000..0b7b262
--- /dev/null
@@ -0,0 +1,29 @@
+/*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore.messages;
+
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+
+/**
+ * Externalizable with no data.
+ *
+ * @author Thomas Pantelis
+ */
+public class EmptyExternalizable implements Externalizable {
+
+    @Override
+    public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+    }
+
+    @Override
+    public void writeExternal(ObjectOutput out) throws IOException {
+    }
+}
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/EmptyReply.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/EmptyReply.java
new file mode 100644 (file)
index 0000000..284c6ef
--- /dev/null
@@ -0,0 +1,30 @@
+/*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore.messages;
+
+import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
+
+/**
+ * A reply with no data.
+ *
+ * @author Thomas Pantelis
+ */
+public abstract class EmptyReply extends EmptyExternalizable implements VersionedSerializableMessage {
+
+    private final Object legacySerializedInstance;
+
+    protected EmptyReply(Object legacySerializedInstance) {
+        super();
+        this.legacySerializedInstance = legacySerializedInstance;
+    }
+
+    @Override
+    public Object toSerializable(short toVersion) {
+        return toVersion >= DataStoreVersions.LITHIUM_VERSION ? this : legacySerializedInstance;
+    }
+}
index eb1f349..9234385 100644 (file)
@@ -8,36 +8,54 @@
 
 package org.opendaylight.controller.cluster.datastore.messages;
 
+import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
 import org.opendaylight.controller.cluster.datastore.node.NormalizedNodeToNodeCodec;
 import org.opendaylight.controller.cluster.datastore.node.NormalizedNodeToNodeCodec.Decoded;
 import org.opendaylight.controller.cluster.datastore.node.NormalizedNodeToNodeCodec.Encoded;
 import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
-import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 
-public class MergeData extends ModifyData{
+public class MergeData extends ModifyData implements VersionedSerializableMessage {
+    private static final long serialVersionUID = 1L;
 
-    public static final Class<ShardTransactionMessages.MergeData> SERIALIZABLE_CLASS =
-            ShardTransactionMessages.MergeData.class;
+    public static final Class<MergeData> SERIALIZABLE_CLASS = MergeData.class;
 
-    public MergeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data,
-        SchemaContext context) {
-        super(path, data, context);
+    public MergeData() {
+    }
+
+    public MergeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
+        super(path, data);
     }
 
     @Override
-    public Object toSerializable() {
-        Encoded encoded = new NormalizedNodeToNodeCodec(schemaContext).encode(path, data);
-        return ShardTransactionMessages.MergeData.newBuilder()
-            .setInstanceIdentifierPathArguments(encoded.getEncodedPath())
-            .setNormalizedNode(encoded.getEncodedNode().getNormalizedNode()).build();
+    public Object toSerializable(short toVersion) {
+        if(toVersion >= DataStoreVersions.LITHIUM_VERSION) {
+            setVersion(toVersion);
+            return this;
+        } else {
+            // To base or R1 Helium version
+            Encoded encoded = new NormalizedNodeToNodeCodec(null).encode(getPath(), getData());
+            return ShardTransactionMessages.MergeData.newBuilder()
+                    .setInstanceIdentifierPathArguments(encoded.getEncodedPath())
+                    .setNormalizedNode(encoded.getEncodedNode().getNormalizedNode()).build();
+        }
+    }
+
+    public static MergeData fromSerializable(Object serializable){
+        if(serializable instanceof MergeData) {
+            return (MergeData) serializable;
+        } else {
+            // From base or R1 Helium version
+            ShardTransactionMessages.MergeData o = (ShardTransactionMessages.MergeData) serializable;
+            Decoded decoded = new NormalizedNodeToNodeCodec(null).decode(
+                    o.getInstanceIdentifierPathArguments(), o.getNormalizedNode());
+            return new MergeData(decoded.getDecodedPath(), decoded.getDecodedNode());
+        }
     }
 
-    public static MergeData fromSerializable(Object serializable, SchemaContext schemaContext){
-        ShardTransactionMessages.MergeData o = (ShardTransactionMessages.MergeData) serializable;
-        Decoded decoded = new NormalizedNodeToNodeCodec(schemaContext).decode(
-                o.getInstanceIdentifierPathArguments(), o.getNormalizedNode());
-        return new MergeData(decoded.getDecodedPath(), decoded.getDecodedNode(), schemaContext);
+    public static boolean isSerializedType(Object message) {
+        return SERIALIZABLE_CLASS.isAssignableFrom(message.getClass()) ||
+               message instanceof ShardTransactionMessages.MergeData;
     }
 }
index 92d6d72..a4c514b 100644 (file)
@@ -10,12 +10,15 @@ package org.opendaylight.controller.cluster.datastore.messages;
 
 import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages;
 
-public class MergeDataReply implements SerializableMessage{
-  public static final Class<ShardTransactionMessages.MergeDataReply> SERIALIZABLE_CLASS =
-          ShardTransactionMessages.MergeDataReply.class;
+public class MergeDataReply extends EmptyReply {
+    private static final long serialVersionUID = 1L;
 
-  @Override
-  public Object toSerializable() {
-    return ShardTransactionMessages.MergeDataReply.newBuilder().build();
-  }
+    private static final Object LEGACY_SERIALIZED_INSTANCE =
+            ShardTransactionMessages.MergeDataReply.newBuilder().build();
+
+    public static final MergeDataReply INSTANCE = new MergeDataReply();
+
+    public MergeDataReply() {
+        super(LEGACY_SERIALIZED_INSTANCE);
+    }
 }
index b5c39d1..69c41c2 100644 (file)
@@ -8,25 +8,28 @@
 
 package org.opendaylight.controller.cluster.datastore.messages;
 
-import com.google.common.base.Preconditions;
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import org.opendaylight.controller.cluster.datastore.utils.SerializationUtils;
+import org.opendaylight.controller.cluster.datastore.utils.SerializationUtils.Applier;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
-import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 
-public abstract class ModifyData implements SerializableMessage {
-    protected final YangInstanceIdentifier path;
-    protected final NormalizedNode<?, ?> data;
-    protected final SchemaContext schemaContext;
+public abstract class ModifyData implements Externalizable {
+    private static final long serialVersionUID = 1L;
 
-    public ModifyData(YangInstanceIdentifier path, NormalizedNode<?, ?> data,
-        SchemaContext context) {
-        Preconditions.checkNotNull(context,
-            "Cannot serialize an object which does not have a schema schemaContext");
+    private YangInstanceIdentifier path;
+    private NormalizedNode<?, ?> data;
+    private short version;
 
+    protected ModifyData() {
+    }
 
+    protected ModifyData(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
         this.path = path;
         this.data = data;
-        this.schemaContext = context;
     }
 
     public YangInstanceIdentifier getPath() {
@@ -37,4 +40,31 @@ public abstract class ModifyData implements SerializableMessage {
         return data;
     }
 
+    public short getVersion() {
+        return version;
+    }
+
+    protected void setVersion(short version) {
+        this.version = version;
+    }
+
+    @Override
+    public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+        version = in.readShort();
+        SerializationUtils.deserializePathAndNode(in, this, APPLIER);
+    }
+
+    @Override
+    public void writeExternal(ObjectOutput out) throws IOException {
+        out.writeShort(version);
+        SerializationUtils.serializePathAndNode(path, data, out);
+    }
+
+    private static final Applier<ModifyData> APPLIER = new Applier<ModifyData>() {
+        @Override
+        public void apply(ModifyData instance, YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
+            instance.path = path;
+            instance.data = data;
+        }
+    };
 }
index 43dd812..8ac6e1b 100644 (file)
@@ -9,23 +9,29 @@
 package org.opendaylight.controller.cluster.datastore.messages;
 
 import com.google.protobuf.ByteString;
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
 import org.opendaylight.controller.cluster.datastore.node.NormalizedNodeToNodeCodec;
+import org.opendaylight.controller.cluster.datastore.utils.SerializationUtils;
 import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages;
-import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
-import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 
-public class ReadDataReply implements SerializableMessage {
-    public static final Class<ShardTransactionMessages.ReadDataReply> SERIALIZABLE_CLASS =
-            ShardTransactionMessages.ReadDataReply.class;
+public class ReadDataReply implements VersionedSerializableMessage, Externalizable {
+    private static final long serialVersionUID = 1L;
 
-    private final NormalizedNode<?, ?> normalizedNode;
-    private final SchemaContext schemaContext;
+    public static final Class<ReadDataReply> SERIALIZABLE_CLASS = ReadDataReply.class;
 
-    public ReadDataReply(SchemaContext context,NormalizedNode<?, ?> normalizedNode){
+    private NormalizedNode<?, ?> normalizedNode;
+    private short version;
 
+    public ReadDataReply() {
+    }
+
+    public ReadDataReply(NormalizedNode<?, ?> normalizedNode) {
         this.normalizedNode = normalizedNode;
-        this.schemaContext = context;
     }
 
     public NormalizedNode<?, ?> getNormalizedNode() {
@@ -33,26 +39,62 @@ public class ReadDataReply implements SerializableMessage {
     }
 
     @Override
-    public Object toSerializable(){
+    public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+        version = in.readShort();
+        normalizedNode = SerializationUtils.deserializeNormalizedNode(in);
+    }
+
+    @Override
+    public void writeExternal(ObjectOutput out) throws IOException {
+        out.writeShort(version);
+        SerializationUtils.serializeNormalizedNode(normalizedNode, out);
+    }
+
+    @Override
+    public Object toSerializable(short toVersion) {
+        if(toVersion >= DataStoreVersions.LITHIUM_VERSION) {
+            version = toVersion;
+            return this;
+        } else {
+            return toSerializableReadDataReply(normalizedNode);
+        }
+    }
+
+    private static ShardTransactionMessages.ReadDataReply toSerializableReadDataReply(
+            NormalizedNode<?, ?> normalizedNode) {
         if(normalizedNode != null) {
             return ShardTransactionMessages.ReadDataReply.newBuilder()
-                    .setNormalizedNode(new NormalizedNodeToNodeCodec(schemaContext)
-                        .encode(normalizedNode).getNormalizedNode()).build();
+                    .setNormalizedNode(new NormalizedNodeToNodeCodec(null)
+                    .encode(normalizedNode).getNormalizedNode()).build();
         } else {
             return ShardTransactionMessages.ReadDataReply.newBuilder().build();
 
         }
     }
 
-    public static ReadDataReply fromSerializable(SchemaContext schemaContext,
-            YangInstanceIdentifier id, Object serializable) {
-        ShardTransactionMessages.ReadDataReply o = (ShardTransactionMessages.ReadDataReply) serializable;
-        return new ReadDataReply(schemaContext, new NormalizedNodeToNodeCodec(schemaContext).decode(
-                o.getNormalizedNode()));
+    public static ReadDataReply fromSerializable(Object serializable) {
+        if(serializable instanceof ReadDataReply) {
+            return (ReadDataReply) serializable;
+        } else {
+            ShardTransactionMessages.ReadDataReply o =
+                    (ShardTransactionMessages.ReadDataReply) serializable;
+            return new ReadDataReply(new NormalizedNodeToNodeCodec(null).decode(o.getNormalizedNode()));
+        }
+    }
+
+    public static ByteString fromSerializableAsByteString(Object serializable) {
+        if(serializable instanceof ReadDataReply) {
+            ReadDataReply r = (ReadDataReply)serializable;
+            return toSerializableReadDataReply(r.getNormalizedNode()).toByteString();
+        } else {
+            ShardTransactionMessages.ReadDataReply o =
+                    (ShardTransactionMessages.ReadDataReply) serializable;
+            return o.getNormalizedNode().toByteString();
+        }
     }
 
-    public static ByteString getNormalizedNodeByteString(Object serializable){
-        ShardTransactionMessages.ReadDataReply o = (ShardTransactionMessages.ReadDataReply) serializable;
-        return ((ShardTransactionMessages.ReadDataReply) serializable).getNormalizedNode().toByteString();
+    public static boolean isSerializedType(Object message) {
+        return SERIALIZABLE_CLASS.isAssignableFrom(message.getClass()) ||
+               message instanceof ShardTransactionMessages.ReadDataReply;
     }
 }
index 581caef..09617ab 100644 (file)
@@ -11,12 +11,15 @@ package org.opendaylight.controller.cluster.datastore.messages;
 import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages;
 
 public class ReadyTransaction implements SerializableMessage{
-  public static final Class<ShardTransactionMessages.ReadyTransaction> SERIALIZABLE_CLASS =
-          ShardTransactionMessages.ReadyTransaction.class;
+    public static final Class<ShardTransactionMessages.ReadyTransaction> SERIALIZABLE_CLASS =
+            ShardTransactionMessages.ReadyTransaction.class;
 
-  @Override
-  public Object toSerializable() {
-    return ShardTransactionMessages.ReadyTransaction.newBuilder().build();
-  }
+    private static final Object SERIALIZED_INSTANCE = ShardTransactionMessages.ReadyTransaction.newBuilder().build();
 
+    public static final ReadyTransaction INSTANCE = new ReadyTransaction();
+
+    @Override
+    public Object toSerializable() {
+        return SERIALIZED_INSTANCE;
+    }
 }
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/VersionedSerializableMessage.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/VersionedSerializableMessage.java
new file mode 100644 (file)
index 0000000..5c30b10
--- /dev/null
@@ -0,0 +1,17 @@
+/*
+ * Copyright (c) 2014 Brocade Communications Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore.messages;
+
+/**
+ * Interface for a Serializable message with versioning.
+ *
+ * @author Thomas Pantelis
+ */
+public interface VersionedSerializableMessage {
+    Object toSerializable(short toVersion);
+}
index 8aa63ef..c5e3a6b 100644 (file)
@@ -8,35 +8,54 @@
 
 package org.opendaylight.controller.cluster.datastore.messages;
 
+import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
 import org.opendaylight.controller.cluster.datastore.node.NormalizedNodeToNodeCodec;
 import org.opendaylight.controller.cluster.datastore.node.NormalizedNodeToNodeCodec.Decoded;
 import org.opendaylight.controller.cluster.datastore.node.NormalizedNodeToNodeCodec.Encoded;
 import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
-import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 
-public class WriteData extends ModifyData {
+public class WriteData extends ModifyData implements VersionedSerializableMessage {
+    private static final long serialVersionUID = 1L;
 
-    public static final Class<ShardTransactionMessages.WriteData> SERIALIZABLE_CLASS =
-            ShardTransactionMessages.WriteData.class;
+    public static final Class<WriteData> SERIALIZABLE_CLASS = WriteData.class;
 
-    public WriteData(YangInstanceIdentifier path, NormalizedNode<?, ?> data, SchemaContext schemaContext) {
-        super(path, data, schemaContext);
+    public WriteData() {
+    }
+
+    public WriteData(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
+        super(path, data);
     }
 
     @Override
-    public Object toSerializable() {
-        Encoded encoded = new NormalizedNodeToNodeCodec(schemaContext).encode(path, data);
-        return ShardTransactionMessages.WriteData.newBuilder()
-                .setInstanceIdentifierPathArguments(encoded.getEncodedPath())
-                .setNormalizedNode(encoded.getEncodedNode().getNormalizedNode()).build();
+    public Object toSerializable(short toVersion) {
+        if(toVersion >= DataStoreVersions.LITHIUM_VERSION) {
+            setVersion(toVersion);
+            return this;
+        } else {
+            // To base or R1 Helium version
+            Encoded encoded = new NormalizedNodeToNodeCodec(null).encode(getPath(), getData());
+            return ShardTransactionMessages.WriteData.newBuilder()
+                    .setInstanceIdentifierPathArguments(encoded.getEncodedPath())
+                    .setNormalizedNode(encoded.getEncodedNode().getNormalizedNode()).build();
+        }
+    }
+
+    public static WriteData fromSerializable(Object serializable) {
+        if(serializable instanceof WriteData) {
+            return (WriteData) serializable;
+        } else {
+            // From base or R1 Helium version
+            ShardTransactionMessages.WriteData o = (ShardTransactionMessages.WriteData) serializable;
+            Decoded decoded = new NormalizedNodeToNodeCodec(null).decode(
+                    o.getInstanceIdentifierPathArguments(), o.getNormalizedNode());
+            return new WriteData(decoded.getDecodedPath(), decoded.getDecodedNode());
+        }
     }
 
-    public static WriteData fromSerializable(Object serializable, SchemaContext schemaContext){
-        ShardTransactionMessages.WriteData o = (ShardTransactionMessages.WriteData) serializable;
-        Decoded decoded = new NormalizedNodeToNodeCodec(schemaContext).decode(
-                o.getInstanceIdentifierPathArguments(), o.getNormalizedNode());
-        return new WriteData(decoded.getDecodedPath(), decoded.getDecodedNode(), schemaContext);
+    public static boolean isSerializedType(Object message) {
+        return SERIALIZABLE_CLASS.isAssignableFrom(message.getClass()) ||
+               message instanceof ShardTransactionMessages.WriteData;
     }
 }
index 876105d..8255828 100644 (file)
@@ -10,11 +10,15 @@ package org.opendaylight.controller.cluster.datastore.messages;
 
 import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages;
 
-public class WriteDataReply implements SerializableMessage{
-  public static final Class<ShardTransactionMessages.WriteDataReply> SERIALIZABLE_CLASS =
-          ShardTransactionMessages.WriteDataReply.class;
-  @Override
-  public Object toSerializable() {
-    return ShardTransactionMessages.WriteDataReply.newBuilder().build();
-  }
+public class WriteDataReply extends EmptyReply {
+    private static final long serialVersionUID = 1L;
+
+    private static final Object LEGACY_SERIALIZED_INSTANCE =
+            ShardTransactionMessages.WriteDataReply.newBuilder().build();
+
+    public static final WriteDataReply INSTANCE = new WriteDataReply();
+
+    public WriteDataReply() {
+        super(LEGACY_SERIALIZED_INSTANCE);
+    }
 }
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/SerializationUtils.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/SerializationUtils.java
new file mode 100644 (file)
index 0000000..8404a6e
--- /dev/null
@@ -0,0 +1,81 @@
+/*
+ * Copyright (c) 2014 Brocade Communications Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore.utils;
+
+import com.google.common.base.Preconditions;
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import org.opendaylight.controller.cluster.datastore.node.utils.stream.NormalizedNodeInputStreamReader;
+import org.opendaylight.controller.cluster.datastore.node.utils.stream.NormalizedNodeOutputStreamWriter;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.opendaylight.yangtools.yang.data.api.schema.stream.NormalizedNodeWriter;
+
+/**
+ * Provides various utility methods for serialization and de-serialization.
+ *
+ * @author Thomas Pantelis
+ */
+public final class SerializationUtils {
+    public static interface Applier<T> {
+        void apply(T instance, YangInstanceIdentifier path, NormalizedNode<?, ?> node);
+    }
+
+    public static void serializePathAndNode(YangInstanceIdentifier path, NormalizedNode<?, ?> node,
+            DataOutput out) {
+        Preconditions.checkNotNull(path);
+        Preconditions.checkNotNull(node);
+        try {
+            NormalizedNodeOutputStreamWriter streamWriter = new NormalizedNodeOutputStreamWriter(out);
+            NormalizedNodeWriter.forStreamWriter(streamWriter).write(node);
+            streamWriter.writeYangInstanceIdentifier(path);
+        } catch (IOException e) {
+            throw new IllegalArgumentException(String.format("Error serializing path {} and Node {}",
+                    path, node), e);
+        }
+    }
+
+    public static <T> void deserializePathAndNode(DataInput in, T instance, Applier<T> applier) {
+        try {
+            NormalizedNodeInputStreamReader streamReader = new NormalizedNodeInputStreamReader(in);
+            NormalizedNode<?, ?> node = streamReader.readNormalizedNode();
+            YangInstanceIdentifier path = streamReader.readYangInstanceIdentifier();
+            applier.apply(instance, path, node);
+        } catch (IOException e) {
+            throw new IllegalArgumentException("Error deserializing path and Node", e);
+        }
+    }
+
+    public static void serializeNormalizedNode(NormalizedNode<?, ?> node, DataOutput out) {
+        try {
+            out.writeBoolean(node != null);
+            if(node != null) {
+                NormalizedNodeOutputStreamWriter streamWriter = new NormalizedNodeOutputStreamWriter(out);
+                NormalizedNodeWriter.forStreamWriter(streamWriter).write(node);
+            }
+        } catch (IOException e) {
+            throw new IllegalArgumentException(String.format("Error serializing NormalizedNode {}",
+                    node), e);
+        }
+    }
+
+    public static NormalizedNode<?, ?> deserializeNormalizedNode(DataInput in) {
+            try {
+                boolean present = in.readBoolean();
+                if(present) {
+                    NormalizedNodeInputStreamReader streamReader = new NormalizedNodeInputStreamReader(in);
+                    return streamReader.readNormalizedNode();
+                }
+            } catch (IOException e) {
+                throw new IllegalArgumentException("Error deserializing NormalizedNode", e);
+            }
+
+        return null;
+    }
+}
index 55250dd..5485c57 100644 (file)
@@ -17,11 +17,9 @@ import org.opendaylight.controller.cluster.datastore.utils.MessageCollectorActor
 import org.opendaylight.controller.cluster.datastore.utils.MockClusterWrapper;
 import org.opendaylight.controller.cluster.datastore.utils.MockConfiguration;
 import org.opendaylight.controller.md.cluster.datastore.model.CompositeModel;
-import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
-
 public class DataChangeListenerProxyTest extends AbstractActorTest {
 
   private static class MockDataChangedEvent implements AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> {
@@ -73,7 +71,7 @@ public class DataChangeListenerProxyTest extends AbstractActorTest {
         final ActorRef actorRef = getSystem().actorOf(props);
 
         DataChangeListenerProxy dataChangeListenerProxy = new DataChangeListenerProxy(
-                TestModel.createTestContext(), getSystem().actorSelection(actorRef.path()));
+                getSystem().actorSelection(actorRef.path()));
 
         dataChangeListenerProxy.onDataChanged(new MockDataChangedEvent());
 
index 25d4738..19f0f8c 100644 (file)
@@ -29,7 +29,7 @@ public class DataChangeListenerTest extends AbstractActorTest {
             // Let the DataChangeListener know that notifications should be enabled
             subject.tell(new EnableNotification(true), getRef());
 
-            subject.tell(new DataChanged(CompositeModel.createTestContext(), mockChangeEvent),
+            subject.tell(new DataChanged(mockChangeEvent),
                     getRef());
 
             expectMsgClass(DataChangedReply.class);
@@ -48,7 +48,7 @@ public class DataChangeListenerTest extends AbstractActorTest {
             final ActorRef subject =
                 getSystem().actorOf(props, "testDataChangedNotificationsDisabled");
 
-            subject.tell(new DataChanged(CompositeModel.createTestContext(), mockChangeEvent),
+            subject.tell(new DataChanged(mockChangeEvent),
                     getRef());
 
             new Within(duration("1 seconds")) {
@@ -74,8 +74,7 @@ public class DataChangeListenerTest extends AbstractActorTest {
 
             getSystem().eventStream().subscribe(getRef(), DeadLetter.class);
 
-            subject.tell(new DataChanged(CompositeModel.createTestContext(), mockChangeEvent),
-                    ActorRef.noSender());
+            subject.tell(new DataChanged(mockChangeEvent), ActorRef.noSender());
 
             // Make sure no DataChangedReply is sent to DeadLetters.
             while(true) {
@@ -113,13 +112,13 @@ public class DataChangeListenerTest extends AbstractActorTest {
 
             SchemaContext schemaContext = CompositeModel.createTestContext();
 
-            subject.tell(new DataChanged(schemaContext, mockChangeEvent1),getRef());
+            subject.tell(new DataChanged(mockChangeEvent1),getRef());
             expectMsgClass(DataChangedReply.class);
 
-            subject.tell(new DataChanged(schemaContext, mockChangeEvent2),getRef());
+            subject.tell(new DataChanged(mockChangeEvent2),getRef());
             expectMsgClass(DataChangedReply.class);
 
-            subject.tell(new DataChanged(schemaContext, mockChangeEvent3),getRef());
+            subject.tell(new DataChanged(mockChangeEvent3),getRef());
             expectMsgClass(DataChangedReply.class);
 
             Mockito.verify(mockListener).onDataChanged(mockChangeEvent1);
index ed842b2..42f3043 100644 (file)
@@ -10,7 +10,7 @@ import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.inOrder;
 import static org.mockito.Mockito.mock;
-import static org.opendaylight.controller.cluster.datastore.messages.CreateTransaction.CURRENT_VERSION;
+import static org.opendaylight.controller.cluster.datastore.DataStoreVersions.CURRENT_VERSION;
 import akka.actor.ActorRef;
 import akka.actor.PoisonPill;
 import akka.actor.Props;
index 9f57359..09a4532 100644 (file)
@@ -21,7 +21,6 @@ import org.junit.BeforeClass;
 import org.junit.Test;
 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
-import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
 import org.opendaylight.controller.cluster.datastore.node.utils.serialization.NormalizedNodeSerializer;
 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
@@ -72,7 +71,7 @@ public class ShardTransactionFailureTest extends AbstractActorTest {
         final ActorRef shard = createShard();
         final Props props = ShardTransaction.props(store.newReadOnlyTransaction(), shard,
                 testSchemaContext, datastoreContext, shardStats, "txn",
-                CreateTransaction.CURRENT_VERSION);
+                DataStoreVersions.CURRENT_VERSION);
 
         final TestActorRef<ShardTransaction> subject = TestActorRef
             .create(getSystem(), props,
@@ -102,7 +101,7 @@ public class ShardTransactionFailureTest extends AbstractActorTest {
         final ActorRef shard = createShard();
         final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
                 testSchemaContext, datastoreContext, shardStats, "txn",
-                CreateTransaction.CURRENT_VERSION);
+                DataStoreVersions.CURRENT_VERSION);
 
         final TestActorRef<ShardTransaction> subject = TestActorRef
             .create(getSystem(), props,
@@ -132,7 +131,7 @@ public class ShardTransactionFailureTest extends AbstractActorTest {
         final ActorRef shard = createShard();
         final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
                 testSchemaContext, datastoreContext, shardStats, "txn",
-                CreateTransaction.CURRENT_VERSION);
+                DataStoreVersions.CURRENT_VERSION);
 
         final TestActorRef<ShardTransaction> subject = TestActorRef
             .create(getSystem(), props,
@@ -162,7 +161,7 @@ public class ShardTransactionFailureTest extends AbstractActorTest {
         final ActorRef shard = createShard();
         final Props props = ShardTransaction.props(store.newWriteOnlyTransaction(), shard,
                 testSchemaContext, datastoreContext, shardStats, "txn",
-                CreateTransaction.CURRENT_VERSION);
+                DataStoreVersions.CURRENT_VERSION);
 
         final TestActorRef<ShardTransaction> subject = TestActorRef
             .create(getSystem(), props,
@@ -195,7 +194,7 @@ public class ShardTransactionFailureTest extends AbstractActorTest {
         final ActorRef shard = createShard();
         final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
                 testSchemaContext, datastoreContext, shardStats, "txn",
-                CreateTransaction.CURRENT_VERSION);
+                DataStoreVersions.CURRENT_VERSION);
 
         final TestActorRef<ShardTransaction> subject = TestActorRef
             .create(getSystem(), props,
@@ -233,7 +232,7 @@ public class ShardTransactionFailureTest extends AbstractActorTest {
         final ActorRef shard = createShard();
         final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
                 testSchemaContext, datastoreContext, shardStats, "txn",
-                CreateTransaction.CURRENT_VERSION);
+                DataStoreVersions.CURRENT_VERSION);
 
         final TestActorRef<ShardTransaction> subject = TestActorRef
             .create(getSystem(), props, "testNegativeMergeTransactionReady");
@@ -266,7 +265,7 @@ public class ShardTransactionFailureTest extends AbstractActorTest {
         final ActorRef shard = createShard();
         final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
                 testSchemaContext, datastoreContext, shardStats, "txn",
-                CreateTransaction.CURRENT_VERSION);
+                DataStoreVersions.CURRENT_VERSION);
 
         final TestActorRef<ShardTransaction> subject = TestActorRef
             .create(getSystem(), props,
index af07aee..58cec67 100644 (file)
@@ -7,6 +7,12 @@
  */
 package org.opendaylight.controller.cluster.datastore;
 
+import akka.actor.ActorRef;
+import akka.actor.ActorSelection;
+import akka.actor.PoisonPill;
+import akka.actor.Props;
+import akka.dispatch.Dispatchers;
+import akka.testkit.TestActorRef;
 import java.util.Collections;
 import org.junit.Assert;
 import org.junit.Test;
@@ -27,12 +33,6 @@ import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 import scala.concurrent.duration.FiniteDuration;
-import akka.actor.ActorRef;
-import akka.actor.ActorSelection;
-import akka.actor.PoisonPill;
-import akka.actor.Props;
-import akka.dispatch.Dispatchers;
-import akka.testkit.TestActorRef;
 
 /**
  * Tests backwards compatibility support from Helium-1 to Helium.
@@ -46,6 +46,7 @@ import akka.testkit.TestActorRef;
  */
 public class ShardTransactionHeliumBackwardsCompatibilityTest extends AbstractActorTest {
 
+    @SuppressWarnings("unchecked")
     @Test
     public void testTransactionCommit() throws Exception {
         new ShardTestKit(getSystem()) {{
@@ -78,9 +79,10 @@ public class ShardTransactionHeliumBackwardsCompatibilityTest extends AbstractAc
             // Write data to the Tx
 
             txActor.tell(new WriteData(TestModel.TEST_PATH,
-                    ImmutableNodes.containerNode(TestModel.TEST_QNAME), schemaContext), getRef());
+                    ImmutableNodes.containerNode(TestModel.TEST_QNAME)).toSerializable(
+                            DataStoreVersions.BASE_HELIUM_VERSION), getRef());
 
-            expectMsgClass(duration, WriteDataReply.class);
+            expectMsgClass(duration, ShardTransactionMessages.WriteDataReply.class);
 
             // Ready the Tx
 
@@ -151,7 +153,7 @@ public class ShardTransactionHeliumBackwardsCompatibilityTest extends AbstractAc
             // Write data to the Tx
 
             txActor.tell(new WriteData(TestModel.TEST_PATH,
-                    ImmutableNodes.containerNode(TestModel.TEST_QNAME), schemaContext), getRef());
+                    ImmutableNodes.containerNode(TestModel.TEST_QNAME)), getRef());
 
             expectMsgClass(duration, WriteDataReply.class);
 
index f5af93d..79480ce 100644 (file)
@@ -20,7 +20,6 @@ import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier
 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
 import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.CloseTransactionReply;
-import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.DataExists;
 import org.opendaylight.controller.cluster.datastore.messages.DataExistsReply;
 import org.opendaylight.controller.cluster.datastore.messages.DeleteData;
@@ -38,6 +37,8 @@ import org.opendaylight.controller.cluster.datastore.modification.DeleteModifica
 import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
 import org.opendaylight.controller.cluster.datastore.modification.Modification;
 import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
+import org.opendaylight.controller.cluster.datastore.node.NormalizedNodeToNodeCodec;
+import org.opendaylight.controller.cluster.datastore.node.NormalizedNodeToNodeCodec.Encoded;
 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
 import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
 import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages;
@@ -76,13 +77,13 @@ public class ShardTransactionTest extends AbstractActorTest {
             final ActorRef shard = createShard();
             Props props = ShardTransaction.props(store.newReadOnlyTransaction(), shard,
                     testSchemaContext, datastoreContext, shardStats, "txn",
-                    CreateTransaction.CURRENT_VERSION);
+                    DataStoreVersions.CURRENT_VERSION);
 
             testOnReceiveReadData(getSystem().actorOf(props, "testReadDataRO"));
 
             props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
                     testSchemaContext, datastoreContext, shardStats, "txn",
-                    CreateTransaction.CURRENT_VERSION);
+                    DataStoreVersions.CURRENT_VERSION);
 
             testOnReceiveReadData(getSystem().actorOf(props, "testReadDataRW"));
         }
@@ -92,12 +93,10 @@ public class ShardTransactionTest extends AbstractActorTest {
             transaction.tell(new ReadData(YangInstanceIdentifier.builder().build()).toSerializable(),
                 getRef());
 
-            ShardTransactionMessages.ReadDataReply replySerialized =
-                expectMsgClass(duration("5 seconds"), ReadDataReply.SERIALIZABLE_CLASS);
+            Object replySerialized =
+                    expectMsgClass(duration("5 seconds"), ReadDataReply.SERIALIZABLE_CLASS);
 
-            assertNotNull(ReadDataReply.fromSerializable(
-                testSchemaContext,YangInstanceIdentifier.builder().build(), replySerialized)
-                .getNormalizedNode());
+            assertNotNull(ReadDataReply.fromSerializable(replySerialized).getNormalizedNode());
 
             // unserialized read
             transaction.tell(new ReadData(YangInstanceIdentifier.builder().build()),getRef());
@@ -114,14 +113,14 @@ public class ShardTransactionTest extends AbstractActorTest {
             final ActorRef shard = createShard();
             Props props = ShardTransaction.props( store.newReadOnlyTransaction(), shard,
                     testSchemaContext, datastoreContext, shardStats, "txn",
-                    CreateTransaction.CURRENT_VERSION);
+                    DataStoreVersions.CURRENT_VERSION);
 
             testOnReceiveReadDataWhenDataNotFound(getSystem().actorOf(
                     props, "testReadDataWhenDataNotFoundRO"));
 
             props = ShardTransaction.props( store.newReadWriteTransaction(), shard,
                     testSchemaContext, datastoreContext, shardStats, "txn",
-                    CreateTransaction.CURRENT_VERSION);
+                    DataStoreVersions.CURRENT_VERSION);
 
             testOnReceiveReadDataWhenDataNotFound(getSystem().actorOf(
                     props, "testReadDataWhenDataNotFoundRW"));
@@ -131,11 +130,10 @@ public class ShardTransactionTest extends AbstractActorTest {
             // serialized read
             transaction.tell(new ReadData(TestModel.TEST_PATH).toSerializable(), getRef());
 
-            ShardTransactionMessages.ReadDataReply replySerialized =
-                expectMsgClass(duration("5 seconds"), ReadDataReply.SERIALIZABLE_CLASS);
+            Object replySerialized =
+                    expectMsgClass(duration("5 seconds"), ReadDataReply.SERIALIZABLE_CLASS);
 
-            assertTrue(ReadDataReply.fromSerializable(
-                testSchemaContext, TestModel.TEST_PATH, replySerialized).getNormalizedNode() == null);
+            assertTrue(ReadDataReply.fromSerializable(replySerialized).getNormalizedNode() == null);
 
             // unserialized read
             transaction.tell(new ReadData(TestModel.TEST_PATH),getRef());
@@ -146,19 +144,39 @@ public class ShardTransactionTest extends AbstractActorTest {
         }};
     }
 
+    @Test
+    public void testOnReceiveReadDataHeliumR1() throws Exception {
+        new JavaTestKit(getSystem()) {{
+            final ActorRef shard = createShard();
+            Props props = ShardTransaction.props(store.newReadOnlyTransaction(), shard,
+                    testSchemaContext, datastoreContext, shardStats, "txn",
+                    DataStoreVersions.HELIUM_1_VERSION);
+
+            ActorRef transaction = getSystem().actorOf(props, "testOnReceiveReadDataHeliumR1");
+
+            transaction.tell(new ReadData(YangInstanceIdentifier.builder().build()).toSerializable(),
+                    getRef());
+
+            ShardTransactionMessages.ReadDataReply replySerialized =
+                    expectMsgClass(duration("5 seconds"), ShardTransactionMessages.ReadDataReply.class);
+
+            assertNotNull(ReadDataReply.fromSerializable(replySerialized).getNormalizedNode());
+        }};
+    }
+
     @Test
     public void testOnReceiveDataExistsPositive() throws Exception {
         new JavaTestKit(getSystem()) {{
             final ActorRef shard = createShard();
             Props props = ShardTransaction.props(store.newReadOnlyTransaction(), shard,
                     testSchemaContext, datastoreContext, shardStats, "txn",
-                    CreateTransaction.CURRENT_VERSION);
+                    DataStoreVersions.CURRENT_VERSION);
 
             testOnReceiveDataExistsPositive(getSystem().actorOf(props, "testDataExistsPositiveRO"));
 
             props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
                     testSchemaContext, datastoreContext, shardStats, "txn",
-                    CreateTransaction.CURRENT_VERSION);
+                    DataStoreVersions.CURRENT_VERSION);
 
             testOnReceiveDataExistsPositive(getSystem().actorOf(props, "testDataExistsPositiveRW"));
         }
@@ -187,13 +205,13 @@ public class ShardTransactionTest extends AbstractActorTest {
             final ActorRef shard = createShard();
             Props props = ShardTransaction.props(store.newReadOnlyTransaction(), shard,
                     testSchemaContext, datastoreContext, shardStats, "txn",
-                    CreateTransaction.CURRENT_VERSION);
+                    DataStoreVersions.CURRENT_VERSION);
 
             testOnReceiveDataExistsNegative(getSystem().actorOf(props, "testDataExistsNegativeRO"));
 
             props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
                     testSchemaContext, datastoreContext, shardStats, "txn",
-                    CreateTransaction.CURRENT_VERSION);
+                    DataStoreVersions.CURRENT_VERSION);
 
             testOnReceiveDataExistsNegative(getSystem().actorOf(props, "testDataExistsNegativeRW"));
         }
@@ -234,39 +252,61 @@ public class ShardTransactionTest extends AbstractActorTest {
             final ActorRef shard = createShard();
             final Props props = ShardTransaction.props(store.newWriteOnlyTransaction(), shard,
                     testSchemaContext, datastoreContext, shardStats, "txn",
-                    CreateTransaction.CURRENT_VERSION);
-            final ActorRef transaction = getSystem().actorOf(props, "testWriteData");
+                    DataStoreVersions.CURRENT_VERSION);
+            final ActorRef transaction = getSystem().actorOf(props, "testOnReceiveWriteData");
 
             transaction.tell(new WriteData(TestModel.TEST_PATH,
-                ImmutableNodes.containerNode(TestModel.TEST_QNAME), TestModel.createTestContext()).toSerializable(),
-                getRef());
+                    ImmutableNodes.containerNode(TestModel.TEST_QNAME)).toSerializable(
+                            DataStoreVersions.HELIUM_2_VERSION), getRef());
 
             expectMsgClass(duration("5 seconds"), ShardTransactionMessages.WriteDataReply.class);
 
             assertModification(transaction, WriteModification.class);
 
-            //unserialized write
+            // unserialized write
             transaction.tell(new WriteData(TestModel.TEST_PATH,
-                ImmutableNodes.containerNode(TestModel.TEST_QNAME),
-                TestModel.createTestContext()),
+                ImmutableNodes.containerNode(TestModel.TEST_QNAME)),
                 getRef());
 
             expectMsgClass(duration("5 seconds"), WriteDataReply.class);
         }};
     }
 
+    @Test
+    public void testOnReceiveHeliumR1WriteData() throws Exception {
+        new JavaTestKit(getSystem()) {{
+            final ActorRef shard = createShard();
+            final Props props = ShardTransaction.props(store.newWriteOnlyTransaction(), shard,
+                    testSchemaContext, datastoreContext, shardStats, "txn",
+                    DataStoreVersions.HELIUM_1_VERSION);
+            final ActorRef transaction = getSystem().actorOf(props, "testOnReceiveHeliumR1WriteData");
+
+            Encoded encoded = new NormalizedNodeToNodeCodec(null).encode(TestModel.TEST_PATH,
+                    ImmutableNodes.containerNode(TestModel.TEST_QNAME));
+            ShardTransactionMessages.WriteData serialized = ShardTransactionMessages.WriteData.newBuilder()
+                    .setInstanceIdentifierPathArguments(encoded.getEncodedPath())
+                    .setNormalizedNode(encoded.getEncodedNode().getNormalizedNode()).build();
+
+            transaction.tell(serialized, getRef());
+
+            expectMsgClass(duration("5 seconds"), ShardTransactionMessages.WriteDataReply.class);
+
+            assertModification(transaction, WriteModification.class);
+        }};
+    }
+
     @Test
     public void testOnReceiveMergeData() throws Exception {
         new JavaTestKit(getSystem()) {{
             final ActorRef shard = createShard();
             final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
                     testSchemaContext, datastoreContext, shardStats, "txn",
-                    CreateTransaction.CURRENT_VERSION);
+                    DataStoreVersions.CURRENT_VERSION);
             final ActorRef transaction = getSystem().actorOf(props, "testMergeData");
 
             transaction.tell(new MergeData(TestModel.TEST_PATH,
-                ImmutableNodes.containerNode(TestModel.TEST_QNAME), testSchemaContext).toSerializable(),
-                getRef());
+                    ImmutableNodes.containerNode(TestModel.TEST_QNAME)).toSerializable(
+                            DataStoreVersions.HELIUM_2_VERSION), getRef());
 
             expectMsgClass(duration("5 seconds"), ShardTransactionMessages.MergeDataReply.class);
 
@@ -274,20 +314,43 @@ public class ShardTransactionTest extends AbstractActorTest {
 
             //unserialized merge
             transaction.tell(new MergeData(TestModel.TEST_PATH,
-                ImmutableNodes.containerNode(TestModel.TEST_QNAME), testSchemaContext),
+                ImmutableNodes.containerNode(TestModel.TEST_QNAME)),
                 getRef());
 
             expectMsgClass(duration("5 seconds"), MergeDataReply.class);
         }};
     }
 
+    @Test
+    public void testOnReceiveHeliumR1MergeData() throws Exception {
+        new JavaTestKit(getSystem()) {{
+            final ActorRef shard = createShard();
+            final Props props = ShardTransaction.props(store.newWriteOnlyTransaction(), shard,
+                    testSchemaContext, datastoreContext, shardStats, "txn",
+                    DataStoreVersions.HELIUM_1_VERSION);
+            final ActorRef transaction = getSystem().actorOf(props, "testOnReceiveHeliumR1MergeData");
+
+            Encoded encoded = new NormalizedNodeToNodeCodec(null).encode(TestModel.TEST_PATH,
+                    ImmutableNodes.containerNode(TestModel.TEST_QNAME));
+            ShardTransactionMessages.MergeData serialized = ShardTransactionMessages.MergeData.newBuilder()
+                    .setInstanceIdentifierPathArguments(encoded.getEncodedPath())
+                    .setNormalizedNode(encoded.getEncodedNode().getNormalizedNode()).build();
+
+            transaction.tell(serialized, getRef());
+
+            expectMsgClass(duration("5 seconds"), ShardTransactionMessages.MergeDataReply.class);
+
+            assertModification(transaction, MergeModification.class);
+        }};
+    }
+
     @Test
     public void testOnReceiveDeleteData() throws Exception {
         new JavaTestKit(getSystem()) {{
             final ActorRef shard = createShard();
             final Props props = ShardTransaction.props( store.newWriteOnlyTransaction(), shard,
                     testSchemaContext, datastoreContext, shardStats, "txn",
-                    CreateTransaction.CURRENT_VERSION);
+                    DataStoreVersions.CURRENT_VERSION);
             final ActorRef transaction = getSystem().actorOf(props, "testDeleteData");
 
             transaction.tell(new DeleteData(TestModel.TEST_PATH).toSerializable(), getRef());
@@ -310,7 +373,7 @@ public class ShardTransactionTest extends AbstractActorTest {
             final ActorRef shard = createShard();
             final Props props = ShardTransaction.props( store.newReadWriteTransaction(), shard,
                     testSchemaContext, datastoreContext, shardStats, "txn",
-                    CreateTransaction.CURRENT_VERSION);
+                    DataStoreVersions.CURRENT_VERSION);
             final ActorRef transaction = getSystem().actorOf(props, "testReadyTransaction");
 
             watch(transaction);
@@ -328,7 +391,7 @@ public class ShardTransactionTest extends AbstractActorTest {
             final ActorRef shard = createShard();
             final Props props = ShardTransaction.props( store.newReadWriteTransaction(), shard,
                 testSchemaContext, datastoreContext, shardStats, "txn",
-                CreateTransaction.CURRENT_VERSION);
+                DataStoreVersions.CURRENT_VERSION);
             final ActorRef transaction = getSystem().actorOf(props, "testReadyTransaction2");
 
             watch(transaction);
@@ -343,13 +406,14 @@ public class ShardTransactionTest extends AbstractActorTest {
 
     }
 
+    @SuppressWarnings("unchecked")
     @Test
     public void testOnReceiveCloseTransaction() throws Exception {
         new JavaTestKit(getSystem()) {{
             final ActorRef shard = createShard();
             final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
                     testSchemaContext, datastoreContext, shardStats, "txn",
-                    CreateTransaction.CURRENT_VERSION);
+                    DataStoreVersions.CURRENT_VERSION);
             final ActorRef transaction = getSystem().actorOf(props, "testCloseTransaction");
 
             watch(transaction);
@@ -366,7 +430,7 @@ public class ShardTransactionTest extends AbstractActorTest {
         final ActorRef shard = createShard();
         final Props props = ShardTransaction.props(store.newReadOnlyTransaction(), shard,
                 testSchemaContext, datastoreContext, shardStats, "txn",
-                CreateTransaction.CURRENT_VERSION);
+                DataStoreVersions.CURRENT_VERSION);
         final TestActorRef<ShardTransaction> transaction = TestActorRef.apply(props,getSystem());
 
         transaction.receive(new DeleteData(TestModel.TEST_PATH).toSerializable(), ActorRef.noSender());
@@ -382,7 +446,7 @@ public class ShardTransactionTest extends AbstractActorTest {
             final ActorRef shard = createShard();
             final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
                     testSchemaContext, datastoreContext, shardStats, "txn",
-                    CreateTransaction.CURRENT_VERSION);
+                    DataStoreVersions.CURRENT_VERSION);
             final ActorRef transaction =
                 getSystem().actorOf(props, "testShardTransactionInactivity");
 
index 46060dd..75c93dd 100644 (file)
@@ -1,11 +1,21 @@
 package org.opendaylight.controller.cluster.datastore;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.isA;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
 import akka.actor.ActorPath;
 import akka.actor.ActorSelection;
 import akka.actor.Props;
 import akka.dispatch.Futures;
 import com.google.common.collect.Lists;
 import com.google.common.util.concurrent.ListenableFuture;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
 import org.junit.Before;
 import org.junit.Test;
 import org.mockito.Mock;
@@ -24,18 +34,6 @@ import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
 import org.opendaylight.controller.cluster.datastore.utils.DoNothingActor;
 import scala.concurrent.Future;
 
-import java.util.List;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.fail;
-import static org.mockito.Mockito.any;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.isA;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-
 public class ThreePhaseCommitCohortProxyTest extends AbstractActorTest {
 
     @SuppressWarnings("serial")
@@ -112,14 +110,14 @@ public class ThreePhaseCommitCohortProxyTest extends AbstractActorTest {
         ThreePhaseCommitCohortProxy proxy = setupProxy(1);
 
         setupMockActorContext(CanCommitTransaction.SERIALIZABLE_CLASS,
-                new CanCommitTransactionReply(true));
+                CanCommitTransactionReply.YES);
 
         ListenableFuture<Boolean> future = proxy.canCommit();
 
         assertEquals("canCommit", true, future.get(5, TimeUnit.SECONDS));
 
         setupMockActorContext(CanCommitTransaction.SERIALIZABLE_CLASS,
-                new CanCommitTransactionReply(false));
+                CanCommitTransactionReply.NO);
 
         future = proxy.canCommit();
 
@@ -134,7 +132,7 @@ public class ThreePhaseCommitCohortProxyTest extends AbstractActorTest {
         ThreePhaseCommitCohortProxy proxy = setupProxy(2);
 
         setupMockActorContext(CanCommitTransaction.SERIALIZABLE_CLASS,
-                new CanCommitTransactionReply(true), new CanCommitTransactionReply(true));
+                CanCommitTransactionReply.YES, CanCommitTransactionReply.YES);
 
         ListenableFuture<Boolean> future = proxy.canCommit();
 
@@ -149,8 +147,7 @@ public class ThreePhaseCommitCohortProxyTest extends AbstractActorTest {
         ThreePhaseCommitCohortProxy proxy = setupProxy(3);
 
         setupMockActorContext(CanCommitTransaction.SERIALIZABLE_CLASS,
-                new CanCommitTransactionReply(true), new CanCommitTransactionReply(false),
-                new CanCommitTransactionReply(true));
+                CanCommitTransactionReply.YES, CanCommitTransactionReply.NO, CanCommitTransactionReply.YES);
 
         ListenableFuture<Boolean> future = proxy.canCommit();
 
@@ -289,7 +286,7 @@ public class ThreePhaseCommitCohortProxyTest extends AbstractActorTest {
         ThreePhaseCommitCohortProxy proxy = setupProxy(2);
 
         setupMockActorContext(CanCommitTransaction.SERIALIZABLE_CLASS,
-                new CanCommitTransactionReply(true), new CanCommitTransactionReply(true));
+                CanCommitTransactionReply.YES, CanCommitTransactionReply.YES);
 
         setupMockActorContext(PreCommitTransaction.SERIALIZABLE_CLASS,
                 new PreCommitTransactionReply(), new PreCommitTransactionReply());
index 7407897..5e53b29 100644 (file)
@@ -1,5 +1,20 @@
 package org.opendaylight.controller.cluster.datastore;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Matchers.argThat;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Matchers.isA;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType.READ_ONLY;
+import static org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType.READ_WRITE;
+import static org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType.WRITE_ONLY;
 import akka.actor.ActorRef;
 import akka.actor.ActorSelection;
 import akka.actor.ActorSystem;
@@ -11,12 +26,16 @@ import com.google.common.collect.ImmutableMap;
 import com.google.common.util.concurrent.CheckedFuture;
 import com.typesafe.config.Config;
 import com.typesafe.config.ConfigFactory;
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
 import org.junit.AfterClass;
 import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
 import org.mockito.ArgumentMatcher;
 import org.mockito.Mock;
+import org.mockito.Mockito;
 import org.mockito.MockitoAnnotations;
 import org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType;
 import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
@@ -42,6 +61,7 @@ import org.opendaylight.controller.cluster.datastore.utils.DoNothingActor;
 import org.opendaylight.controller.cluster.datastore.utils.MockConfiguration;
 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
+import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages;
 import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages.CreateTransactionReply;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
@@ -50,24 +70,6 @@ import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 import scala.concurrent.Await;
 import scala.concurrent.Future;
 import scala.concurrent.duration.Duration;
-import java.io.IOException;
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyString;
-import static org.mockito.Mockito.argThat;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.eq;
-import static org.mockito.Mockito.isA;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType.READ_ONLY;
-import static org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType.READ_WRITE;
-import static org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType.WRITE_ONLY;
 
 @SuppressWarnings("resource")
 public class TransactionProxyTest {
@@ -137,9 +139,13 @@ public class TransactionProxyTest {
         ArgumentMatcher<CreateTransaction> matcher = new ArgumentMatcher<CreateTransaction>() {
             @Override
             public boolean matches(Object argument) {
-                CreateTransaction obj = CreateTransaction.fromSerializable(argument);
-                return obj.getTransactionId().startsWith(memberName) &&
-                       obj.getTransactionType() == type.ordinal();
+                if(CreateTransaction.SERIALIZABLE_CLASS.equals(argument.getClass())) {
+                    CreateTransaction obj = CreateTransaction.fromSerializable(argument);
+                    return obj.getTransactionId().startsWith(memberName) &&
+                            obj.getTransactionType() == type.ordinal();
+                }
+
+                return false;
             }
         };
 
@@ -195,16 +201,25 @@ public class TransactionProxyTest {
     }
 
     private WriteData eqSerializedWriteData(final NormalizedNode<?, ?> nodeToWrite) {
+        return eqSerializedWriteData(nodeToWrite, DataStoreVersions.CURRENT_VERSION);
+    }
+
+    private WriteData eqSerializedWriteData(final NormalizedNode<?, ?> nodeToWrite,
+            final int transactionVersion) {
         ArgumentMatcher<WriteData> matcher = new ArgumentMatcher<WriteData>() {
             @Override
             public boolean matches(Object argument) {
-                if(!WriteData.SERIALIZABLE_CLASS.equals(argument.getClass())) {
-                    return false;
+                if((transactionVersion >= DataStoreVersions.LITHIUM_VERSION &&
+                        WriteData.SERIALIZABLE_CLASS.equals(argument.getClass())) ||
+                   (transactionVersion < DataStoreVersions.LITHIUM_VERSION &&
+                           ShardTransactionMessages.WriteData.class.equals(argument.getClass()))) {
+
+                    WriteData obj = WriteData.fromSerializable(argument);
+                    return obj.getPath().equals(TestModel.TEST_PATH) &&
+                           obj.getData().equals(nodeToWrite);
                 }
 
-                WriteData obj = WriteData.fromSerializable(argument, schemaContext);
-                return obj.getPath().equals(TestModel.TEST_PATH) &&
-                       obj.getData().equals(nodeToWrite);
+                return false;
             }
         };
 
@@ -228,16 +243,25 @@ public class TransactionProxyTest {
     }
 
     private MergeData eqSerializedMergeData(final NormalizedNode<?, ?> nodeToWrite) {
+        return eqSerializedMergeData(nodeToWrite, DataStoreVersions.CURRENT_VERSION);
+    }
+
+    private MergeData eqSerializedMergeData(final NormalizedNode<?, ?> nodeToWrite,
+            final int transactionVersion) {
         ArgumentMatcher<MergeData> matcher = new ArgumentMatcher<MergeData>() {
             @Override
             public boolean matches(Object argument) {
-                if(!MergeData.SERIALIZABLE_CLASS.equals(argument.getClass())) {
-                    return false;
+                if((transactionVersion >= DataStoreVersions.LITHIUM_VERSION &&
+                        MergeData.SERIALIZABLE_CLASS.equals(argument.getClass())) ||
+                   (transactionVersion < DataStoreVersions.LITHIUM_VERSION &&
+                           ShardTransactionMessages.MergeData.class.equals(argument.getClass()))) {
+
+                    MergeData obj = MergeData.fromSerializable(argument);
+                    return obj.getPath().equals(TestModel.TEST_PATH) &&
+                           obj.getData().equals(nodeToWrite);
                 }
 
-                MergeData obj = MergeData.fromSerializable(argument, schemaContext);
-                return obj.getPath().equals(TestModel.TEST_PATH) &&
-                       obj.getData().equals(nodeToWrite);
+                return false;
             }
         };
 
@@ -293,13 +317,17 @@ public class TransactionProxyTest {
         return Futures.successful((Object)new ReadyTransactionReply(path));
     }
 
+    private Future<Object> readSerializedDataReply(NormalizedNode<?, ?> data,
+            short transactionVersion) {
+        return Futures.successful(new ReadDataReply(data).toSerializable(transactionVersion));
+    }
 
     private Future<Object> readSerializedDataReply(NormalizedNode<?, ?> data) {
-        return Futures.successful(new ReadDataReply(schemaContext, data).toSerializable());
+        return readSerializedDataReply(data, DataStoreVersions.CURRENT_VERSION);
     }
 
     private Future<ReadDataReply> readDataReply(NormalizedNode<?, ?> data) {
-        return Futures.successful(new ReadDataReply(schemaContext, data));
+        return Futures.successful(new ReadDataReply(data));
     }
 
     private Future<Object> dataExistsSerializedReply(boolean exists) {
@@ -310,16 +338,24 @@ public class TransactionProxyTest {
         return Futures.successful(new DataExistsReply(exists));
     }
 
+    private Future<Object> writeSerializedDataReply(short version) {
+        return Futures.successful(new WriteDataReply().toSerializable(version));
+    }
+
     private Future<Object> writeSerializedDataReply() {
-        return Futures.successful(new WriteDataReply().toSerializable());
+        return writeSerializedDataReply(DataStoreVersions.CURRENT_VERSION);
     }
 
     private Future<WriteDataReply> writeDataReply() {
         return Futures.successful(new WriteDataReply());
     }
 
+    private Future<Object> mergeSerializedDataReply(short version) {
+        return Futures.successful(new MergeDataReply().toSerializable(version));
+    }
+
     private Future<Object> mergeSerializedDataReply() {
-        return Futures.successful(new MergeDataReply().toSerializable());
+        return mergeSerializedDataReply(DataStoreVersions.CURRENT_VERSION);
     }
 
     private Future<MergeDataReply> mergeDataReply() {
@@ -346,7 +382,8 @@ public class TransactionProxyTest {
             .build();
     }
 
-    private ActorRef setupActorContextWithInitialCreateTransaction(ActorSystem actorSystem, TransactionType type, int transactionVersion) {
+    private ActorRef setupActorContextWithInitialCreateTransaction(ActorSystem actorSystem,
+            TransactionType type, int transactionVersion) {
         ActorRef actorRef = actorSystem.actorOf(Props.create(DoNothingActor.class));
         doReturn(actorSystem.actorSelection(actorRef.path())).
                 when(mockActorContext).actorSelection(actorRef.path().toString());
@@ -358,13 +395,11 @@ public class TransactionProxyTest {
                 executeOperationAsync(eq(actorSystem.actorSelection(actorRef.path())),
                         eqCreateTransaction(memberName, type));
 
-        doReturn(false).when(mockActorContext).isPathLocal(actorRef.path().toString());
-
         return actorRef;
     }
 
     private ActorRef setupActorContextWithInitialCreateTransaction(ActorSystem actorSystem, TransactionType type) {
-        return setupActorContextWithInitialCreateTransaction(actorSystem, type, CreateTransaction.CURRENT_VERSION);
+        return setupActorContextWithInitialCreateTransaction(actorSystem, type, DataStoreVersions.CURRENT_VERSION);
     }
 
 
@@ -718,7 +753,7 @@ public class TransactionProxyTest {
                 eq(actorSelection(actorRef)), eqSerializedWriteData(nodeToWrite));
 
         verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
-                WriteDataReply.SERIALIZABLE_CLASS);
+                WriteDataReply.class);
     }
 
     @Test(expected=IllegalStateException.class)
@@ -760,7 +795,7 @@ public class TransactionProxyTest {
                 eq(actorSelection(actorRef)), eqSerializedMergeData(nodeToWrite));
 
         verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
-                MergeDataReply.SERIALIZABLE_CLASS);
+                MergeDataReply.class);
     }
 
     @Test
@@ -836,22 +871,25 @@ public class TransactionProxyTest {
         ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready;
 
         verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
-                WriteDataReply.SERIALIZABLE_CLASS);
+                WriteDataReply.class);
 
         verifyCohortFutures(proxy, getSystem().actorSelection(actorRef.path()));
     }
 
-    @Test
-    public void testReadyForwardCompatibility() throws Exception {
-        ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE, 0);
+    private ActorRef testCompatibilityWithHeliumVersion(short version) throws Exception {
+        ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(),
+                READ_WRITE, version);
 
-        NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
+        NormalizedNode<?, ?> testNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
 
-        doReturn(readSerializedDataReply(null)).when(mockActorContext).executeOperationAsync(
+        doReturn(readSerializedDataReply(testNode, version)).when(mockActorContext).executeOperationAsync(
                 eq(actorSelection(actorRef)), eqSerializedReadData());
 
-        doReturn(writeSerializedDataReply()).when(mockActorContext).executeOperationAsync(
-                eq(actorSelection(actorRef)), eqSerializedWriteData(nodeToWrite));
+        doReturn(writeSerializedDataReply(version)).when(mockActorContext).executeOperationAsync(
+                eq(actorSelection(actorRef)), eqSerializedWriteData(testNode, version));
+
+        doReturn(mergeSerializedDataReply(version)).when(mockActorContext).executeOperationAsync(
+                eq(actorSelection(actorRef)), eqSerializedMergeData(testNode, version));
 
         doReturn(readySerializedTxReply(actorRef.path().toString())).when(mockActorContext).executeOperationAsync(
                 eq(actorSelection(actorRef)), isA(ReadyTransaction.SERIALIZABLE_CLASS));
@@ -859,12 +897,17 @@ public class TransactionProxyTest {
         doReturn(actorRef.path().toString()).when(mockActorContext).resolvePath(eq(actorRef.path().toString()),
                 eq(actorRef.path().toString()));
 
-        TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
-                READ_WRITE);
+        TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_WRITE);
 
-        transactionProxy.read(TestModel.TEST_PATH);
+        Optional<NormalizedNode<?, ?>> readOptional = transactionProxy.read(TestModel.TEST_PATH).
+                get(5, TimeUnit.SECONDS);
 
-        transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
+        assertEquals("NormalizedNode isPresent", true, readOptional.isPresent());
+        assertEquals("Response NormalizedNode", testNode, readOptional.get());
+
+        transactionProxy.write(TestModel.TEST_PATH, testNode);
+
+        transactionProxy.merge(TestModel.TEST_PATH, testNode);
 
         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
 
@@ -873,14 +916,29 @@ public class TransactionProxyTest {
         ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready;
 
         verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
-                WriteDataReply.SERIALIZABLE_CLASS);
+                ShardTransactionMessages.WriteDataReply.class, ShardTransactionMessages.MergeDataReply.class);
 
         verifyCohortFutures(proxy, getSystem().actorSelection(actorRef.path()));
 
+        return actorRef;
+    }
+
+    @Test
+    public void testCompatibilityWithBaseHeliumVersion() throws Exception {
+        ActorRef actorRef = testCompatibilityWithHeliumVersion(DataStoreVersions.BASE_HELIUM_VERSION);
+
         verify(mockActorContext).resolvePath(eq(actorRef.path().toString()),
                 eq(actorRef.path().toString()));
     }
 
+    @Test
+    public void testCompatibilityWithHeliumR1Version() throws Exception {
+        ActorRef actorRef = testCompatibilityWithHeliumVersion(DataStoreVersions.HELIUM_1_VERSION);
+
+        verify(mockActorContext, Mockito.never()).resolvePath(eq(actorRef.path().toString()),
+                eq(actorRef.path().toString()));
+    }
+
     @Test
     public void testReadyWithRecordingOperationFailure() throws Exception {
         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
@@ -914,7 +972,7 @@ public class TransactionProxyTest {
         verifyCohortFutures(proxy, TestException.class);
 
         verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
-                MergeDataReply.SERIALIZABLE_CLASS, TestException.class);
+                MergeDataReply.class, TestException.class);
     }
 
     @Test
@@ -942,7 +1000,7 @@ public class TransactionProxyTest {
         ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready;
 
         verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
-                MergeDataReply.SERIALIZABLE_CLASS);
+                MergeDataReply.class);
 
         verifyCohortFutures(proxy, TestException.class);
     }
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/messages/DataChangedTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/messages/DataChangedTest.java
new file mode 100644 (file)
index 0000000..2c1de7f
--- /dev/null
@@ -0,0 +1,57 @@
+/*
+ * Copyright (c) 2014 Brocade Communications Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore.messages;
+
+import static org.junit.Assert.assertEquals;
+import org.apache.commons.lang.SerializationUtils;
+import org.junit.Test;
+import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
+import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
+import org.opendaylight.controller.md.sal.dom.store.impl.DOMImmutableDataChangeEvent;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
+import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder;
+
+/**
+ * Unit tests for DataChanged.
+ *
+ * @author Thomas Pantelis
+ */
+public class DataChangedTest {
+
+    @Test
+    public void testSerialization() {
+        DOMImmutableDataChangeEvent change = DOMImmutableDataChangeEvent.builder(DataChangeScope.SUBTREE).
+                addCreated(TestModel.TEST_PATH, ImmutableContainerNodeBuilder.create().withNodeIdentifier(
+                        new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME)).
+                        withChild(ImmutableNodes.leafNode(TestModel.DESC_QNAME, "foo")).build()).
+                addUpdated(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME),
+                        ImmutableContainerNodeBuilder.create().withNodeIdentifier(
+                            new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME)).
+                            withChild(ImmutableNodes.leafNode(TestModel.NAME_QNAME, "bar")).build())
+.
+                addRemoved(TestModel.OUTER_LIST_PATH,
+                       ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build()).
+                setBefore(ImmutableNodes.containerNode(TestModel.TEST_QNAME)).
+                setAfter(ImmutableContainerNodeBuilder.create().withNodeIdentifier(
+                        new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME)).
+                        withChild(ImmutableNodes.leafNode(TestModel.DESC_QNAME, "foo")).
+                        withChild(ImmutableNodes.leafNode(TestModel.NAME_QNAME, "bar")).build()).build();
+
+        DataChanged expected = new DataChanged(change);
+
+        DataChanged actual = (DataChanged) SerializationUtils.clone(expected);
+
+        assertEquals("getCreatedData", change.getCreatedData(), actual.getChange().getCreatedData());
+        assertEquals("getOriginalData", change.getOriginalData(), actual.getChange().getOriginalData());
+        assertEquals("getOriginalSubtree", change.getOriginalSubtree(), actual.getChange().getOriginalSubtree());
+        assertEquals("getRemovedPaths", change.getRemovedPaths(), actual.getChange().getRemovedPaths());
+        assertEquals("getUpdatedData", change.getUpdatedData(), actual.getChange().getUpdatedData());
+        assertEquals("getUpdatedSubtree", change.getUpdatedSubtree(), actual.getChange().getUpdatedSubtree());
+    }
+}
index 8f3ca9c..5b40afd 100644 (file)
@@ -1,21 +1,70 @@
 package org.opendaylight.controller.cluster.datastore.messages;
 
-import org.junit.Assert;
+import static org.junit.Assert.assertEquals;
+import java.io.Serializable;
+import org.apache.commons.lang.SerializationUtils;
 import org.junit.Test;
+import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
+import org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages.InstanceIdentifier;
+import org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages.Node;
+import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
-import org.opendaylight.yangtools.yang.model.api.SchemaContext;
+import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder;
 
 public class MergeDataTest {
 
     @Test
     public void testSerialization() {
-        SchemaContext schemaContext = TestModel.createTestContext();
-        MergeData expected = new MergeData(TestModel.TEST_PATH, ImmutableNodes
-            .containerNode(TestModel.TEST_QNAME), schemaContext);
+        YangInstanceIdentifier path = TestModel.TEST_PATH;
+        NormalizedNode<?, ?> data = ImmutableContainerNodeBuilder.create().withNodeIdentifier(
+                new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME)).
+                withChild(ImmutableNodes.leafNode(TestModel.DESC_QNAME, "foo")).build();
 
-        MergeData actual = MergeData.fromSerializable(expected.toSerializable(), schemaContext);
-        Assert.assertEquals("getPath", expected.getPath(), actual.getPath());
-        Assert.assertEquals("getData", expected.getData(), actual.getData());
+        MergeData expected = new MergeData(path, data);
+
+        Object serialized = expected.toSerializable(DataStoreVersions.CURRENT_VERSION);
+        assertEquals("Serialized type", MergeData.class, serialized.getClass());
+        assertEquals("Version", DataStoreVersions.CURRENT_VERSION, ((MergeData)serialized).getVersion());
+
+        Object clone = SerializationUtils.clone((Serializable) serialized);
+        assertEquals("Version", DataStoreVersions.CURRENT_VERSION, ((MergeData)clone).getVersion());
+        MergeData actual = MergeData.fromSerializable(clone);
+        assertEquals("getPath", expected.getPath(), actual.getPath());
+        assertEquals("getData", expected.getData(), actual.getData());
+    }
+
+    @Test
+    public void testIsSerializedType() {
+        assertEquals("isSerializedType", true, MergeData.isSerializedType(
+                ShardTransactionMessages.MergeData.newBuilder()
+                    .setInstanceIdentifierPathArguments(InstanceIdentifier.getDefaultInstance())
+                    .setNormalizedNode(Node.getDefaultInstance()).build()));
+        assertEquals("isSerializedType", true,
+                MergeData.isSerializedType(new MergeData()));
+        assertEquals("isSerializedType", false, MergeData.isSerializedType(new Object()));
+    }
+
+    /**
+     * Tests backwards compatible serialization/deserialization of a MergeData message with the
+     * base and R1 Helium versions, which used the protobuff MergeData message.
+     */
+    @Test
+    public void testSerializationWithHeliumR1Version() throws Exception {
+        YangInstanceIdentifier path = TestModel.TEST_PATH;
+        NormalizedNode<?, ?> data = ImmutableContainerNodeBuilder.create().withNodeIdentifier(
+                new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME)).
+                withChild(ImmutableNodes.leafNode(TestModel.DESC_QNAME, "foo")).build();
+
+        MergeData expected = new MergeData(path, data);
+
+        Object serialized = expected.toSerializable(DataStoreVersions.HELIUM_1_VERSION);
+        assertEquals("Serialized type", ShardTransactionMessages.MergeData.class, serialized.getClass());
+
+        MergeData actual = MergeData.fromSerializable(SerializationUtils.clone((Serializable) serialized));
+        assertEquals("getPath", expected.getPath(), actual.getPath());
+        assertEquals("getData", expected.getData(), actual.getData());
     }
 }
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/messages/ReadDataReplyTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/messages/ReadDataReplyTest.java
new file mode 100644 (file)
index 0000000..8ce7329
--- /dev/null
@@ -0,0 +1,72 @@
+/*
+ * Copyright (c) 2014 Brocade Communications Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore.messages;
+
+import static org.junit.Assert.assertEquals;
+import java.io.Serializable;
+import org.apache.commons.lang.SerializationUtils;
+import org.junit.Test;
+import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
+import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
+import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
+import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder;
+
+/**
+ * Unit tests for ReadDataReply.
+ *
+ * @author Thomas Pantelis
+ */
+public class ReadDataReplyTest {
+
+    @Test
+    public void testSerialization() {
+        NormalizedNode<?, ?> data = ImmutableContainerNodeBuilder.create().withNodeIdentifier(
+                new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME)).
+                withChild(ImmutableNodes.leafNode(TestModel.DESC_QNAME, "foo")).build();
+
+        ReadDataReply expected = new ReadDataReply(data);
+
+        Object serialized = expected.toSerializable(DataStoreVersions.CURRENT_VERSION);
+        assertEquals("Serialized type", ReadDataReply.class, serialized.getClass());
+
+        ReadDataReply actual = ReadDataReply.fromSerializable(SerializationUtils.clone(
+                (Serializable) serialized));
+        assertEquals("getNormalizedNode", expected.getNormalizedNode(), actual.getNormalizedNode());
+    }
+
+    @Test
+    public void testIsSerializedType() {
+        assertEquals("isSerializedType", true, ReadDataReply.isSerializedType(
+                ShardTransactionMessages.ReadDataReply.newBuilder().build()));
+        assertEquals("isSerializedType", true, ReadDataReply.isSerializedType(new ReadDataReply()));
+        assertEquals("isSerializedType", false, ReadDataReply.isSerializedType(new Object()));
+    }
+
+    /**
+     * Tests backwards compatible serialization/deserialization of a ReadDataReply message with the
+     * base and R1 Helium versions, which used the protobuff ReadDataReply message.
+     */
+    @Test
+    public void testSerializationWithHeliumR1Version() throws Exception {
+        NormalizedNode<?, ?> data = ImmutableContainerNodeBuilder.create().withNodeIdentifier(
+                new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME)).
+                withChild(ImmutableNodes.leafNode(TestModel.DESC_QNAME, "foo")).build();
+
+        ReadDataReply expected = new ReadDataReply(data);
+
+        Object serialized = expected.toSerializable(DataStoreVersions.HELIUM_1_VERSION);
+        assertEquals("Serialized type", ShardTransactionMessages.ReadDataReply.class, serialized.getClass());
+
+        ReadDataReply actual = ReadDataReply.fromSerializable(SerializationUtils.clone(
+                (Serializable) serialized));
+        assertEquals("getNormalizedNode", expected.getNormalizedNode(), actual.getNormalizedNode());
+    }
+}
index 6a5d65f..90a76f2 100644 (file)
@@ -7,11 +7,19 @@
  */
 package org.opendaylight.controller.cluster.datastore.messages;
 
-import org.junit.Assert;
+import static org.junit.Assert.assertEquals;
+import java.io.Serializable;
+import org.apache.commons.lang.SerializationUtils;
 import org.junit.Test;
+import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
+import org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages.InstanceIdentifier;
+import org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages.Node;
+import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
-import org.opendaylight.yangtools.yang.model.api.SchemaContext;
+import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder;
 
 /**
  * Unit tests for WriteData.
@@ -22,12 +30,52 @@ public class WriteDataTest {
 
     @Test
     public void testSerialization() {
-        SchemaContext schemaContext = TestModel.createTestContext();
-        WriteData expected = new WriteData(TestModel.TEST_PATH, ImmutableNodes
-            .containerNode(TestModel.TEST_QNAME), schemaContext);
+        YangInstanceIdentifier path = TestModel.TEST_PATH;
+        NormalizedNode<?, ?> data = ImmutableContainerNodeBuilder.create().withNodeIdentifier(
+                new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME)).
+                withChild(ImmutableNodes.leafNode(TestModel.DESC_QNAME, "foo")).build();
 
-        WriteData actual = WriteData.fromSerializable(expected.toSerializable(), schemaContext);
-        Assert.assertEquals("getPath", expected.getPath(), actual.getPath());
-        Assert.assertEquals("getData", expected.getData(), actual.getData());
+        WriteData expected = new WriteData(path, data);
+
+        Object serialized = expected.toSerializable(DataStoreVersions.CURRENT_VERSION);
+        assertEquals("Serialized type", WriteData.class, serialized.getClass());
+        assertEquals("Version", DataStoreVersions.CURRENT_VERSION, ((WriteData)serialized).getVersion());
+
+        Object clone = SerializationUtils.clone((Serializable) serialized);
+        assertEquals("Version", DataStoreVersions.CURRENT_VERSION, ((WriteData)clone).getVersion());
+        WriteData actual = WriteData.fromSerializable(clone);
+        assertEquals("getPath", expected.getPath(), actual.getPath());
+        assertEquals("getData", expected.getData(), actual.getData());
+    }
+
+    @Test
+    public void testIsSerializedType() {
+        assertEquals("isSerializedType", true, WriteData.isSerializedType(
+                ShardTransactionMessages.WriteData.newBuilder()
+                    .setInstanceIdentifierPathArguments(InstanceIdentifier.getDefaultInstance())
+                    .setNormalizedNode(Node.getDefaultInstance()).build()));
+        assertEquals("isSerializedType", true, WriteData.isSerializedType(new WriteData()));
+        assertEquals("isSerializedType", false, WriteData.isSerializedType(new Object()));
+    }
+
+    /**
+     * Tests backwards compatible serialization/deserialization of a WriteData message with the
+     * base and R1 Helium versions, which used the protobuff WriteData message.
+     */
+    @Test
+    public void testSerializationWithHeliumR1Version() throws Exception {
+        YangInstanceIdentifier path = TestModel.TEST_PATH;
+        NormalizedNode<?, ?> data = ImmutableContainerNodeBuilder.create().withNodeIdentifier(
+                new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME)).
+                withChild(ImmutableNodes.leafNode(TestModel.DESC_QNAME, "foo")).build();
+
+        WriteData expected = new WriteData(path, data);
+
+        Object serialized = expected.toSerializable(DataStoreVersions.HELIUM_1_VERSION);
+        assertEquals("Serialized type", ShardTransactionMessages.WriteData.class, serialized.getClass());
+
+        WriteData actual = WriteData.fromSerializable(SerializationUtils.clone((Serializable) serialized));
+        assertEquals("getPath", expected.getPath(), actual.getPath());
+        assertEquals("getData", expected.getData(), actual.getData());
     }
 }
index 85441ec..e571e3a 100644 (file)
@@ -7,16 +7,15 @@
  */
 package org.opendaylight.controller.md.cluster.datastore.model;
 
+import java.io.InputStream;
+import java.util.Collections;
+import java.util.Set;
 import org.opendaylight.yangtools.yang.common.QName;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.model.api.Module;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 import org.opendaylight.yangtools.yang.parser.impl.YangParserImpl;
 
-import java.io.InputStream;
-import java.util.Collections;
-import java.util.Set;
-
 public class TestModel {
 
   public static final QName TEST_QNAME = QName.create("urn:opendaylight:params:xml:ns:yang:controller:md:sal:dom:store:test", "2014-03-13",
@@ -26,6 +25,7 @@ public class TestModel {
   public static final QName OUTER_CHOICE_QNAME = QName.create(TEST_QNAME, "outer-choice");
   public static final QName ID_QNAME = QName.create(TEST_QNAME, "id");
   public static final QName NAME_QNAME = QName.create(TEST_QNAME, "name");
+  public static final QName DESC_QNAME = QName.create(TEST_QNAME, "desc");
   public static final QName VALUE_QNAME = QName.create(TEST_QNAME, "value");
   private static final String DATASTORE_TEST_YANG = "/odl-datastore-test.yang";