Bug 2265: Use new NormalizedNode streaming in messages 48/12448/19
authortpantelis <tpanteli@brocade.com>
Mon, 19 Jan 2015 03:21:46 +0000 (22:21 -0500)
committertpantelis <tpanteli@brocade.com>
Wed, 21 Jan 2015 04:50:48 +0000 (23:50 -0500)
    Utilized the new NormalizedNode streaming classes for the WriteData,
    MergeData, ReadDataReply and DataChanged messages.

    One solution was to add a bytes field to the protobuff messages and
    make the previous fields optional. For backwards compatibility, in
    the wrapper message's fromSerializable method, check whether or not
    the protobuff message has the bytes field and act accordingly.

    While this works, it results in an undesirable inefficiency.
    Protobuff translates the bytes field to a ByteString type. So when
    streaming, we need to create a ByteArrayOutputStream and pass that to
    the NormalizedNode stream writer. Then call
    ByteString.copyFrom(bos.toByteArray) to get the resulting ByteString.
    However this results in 2 copies of the serialized byte[]. The
    byte[] cannot be passed to ByteString as is as it always copies it to
    maintain immutability. I looked into subclassing ByteString and
    lazily streaming the data on demand but the ByteString ctor is
    package scoped so they don’t allow subclassing.

    So I went with an approach to make each message Externalizable
    instead of using protobuff. The classes writes
    the version number to enable us to handle compatibility in the future.
    So in this manner, we can get the efficient direct streaming we
    want and easily handle backwards and forwards compatibility.

    I added a VersionedSerializableMessage interface whose
    toSerializable method takes a version number. The version # is passed
    from the remote version # obtained from the CreateTransactionReply.
    This allows the message classes to handle backwards compatibility. So
    if the version is Helium-1 or less, send the protobuff message
    otherwise send the Externalizable instance.

    In the fromSerializable method, it checks if the passed Object is an
    instance of the Externalizable class or the protbuff message type.

Change-Id: I5ebb968e70ac8ff92c29183c52e6c3fe5362ae34
Signed-off-by: tpantelis <tpanteli@brocade.com>
48 files changed:
opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/protobuff/messages/transaction/ShardTransactionMessages.java
opendaylight/md-sal/sal-clustering-commons/src/main/resources/ShardTransaction.proto
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataChangeListener.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerProxy.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataStoreVersions.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardCommitCoordinator.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardReadTransaction.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardReadWriteTransaction.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardTransaction.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardTransactionChain.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardWriteTransaction.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/AbortTransactionReply.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/CanCommitTransactionReply.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/CloseTransaction.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/CloseTransactionChainReply.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/CloseTransactionReply.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/CommitTransactionReply.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/CreateTransaction.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/CreateTransactionReply.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/DataChanged.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/DataChangedReply.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/DeleteDataReply.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/EmptyExternalizable.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/EmptyReply.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/MergeData.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/MergeDataReply.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/ModifyData.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/ReadDataReply.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/ReadyTransaction.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/VersionedSerializableMessage.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/WriteData.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/WriteDataReply.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/SerializationUtils.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerProxyTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionFailureTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionHeliumBackwardsCompatibilityTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohortProxyTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionProxyTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/messages/DataChangedTest.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/messages/MergeDataTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/messages/ReadDataReplyTest.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/messages/WriteDataTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/md/cluster/datastore/model/TestModel.java

index 3a1cfaa443a8e735965ed3b8d92f9037ee7ead36..8be6ad14dd7d92649ab210e7078ee1f20e5520fe 100644 (file)
@@ -4823,14 +4823,26 @@ public final class ShardTransactionMessages {
     // required .org.opendaylight.controller.mdsal.InstanceIdentifier instanceIdentifierPathArguments = 1;
     /**
      * <code>required .org.opendaylight.controller.mdsal.InstanceIdentifier instanceIdentifierPathArguments = 1;</code>
+     *
+     * <pre>
+     * base Helium version
+     * </pre>
      */
     boolean hasInstanceIdentifierPathArguments();
     /**
      * <code>required .org.opendaylight.controller.mdsal.InstanceIdentifier instanceIdentifierPathArguments = 1;</code>
+     *
+     * <pre>
+     * base Helium version
+     * </pre>
      */
     org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages.InstanceIdentifier getInstanceIdentifierPathArguments();
     /**
      * <code>required .org.opendaylight.controller.mdsal.InstanceIdentifier instanceIdentifierPathArguments = 1;</code>
+     *
+     * <pre>
+     * base Helium version
+     * </pre>
      */
     org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages.InstanceIdentifierOrBuilder getInstanceIdentifierPathArgumentsOrBuilder();
 
@@ -4970,18 +4982,30 @@ public final class ShardTransactionMessages {
     private org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages.InstanceIdentifier instanceIdentifierPathArguments_;
     /**
      * <code>required .org.opendaylight.controller.mdsal.InstanceIdentifier instanceIdentifierPathArguments = 1;</code>
+     *
+     * <pre>
+     * base Helium version
+     * </pre>
      */
     public boolean hasInstanceIdentifierPathArguments() {
       return ((bitField0_ & 0x00000001) == 0x00000001);
     }
     /**
      * <code>required .org.opendaylight.controller.mdsal.InstanceIdentifier instanceIdentifierPathArguments = 1;</code>
+     *
+     * <pre>
+     * base Helium version
+     * </pre>
      */
     public org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages.InstanceIdentifier getInstanceIdentifierPathArguments() {
       return instanceIdentifierPathArguments_;
     }
     /**
      * <code>required .org.opendaylight.controller.mdsal.InstanceIdentifier instanceIdentifierPathArguments = 1;</code>
+     *
+     * <pre>
+     * base Helium version
+     * </pre>
      */
     public org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages.InstanceIdentifierOrBuilder getInstanceIdentifierPathArgumentsOrBuilder() {
       return instanceIdentifierPathArguments_;
@@ -5309,12 +5333,20 @@ public final class ShardTransactionMessages {
           org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages.InstanceIdentifier, org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages.InstanceIdentifier.Builder, org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages.InstanceIdentifierOrBuilder> instanceIdentifierPathArgumentsBuilder_;
       /**
        * <code>required .org.opendaylight.controller.mdsal.InstanceIdentifier instanceIdentifierPathArguments = 1;</code>
+       *
+       * <pre>
+       * base Helium version
+       * </pre>
        */
       public boolean hasInstanceIdentifierPathArguments() {
         return ((bitField0_ & 0x00000001) == 0x00000001);
       }
       /**
        * <code>required .org.opendaylight.controller.mdsal.InstanceIdentifier instanceIdentifierPathArguments = 1;</code>
+       *
+       * <pre>
+       * base Helium version
+       * </pre>
        */
       public org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages.InstanceIdentifier getInstanceIdentifierPathArguments() {
         if (instanceIdentifierPathArgumentsBuilder_ == null) {
@@ -5325,6 +5357,10 @@ public final class ShardTransactionMessages {
       }
       /**
        * <code>required .org.opendaylight.controller.mdsal.InstanceIdentifier instanceIdentifierPathArguments = 1;</code>
+       *
+       * <pre>
+       * base Helium version
+       * </pre>
        */
       public Builder setInstanceIdentifierPathArguments(org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages.InstanceIdentifier value) {
         if (instanceIdentifierPathArgumentsBuilder_ == null) {
@@ -5341,6 +5377,10 @@ public final class ShardTransactionMessages {
       }
       /**
        * <code>required .org.opendaylight.controller.mdsal.InstanceIdentifier instanceIdentifierPathArguments = 1;</code>
+       *
+       * <pre>
+       * base Helium version
+       * </pre>
        */
       public Builder setInstanceIdentifierPathArguments(
           org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages.InstanceIdentifier.Builder builderForValue) {
@@ -5355,6 +5395,10 @@ public final class ShardTransactionMessages {
       }
       /**
        * <code>required .org.opendaylight.controller.mdsal.InstanceIdentifier instanceIdentifierPathArguments = 1;</code>
+       *
+       * <pre>
+       * base Helium version
+       * </pre>
        */
       public Builder mergeInstanceIdentifierPathArguments(org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages.InstanceIdentifier value) {
         if (instanceIdentifierPathArgumentsBuilder_ == null) {
@@ -5374,6 +5418,10 @@ public final class ShardTransactionMessages {
       }
       /**
        * <code>required .org.opendaylight.controller.mdsal.InstanceIdentifier instanceIdentifierPathArguments = 1;</code>
+       *
+       * <pre>
+       * base Helium version
+       * </pre>
        */
       public Builder clearInstanceIdentifierPathArguments() {
         if (instanceIdentifierPathArgumentsBuilder_ == null) {
@@ -5387,6 +5435,10 @@ public final class ShardTransactionMessages {
       }
       /**
        * <code>required .org.opendaylight.controller.mdsal.InstanceIdentifier instanceIdentifierPathArguments = 1;</code>
+       *
+       * <pre>
+       * base Helium version
+       * </pre>
        */
       public org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages.InstanceIdentifier.Builder getInstanceIdentifierPathArgumentsBuilder() {
         bitField0_ |= 0x00000001;
@@ -5395,6 +5447,10 @@ public final class ShardTransactionMessages {
       }
       /**
        * <code>required .org.opendaylight.controller.mdsal.InstanceIdentifier instanceIdentifierPathArguments = 1;</code>
+       *
+       * <pre>
+       * base Helium version
+       * </pre>
        */
       public org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages.InstanceIdentifierOrBuilder getInstanceIdentifierPathArgumentsOrBuilder() {
         if (instanceIdentifierPathArgumentsBuilder_ != null) {
@@ -5405,6 +5461,10 @@ public final class ShardTransactionMessages {
       }
       /**
        * <code>required .org.opendaylight.controller.mdsal.InstanceIdentifier instanceIdentifierPathArguments = 1;</code>
+       *
+       * <pre>
+       * base Helium version
+       * </pre>
        */
       private com.google.protobuf.SingleFieldBuilder<
           org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages.InstanceIdentifier, org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages.InstanceIdentifier.Builder, org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages.InstanceIdentifierOrBuilder>
@@ -5863,14 +5923,26 @@ public final class ShardTransactionMessages {
     // required .org.opendaylight.controller.mdsal.InstanceIdentifier instanceIdentifierPathArguments = 1;
     /**
      * <code>required .org.opendaylight.controller.mdsal.InstanceIdentifier instanceIdentifierPathArguments = 1;</code>
+     *
+     * <pre>
+     * base Helium version
+     * </pre>
      */
     boolean hasInstanceIdentifierPathArguments();
     /**
      * <code>required .org.opendaylight.controller.mdsal.InstanceIdentifier instanceIdentifierPathArguments = 1;</code>
+     *
+     * <pre>
+     * base Helium version
+     * </pre>
      */
     org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages.InstanceIdentifier getInstanceIdentifierPathArguments();
     /**
      * <code>required .org.opendaylight.controller.mdsal.InstanceIdentifier instanceIdentifierPathArguments = 1;</code>
+     *
+     * <pre>
+     * base Helium version
+     * </pre>
      */
     org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages.InstanceIdentifierOrBuilder getInstanceIdentifierPathArgumentsOrBuilder();
 
@@ -6010,18 +6082,30 @@ public final class ShardTransactionMessages {
     private org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages.InstanceIdentifier instanceIdentifierPathArguments_;
     /**
      * <code>required .org.opendaylight.controller.mdsal.InstanceIdentifier instanceIdentifierPathArguments = 1;</code>
+     *
+     * <pre>
+     * base Helium version
+     * </pre>
      */
     public boolean hasInstanceIdentifierPathArguments() {
       return ((bitField0_ & 0x00000001) == 0x00000001);
     }
     /**
      * <code>required .org.opendaylight.controller.mdsal.InstanceIdentifier instanceIdentifierPathArguments = 1;</code>
+     *
+     * <pre>
+     * base Helium version
+     * </pre>
      */
     public org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages.InstanceIdentifier getInstanceIdentifierPathArguments() {
       return instanceIdentifierPathArguments_;
     }
     /**
      * <code>required .org.opendaylight.controller.mdsal.InstanceIdentifier instanceIdentifierPathArguments = 1;</code>
+     *
+     * <pre>
+     * base Helium version
+     * </pre>
      */
     public org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages.InstanceIdentifierOrBuilder getInstanceIdentifierPathArgumentsOrBuilder() {
       return instanceIdentifierPathArguments_;
@@ -6349,12 +6433,20 @@ public final class ShardTransactionMessages {
           org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages.InstanceIdentifier, org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages.InstanceIdentifier.Builder, org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages.InstanceIdentifierOrBuilder> instanceIdentifierPathArgumentsBuilder_;
       /**
        * <code>required .org.opendaylight.controller.mdsal.InstanceIdentifier instanceIdentifierPathArguments = 1;</code>
+       *
+       * <pre>
+       * base Helium version
+       * </pre>
        */
       public boolean hasInstanceIdentifierPathArguments() {
         return ((bitField0_ & 0x00000001) == 0x00000001);
       }
       /**
        * <code>required .org.opendaylight.controller.mdsal.InstanceIdentifier instanceIdentifierPathArguments = 1;</code>
+       *
+       * <pre>
+       * base Helium version
+       * </pre>
        */
       public org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages.InstanceIdentifier getInstanceIdentifierPathArguments() {
         if (instanceIdentifierPathArgumentsBuilder_ == null) {
@@ -6365,6 +6457,10 @@ public final class ShardTransactionMessages {
       }
       /**
        * <code>required .org.opendaylight.controller.mdsal.InstanceIdentifier instanceIdentifierPathArguments = 1;</code>
+       *
+       * <pre>
+       * base Helium version
+       * </pre>
        */
       public Builder setInstanceIdentifierPathArguments(org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages.InstanceIdentifier value) {
         if (instanceIdentifierPathArgumentsBuilder_ == null) {
@@ -6381,6 +6477,10 @@ public final class ShardTransactionMessages {
       }
       /**
        * <code>required .org.opendaylight.controller.mdsal.InstanceIdentifier instanceIdentifierPathArguments = 1;</code>
+       *
+       * <pre>
+       * base Helium version
+       * </pre>
        */
       public Builder setInstanceIdentifierPathArguments(
           org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages.InstanceIdentifier.Builder builderForValue) {
@@ -6395,6 +6495,10 @@ public final class ShardTransactionMessages {
       }
       /**
        * <code>required .org.opendaylight.controller.mdsal.InstanceIdentifier instanceIdentifierPathArguments = 1;</code>
+       *
+       * <pre>
+       * base Helium version
+       * </pre>
        */
       public Builder mergeInstanceIdentifierPathArguments(org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages.InstanceIdentifier value) {
         if (instanceIdentifierPathArgumentsBuilder_ == null) {
@@ -6414,6 +6518,10 @@ public final class ShardTransactionMessages {
       }
       /**
        * <code>required .org.opendaylight.controller.mdsal.InstanceIdentifier instanceIdentifierPathArguments = 1;</code>
+       *
+       * <pre>
+       * base Helium version
+       * </pre>
        */
       public Builder clearInstanceIdentifierPathArguments() {
         if (instanceIdentifierPathArgumentsBuilder_ == null) {
@@ -6427,6 +6535,10 @@ public final class ShardTransactionMessages {
       }
       /**
        * <code>required .org.opendaylight.controller.mdsal.InstanceIdentifier instanceIdentifierPathArguments = 1;</code>
+       *
+       * <pre>
+       * base Helium version
+       * </pre>
        */
       public org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages.InstanceIdentifier.Builder getInstanceIdentifierPathArgumentsBuilder() {
         bitField0_ |= 0x00000001;
@@ -6435,6 +6547,10 @@ public final class ShardTransactionMessages {
       }
       /**
        * <code>required .org.opendaylight.controller.mdsal.InstanceIdentifier instanceIdentifierPathArguments = 1;</code>
+       *
+       * <pre>
+       * base Helium version
+       * </pre>
        */
       public org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages.InstanceIdentifierOrBuilder getInstanceIdentifierPathArgumentsOrBuilder() {
         if (instanceIdentifierPathArgumentsBuilder_ != null) {
@@ -6445,6 +6561,10 @@ public final class ShardTransactionMessages {
       }
       /**
        * <code>required .org.opendaylight.controller.mdsal.InstanceIdentifier instanceIdentifierPathArguments = 1;</code>
+       *
+       * <pre>
+       * base Helium version
+       * </pre>
        */
       private com.google.protobuf.SingleFieldBuilder<
           org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages.InstanceIdentifier, org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages.InstanceIdentifier.Builder, org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages.InstanceIdentifierOrBuilder>
index c5e4ee45c037df8d3f03c6c3b2e13015c8317ef6..ac9cb2203318d2b9e287e87714227a1d2a60e87d 100644 (file)
@@ -49,9 +49,9 @@ message ReadDataReply{
 }
 
 message WriteData {
- required InstanceIdentifier instanceIdentifierPathArguments = 1;
-required Node normalizedNode =2;
-
+  // base Helium version
+  required InstanceIdentifier instanceIdentifierPathArguments = 1;
+  required Node normalizedNode = 2;
 }
 
 message WriteDataReply{
@@ -59,9 +59,9 @@ message WriteDataReply{
 }
 
 message MergeData {
- required InstanceIdentifier instanceIdentifierPathArguments = 1;
-required Node normalizedNode =2;
-
+  // base Helium version
+  required InstanceIdentifier instanceIdentifierPathArguments = 1;
+  required Node normalizedNode = 2;
 }
 
 message MergeDataReply{
index 6f14af304f403e8340ea904bbbf22a0a1d40673d..1bc835f1e3c6883196d4a9a6b1f334bb16f9dca7 100644 (file)
@@ -71,7 +71,7 @@ public class DataChangeListener extends AbstractUntypedActor {
         // It seems the sender is never null but it doesn't hurt to check. If the caller passes in
         // a null sender (ActorRef.noSender()), akka translates that to the deadLetters actor.
         if(getSender() != null && !getContext().system().deadLetters().equals(getSender())) {
-            getSender().tell(new DataChangedReply(), getSelf());
+            getSender().tell(DataChangedReply.INSTANCE, getSelf());
         }
     }
 
index 1a0ee8c2fa6866dfdab97cfeab7087ce37d52ca6..afec1a07d484d0852abe626d1a387e607014b8c6 100644 (file)
@@ -16,22 +16,21 @@ import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
-import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 
 /**
  * DataChangeListenerProxy represents a single remote DataChangeListener
  */
 public class DataChangeListenerProxy implements AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>{
     private final ActorSelection dataChangeListenerActor;
-    private final SchemaContext schemaContext;
 
-    public DataChangeListenerProxy(SchemaContext schemaContext, ActorSelection dataChangeListenerActor) {
-        this.dataChangeListenerActor = Preconditions.checkNotNull(dataChangeListenerActor, "dataChangeListenerActor should not be null");
-        this.schemaContext = schemaContext;
+    public DataChangeListenerProxy(ActorSelection dataChangeListenerActor) {
+        this.dataChangeListenerActor = Preconditions.checkNotNull(dataChangeListenerActor,
+                "dataChangeListenerActor should not be null");
     }
 
-    @Override public void onDataChanged(
+    @Override
+    public void onDataChanged(
         AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change) {
-        dataChangeListenerActor.tell(new DataChanged(schemaContext, change), ActorRef.noSender());
+        dataChangeListenerActor.tell(new DataChanged(change), ActorRef.noSender());
     }
 }
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataStoreVersions.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataStoreVersions.java
new file mode 100644 (file)
index 0000000..1f22222
--- /dev/null
@@ -0,0 +1,21 @@
+/*
+ * Copyright (c) 2014 Brocade Communications Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore;
+
+/**
+ * Defines version numbers.
+ *
+ * @author Thomas Pantelis
+ */
+public interface DataStoreVersions {
+    short BASE_HELIUM_VERSION = 0;
+    short HELIUM_1_VERSION = 1;
+    short HELIUM_2_VERSION = 2;
+    short LITHIUM_VERSION = 3;
+    short CURRENT_VERSION = LITHIUM_VERSION;
+}
index 7ef6e040a9f3f0d94bf2fdc47790377d505184b3..1661bb4b5dc95477f9e10c413171bc8d5ba03fb8 100644 (file)
@@ -95,8 +95,6 @@ import scala.concurrent.duration.FiniteDuration;
  */
 public class Shard extends RaftActor {
 
-    private static final Object COMMIT_TRANSACTION_REPLY = new CommitTransactionReply().toSerializable();
-
     private static final Object TX_COMMIT_TIMEOUT_CHECK_MESSAGE = "txCommitTimeoutCheck";
 
     public static final String DEFAULT_NAME = "default";
@@ -354,7 +352,7 @@ public class Shard extends RaftActor {
             cohortEntry = commitCoordinator.getAndRemoveCohortEntry(transactionID);
             if(cohortEntry != null) {
                 commitWithNewTransaction(cohortEntry.getModification());
-                sender.tell(COMMIT_TRANSACTION_REPLY, getSelf());
+                sender.tell(CommitTransactionReply.INSTANCE.toSerializable(), getSelf());
             } else {
                 // This really shouldn't happen - it likely means that persistence or replication
                 // took so long to complete such that the cohort entry was expired from the cache.
@@ -376,7 +374,7 @@ public class Shard extends RaftActor {
             // currently uses a same thread executor anyway.
             cohortEntry.getCohort().commit().get();
 
-            sender.tell(COMMIT_TRANSACTION_REPLY, getSelf());
+            sender.tell(CommitTransactionReply.INSTANCE.toSerializable(), getSelf());
 
             shardMBean.incrementCommittedTransactionCount();
             shardMBean.setLastCommittedTransactionTime(System.currentTimeMillis());
@@ -412,7 +410,7 @@ public class Shard extends RaftActor {
         // transactionId so to maintain backwards compatibility, we create a separate cohort actor
         // to provide the compatible behavior.
         ActorRef replyActorPath = self();
-        if(ready.getTxnClientVersion() < CreateTransaction.HELIUM_1_VERSION) {
+        if(ready.getTxnClientVersion() < DataStoreVersions.HELIUM_1_VERSION) {
             LOG.debug("Creating BackwardsCompatibleThreePhaseCommitCohort");
             replyActorPath = getContext().actorOf(BackwardsCompatibleThreePhaseCommitCohort.props(
                     ready.getTransactionID()));
@@ -447,7 +445,7 @@ public class Shard extends RaftActor {
                     shardMBean.incrementAbortTransactionsCount();
 
                     if(sender != null) {
-                        sender.tell(new AbortTransactionReply().toSerializable(), self);
+                        sender.tell(AbortTransactionReply.INSTANCE.toSerializable(), self);
                     }
                 }
 
@@ -480,7 +478,7 @@ public class Shard extends RaftActor {
         // This must be for install snapshot. Don't want to open this up and trigger
         // deSerialization
 
-        self().tell(new CaptureSnapshotReply(ReadDataReply.getNormalizedNodeByteString(message)),
+        self().tell(new CaptureSnapshotReply(ReadDataReply.fromSerializableAsByteString(message)),
                 self());
 
         createSnapshotTransaction = null;
@@ -500,7 +498,8 @@ public class Shard extends RaftActor {
     }
 
     private ActorRef createTypedTransactionActor(int transactionType,
-            ShardTransactionIdentifier transactionId, String transactionChainId, int clientVersion ) {
+            ShardTransactionIdentifier transactionId, String transactionChainId,
+            short clientVersion ) {
 
         DOMStoreTransactionFactory factory = store;
 
@@ -568,7 +567,7 @@ public class Shard extends RaftActor {
     }
 
     private ActorRef createTransaction(int transactionType, String remoteTransactionId,
-            String transactionChainId, int clientVersion) {
+            String transactionChainId, short clientVersion) {
 
         ShardTransactionIdentifier transactionId =
             ShardTransactionIdentifier.builder()
@@ -659,7 +658,7 @@ public class Shard extends RaftActor {
         dataChangeListeners.add(dataChangeListenerPath);
 
         AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>> listener =
-                new DataChangeListenerProxy(schemaContext, dataChangeListenerPath);
+                new DataChangeListenerProxy(dataChangeListenerPath);
 
         LOG.debug("Registering for path {}", registerChangeListener.getPath());
 
@@ -818,7 +817,7 @@ public class Shard extends RaftActor {
             createSnapshotTransaction = createTransaction(
                 TransactionProxy.TransactionType.READ_ONLY.ordinal(),
                 "createSnapshot" + ++createSnapshotTransactionCounter, "",
-                CreateTransaction.CURRENT_VERSION);
+                DataStoreVersions.CURRENT_VERSION);
 
             createSnapshotTransaction.tell(
                 new ReadData(YangInstanceIdentifier.builder().build()).toSerializable(), self());
index f3b4e416403b0594a22da9ff47de2f68a1d284cc..19fa26682e2a4cea7b637fda85064a3aea0226e5 100644 (file)
@@ -7,6 +7,10 @@
  */
 package org.opendaylight.controller.cluster.datastore;
 
+import akka.actor.ActorRef;
+import akka.actor.Status;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
 import java.util.LinkedList;
 import java.util.Queue;
 import java.util.concurrent.ExecutionException;
@@ -17,10 +21,6 @@ import org.opendaylight.controller.cluster.datastore.modification.Modification;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import akka.actor.ActorRef;
-import akka.actor.Status;
-import com.google.common.cache.Cache;
-import com.google.common.cache.CacheBuilder;
 
 /**
  * Coordinates commits for a shard ensuring only one concurrent 3-phase commit.
@@ -31,12 +31,6 @@ public class ShardCommitCoordinator {
 
     private static final Logger LOG = LoggerFactory.getLogger(ShardCommitCoordinator.class);
 
-    private static final Object CAN_COMMIT_REPLY_TRUE =
-            new CanCommitTransactionReply(Boolean.TRUE).toSerializable();
-
-    private static final Object CAN_COMMIT_REPLY_FALSE =
-            new CanCommitTransactionReply(Boolean.FALSE).toSerializable();
-
     private final Cache<String, CohortEntry> cohortCache;
 
     private CohortEntry currentCohortEntry;
@@ -138,7 +132,8 @@ public class ShardCommitCoordinator {
             Boolean canCommit = cohortEntry.getCohort().canCommit().get();
 
             cohortEntry.getCanCommitSender().tell(
-                    canCommit ? CAN_COMMIT_REPLY_TRUE : CAN_COMMIT_REPLY_FALSE, cohortEntry.getShard());
+                    canCommit ? CanCommitTransactionReply.YES.toSerializable() :
+                        CanCommitTransactionReply.NO.toSerializable(), cohortEntry.getShard());
 
             if(!canCommit) {
                 // Remove the entry from the cache now since the Tx will be aborted.
index 9d8f57252a2fc43bd316f213c9f8b68f4f4c453f..be9c4d80e311484d0e6edea2050d85a0ebf28d1f 100644 (file)
@@ -27,8 +27,8 @@ public class ShardReadTransaction extends ShardTransaction {
 
     public ShardReadTransaction(DOMStoreReadTransaction transaction, ActorRef shardActor,
             SchemaContext schemaContext, ShardStats shardStats, String transactionID,
-            int txnClientVersion) {
-        super(shardActor, schemaContext, shardStats, transactionID, txnClientVersion);
+            short clientTxVersion) {
+        super(shardActor, schemaContext, shardStats, transactionID, clientTxVersion);
         this.transaction = transaction;
     }
 
index e558677ebbaba5598d1ca84875a6402764887f34..b394da88e853f4387fb2cbc70ff3b93ab5436999 100644 (file)
@@ -11,7 +11,6 @@
 package org.opendaylight.controller.cluster.datastore;
 
 import akka.actor.ActorRef;
-
 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
 import org.opendaylight.controller.cluster.datastore.messages.DataExists;
 import org.opendaylight.controller.cluster.datastore.messages.ReadData;
@@ -27,8 +26,8 @@ public class ShardReadWriteTransaction extends ShardWriteTransaction {
 
     public ShardReadWriteTransaction(DOMStoreReadWriteTransaction transaction, ActorRef shardActor,
             SchemaContext schemaContext, ShardStats shardStats, String transactionID,
-            int txnClientVersion) {
-        super(transaction, shardActor, schemaContext, shardStats, transactionID, txnClientVersion);
+            short clientTxVersion) {
+        super(transaction, shardActor, schemaContext, shardStats, transactionID, clientTxVersion);
         this.transaction = transaction;
     }
 
index 59bb4bfd77892e1a915563b3d435150de27c4ca1..678b7815693382179328a8c13adb567d80d61b13 100644 (file)
@@ -63,21 +63,21 @@ public abstract class ShardTransaction extends AbstractUntypedActorWithMetering
     private final SchemaContext schemaContext;
     private final ShardStats shardStats;
     private final String transactionID;
-    private final int txnClientVersion;
+    private final short clientTxVersion;
 
     protected ShardTransaction(ActorRef shardActor, SchemaContext schemaContext,
-            ShardStats shardStats, String transactionID, int txnClientVersion) {
+            ShardStats shardStats, String transactionID, short clientTxVersion) {
         super("shard-tx"); //actor name override used for metering. This does not change the "real" actor name
         this.shardActor = shardActor;
         this.schemaContext = schemaContext;
         this.shardStats = shardStats;
         this.transactionID = transactionID;
-        this.txnClientVersion = txnClientVersion;
+        this.clientTxVersion = clientTxVersion;
     }
 
     public static Props props(DOMStoreTransaction transaction, ActorRef shardActor,
             SchemaContext schemaContext,DatastoreContext datastoreContext, ShardStats shardStats,
-            String transactionID, int txnClientVersion) {
+            String transactionID, short txnClientVersion) {
         return Props.create(new ShardTransactionCreator(transaction, shardActor, schemaContext,
            datastoreContext, shardStats, transactionID, txnClientVersion));
     }
@@ -96,8 +96,8 @@ public abstract class ShardTransaction extends AbstractUntypedActorWithMetering
         return schemaContext;
     }
 
-    protected int getTxnClientVersion() {
-        return txnClientVersion;
+    protected short getClientTxVersion() {
+        return clientTxVersion;
     }
 
     @Override
@@ -118,28 +118,28 @@ public abstract class ShardTransaction extends AbstractUntypedActorWithMetering
         getDOMStoreTransaction().close();
 
         if(sendReply) {
-            getSender().tell(new CloseTransactionReply().toSerializable(), getSelf());
+            getSender().tell(CloseTransactionReply.INSTANCE.toSerializable(), getSelf());
         }
 
         getSelf().tell(PoisonPill.getInstance(), getSelf());
     }
 
-    protected void readData(DOMStoreReadTransaction transaction, ReadData message, final boolean returnSerialized) {
+    protected void readData(DOMStoreReadTransaction transaction, ReadData message,
+            final boolean returnSerialized) {
         final ActorRef sender = getSender();
         final ActorRef self = getSelf();
         final YangInstanceIdentifier path = message.getPath();
         final CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> future =
                 transaction.read(path);
 
-
         future.addListener(new Runnable() {
             @Override
             public void run() {
                 try {
                     Optional<NormalizedNode<?, ?>> optional = future.checkedGet();
-                    ReadDataReply readDataReply = new ReadDataReply(schemaContext, optional.orNull());
+                    ReadDataReply readDataReply = new ReadDataReply(optional.orNull());
 
-                    sender.tell((returnSerialized ? readDataReply.toSerializable():
+                    sender.tell((returnSerialized ? readDataReply.toSerializable(clientTxVersion):
                         readDataReply), self);
 
                 } catch (Exception e) {
@@ -176,11 +176,11 @@ public abstract class ShardTransaction extends AbstractUntypedActorWithMetering
         final DatastoreContext datastoreContext;
         final ShardStats shardStats;
         final String transactionID;
-        final int txnClientVersion;
+        final short txnClientVersion;
 
         ShardTransactionCreator(DOMStoreTransaction transaction, ActorRef shardActor,
                 SchemaContext schemaContext, DatastoreContext datastoreContext,
-                ShardStats shardStats, String transactionID, int txnClientVersion) {
+                ShardStats shardStats, String transactionID, short txnClientVersion) {
             this.transaction = transaction;
             this.shardActor = shardActor;
             this.shardStats = shardStats;
index 78c6a558f4f291bca277c2793676f01fc6686cc7..8ba613958a9a5dfc1e0e2a9b45b921c3b6243b4e 100644 (file)
@@ -12,7 +12,6 @@ import akka.actor.ActorRef;
 import akka.actor.Props;
 import akka.japi.Creator;
 import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActor;
-
 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
 import org.opendaylight.controller.cluster.datastore.messages.CloseTransactionChain;
 import org.opendaylight.controller.cluster.datastore.messages.CloseTransactionChainReply;
@@ -46,7 +45,7 @@ public class ShardTransactionChain extends AbstractUntypedActor {
             createTransaction(createTransaction);
         } else if (message.getClass().equals(CloseTransactionChain.SERIALIZABLE_CLASS)) {
             chain.close();
-            getSender().tell(new CloseTransactionChainReply().toSerializable(), getSelf());
+            getSender().tell(CloseTransactionChainReply.INSTANCE.toSerializable(), getSelf());
         }else{
             unknownMessage(message);
         }
index 44f2c7bd0a2794715d09cb29bb7a5ce42b352344..2e43219523e0d34b5b015b0ef0fc39370dbf89b8 100644 (file)
@@ -43,8 +43,8 @@ public class ShardWriteTransaction extends ShardTransaction {
 
     public ShardWriteTransaction(DOMStoreWriteTransaction transaction, ActorRef shardActor,
             SchemaContext schemaContext, ShardStats shardStats, String transactionID,
-            int txnClientVersion) {
-        super(shardActor, schemaContext, shardStats, transactionID, txnClientVersion);
+            short clientTxVersion) {
+        super(shardActor, schemaContext, shardStats, transactionID, clientTxVersion);
         this.transaction = transaction;
     }
 
@@ -66,19 +66,19 @@ public class ShardWriteTransaction extends ShardTransaction {
             deleteData(transaction, (DeleteData) message, !SERIALIZED_REPLY);
 
         } else if (message instanceof ReadyTransaction) {
-            readyTransaction(transaction, new ReadyTransaction(), !SERIALIZED_REPLY);
+            readyTransaction(transaction, !SERIALIZED_REPLY);
 
-        } else if(WriteData.SERIALIZABLE_CLASS.equals(message.getClass())) {
-            writeData(transaction, WriteData.fromSerializable(message, getSchemaContext()), SERIALIZED_REPLY);
+        } else if(WriteData.isSerializedType(message)) {
+            writeData(transaction, WriteData.fromSerializable(message), SERIALIZED_REPLY);
 
-        } else if(MergeData.SERIALIZABLE_CLASS.equals(message.getClass())) {
-            mergeData(transaction, MergeData.fromSerializable(message, getSchemaContext()), SERIALIZED_REPLY);
+        } else if(MergeData.isSerializedType(message)) {
+            mergeData(transaction, MergeData.fromSerializable(message), SERIALIZED_REPLY);
 
         } else if(DeleteData.SERIALIZABLE_CLASS.equals(message.getClass())) {
             deleteData(transaction, DeleteData.fromSerializable(message), SERIALIZED_REPLY);
 
         } else if(ReadyTransaction.SERIALIZABLE_CLASS.equals(message.getClass())) {
-            readyTransaction(transaction, new ReadyTransaction(), SERIALIZED_REPLY);
+            readyTransaction(transaction, SERIALIZED_REPLY);
 
         } else if (message instanceof GetCompositedModification) {
             // This is here for testing only
@@ -97,9 +97,9 @@ public class ShardWriteTransaction extends ShardTransaction {
                 new WriteModification(message.getPath(), message.getData(), getSchemaContext()));
         try {
             transaction.write(message.getPath(), message.getData());
-            WriteDataReply writeDataReply = new WriteDataReply();
-            getSender().tell(returnSerialized ? writeDataReply.toSerializable() : writeDataReply,
-                getSelf());
+            WriteDataReply writeDataReply = WriteDataReply.INSTANCE;
+            getSender().tell(returnSerialized ? writeDataReply.toSerializable(message.getVersion()) :
+                writeDataReply, getSelf());
         }catch(Exception e){
             getSender().tell(new akka.actor.Status.Failure(e), getSelf());
         }
@@ -114,9 +114,9 @@ public class ShardWriteTransaction extends ShardTransaction {
 
         try {
             transaction.merge(message.getPath(), message.getData());
-            MergeDataReply mergeDataReply = new MergeDataReply();
-            getSender().tell(returnSerialized ? mergeDataReply.toSerializable() : mergeDataReply ,
-                getSelf());
+            MergeDataReply mergeDataReply = MergeDataReply.INSTANCE;
+            getSender().tell(returnSerialized ? mergeDataReply.toSerializable(message.getVersion()) :
+                mergeDataReply, getSelf());
         }catch(Exception e){
             getSender().tell(new akka.actor.Status.Failure(e), getSelf());
         }
@@ -137,15 +137,14 @@ public class ShardWriteTransaction extends ShardTransaction {
         }
     }
 
-    private void readyTransaction(DOMStoreWriteTransaction transaction, ReadyTransaction message,
-            boolean returnSerialized) {
+    private void readyTransaction(DOMStoreWriteTransaction transaction, boolean returnSerialized) {
         String transactionID = getTransactionID();
 
         LOG.debug("readyTransaction : {}", transactionID);
 
         DOMStoreThreePhaseCommitCohort cohort =  transaction.ready();
 
-        getShardActor().forward(new ForwardedReadyTransaction(transactionID, getTxnClientVersion(),
+        getShardActor().forward(new ForwardedReadyTransaction(transactionID, getClientTxVersion(),
                 cohort, modification, returnSerialized), getContext());
 
         // The shard will handle the commit from here so we're no longer needed - self-destruct.
index 7703f484c73e687391ff5dd9ffe1739afd201c0f..f34e88fb279c0262516dfb9fa6cdd8672c65003c 100644 (file)
@@ -43,6 +43,7 @@ import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply;
 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
 import org.opendaylight.controller.cluster.datastore.messages.SerializableMessage;
+import org.opendaylight.controller.cluster.datastore.messages.VersionedSerializableMessage;
 import org.opendaylight.controller.cluster.datastore.messages.WriteData;
 import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategyFactory;
 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
@@ -157,8 +158,7 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
             if(remoteTransactionActorsMB.get()) {
                 for(ActorSelection actor : remoteTransactionActors) {
                     LOG.trace("Sending CloseTransaction to {}", actor);
-                    actorContext.sendOperationAsync(actor,
-                            new CloseTransaction().toSerializable());
+                    actorContext.sendOperationAsync(actor, CloseTransaction.INSTANCE.toSerializable());
                 }
             }
         }
@@ -617,10 +617,6 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
             }
         }
 
-
-
-
-
         /**
          * Performs a CreateTransaction try async.
          */
@@ -763,11 +759,11 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
         private final String transactionPath;
         private final ActorSelection actor;
         private final boolean isTxActorLocal;
-        private final int remoteTransactionVersion;
+        private final short remoteTransactionVersion;
 
         private TransactionContextImpl(String transactionPath, ActorSelection actor, TransactionIdentifier identifier,
                 ActorContext actorContext, SchemaContext schemaContext,
-                boolean isTxActorLocal, int remoteTransactionVersion) {
+                boolean isTxActorLocal, short remoteTransactionVersion) {
             super(identifier);
             this.transactionPath = transactionPath;
             this.actor = actor;
@@ -785,11 +781,16 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
             return actorContext.executeOperationAsync(getActor(), isTxActorLocal ? msg : msg.toSerializable());
         }
 
+        private Future<Object> executeOperationAsync(VersionedSerializableMessage msg) {
+            return actorContext.executeOperationAsync(getActor(), isTxActorLocal ? msg :
+                msg.toSerializable(remoteTransactionVersion));
+        }
+
         @Override
         public void closeTransaction() {
             LOG.debug("Tx {} closeTransaction called", identifier);
 
-            actorContext.sendOperationAsync(getActor(), new CloseTransaction().toSerializable());
+            actorContext.sendOperationAsync(getActor(), CloseTransaction.INSTANCE.toSerializable());
         }
 
         @Override
@@ -799,7 +800,7 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
 
             // Send the ReadyTransaction message to the Tx actor.
 
-            final Future<Object> replyFuture = executeOperationAsync(new ReadyTransaction());
+            final Future<Object> replyFuture = executeOperationAsync(ReadyTransaction.INSTANCE);
 
             // Combine all the previously recorded put/merge/delete operation reply Futures and the
             // ReadyTransactionReply Future into one Future. If any one fails then the combined
@@ -846,7 +847,8 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
                         // At some point in the future when upgrades from Helium are not supported
                         // we could remove this code to resolvePath and just use the cohortPath as the
                         // resolved cohortPath
-                        if(TransactionContextImpl.this.remoteTransactionVersion < CreateTransaction.HELIUM_1_VERSION) {
+                        if(TransactionContextImpl.this.remoteTransactionVersion <
+                                DataStoreVersions.HELIUM_1_VERSION) {
                             cohortPath = actorContext.resolvePath(transactionPath, cohortPath);
                         }
 
@@ -872,14 +874,14 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
         public void mergeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
             LOG.debug("Tx {} mergeData called path = {}", identifier, path);
 
-            recordedOperationFutures.add(executeOperationAsync(new MergeData(path, data, schemaContext)));
+            recordedOperationFutures.add(executeOperationAsync(new MergeData(path, data)));
         }
 
         @Override
         public void writeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
             LOG.debug("Tx {} writeData called path = {}", identifier, path);
 
-            recordedOperationFutures.add(executeOperationAsync(new WriteData(path, data, schemaContext)));
+            recordedOperationFutures.add(executeOperationAsync(new WriteData(path, data)));
         }
 
         @Override
@@ -950,8 +952,8 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
                             ReadDataReply reply = (ReadDataReply) readResponse;
                             returnFuture.set(Optional.<NormalizedNode<?, ?>>fromNullable(reply.getNormalizedNode()));
 
-                        } else if (readResponse.getClass().equals(ReadDataReply.SERIALIZABLE_CLASS)) {
-                            ReadDataReply reply = ReadDataReply.fromSerializable(schemaContext, path, readResponse);
+                        } else if (ReadDataReply.isSerializedType(readResponse)) {
+                            ReadDataReply reply = ReadDataReply.fromSerializable(readResponse);
                             returnFuture.set(Optional.<NormalizedNode<?, ?>>fromNullable(reply.getNormalizedNode()));
 
                         } else {
index 79c6b036fa997c94fef08517335d4315cae0629f..3680aec4f36035abbdb36b4b84f7fda93375fc2d 100644 (file)
@@ -14,8 +14,13 @@ public class AbortTransactionReply implements SerializableMessage {
     public static final Class<ThreePhaseCommitCohortMessages.AbortTransactionReply> SERIALIZABLE_CLASS =
             ThreePhaseCommitCohortMessages.AbortTransactionReply.class;
 
+    private static final Object SERIALIZED_INSTANCE =
+            ThreePhaseCommitCohortMessages.AbortTransactionReply.newBuilder().build();
+
+    public static final AbortTransactionReply INSTANCE = new AbortTransactionReply();
+
     @Override
     public Object toSerializable() {
-        return ThreePhaseCommitCohortMessages.AbortTransactionReply.newBuilder().build();
+        return SERIALIZED_INSTANCE;
     }
 }
index 4d121bae0adbc1aa0dee6d2888b0982293c70810..7db4846ef4ff69c4c318ab459a07cc09b7943a47 100644 (file)
@@ -14,23 +14,30 @@ public class CanCommitTransactionReply implements SerializableMessage {
     public static final Class<ThreePhaseCommitCohortMessages.CanCommitTransactionReply> SERIALIZABLE_CLASS =
             ThreePhaseCommitCohortMessages.CanCommitTransactionReply.class;
 
-    private final Boolean canCommit;
+    public static final CanCommitTransactionReply YES = new CanCommitTransactionReply(true);
+    public static final CanCommitTransactionReply NO = new CanCommitTransactionReply(false);
 
-    public CanCommitTransactionReply(final Boolean canCommit) {
+    private final boolean canCommit;
+    private final Object serializedMessage;
+
+    private CanCommitTransactionReply(final boolean canCommit) {
         this.canCommit = canCommit;
+        this.serializedMessage = ThreePhaseCommitCohortMessages.CanCommitTransactionReply.newBuilder().
+                setCanCommit(canCommit).build();
     }
 
-    public Boolean getCanCommit() {
+    public boolean getCanCommit() {
         return canCommit;
     }
 
     @Override
     public Object toSerializable() {
-        return ThreePhaseCommitCohortMessages.CanCommitTransactionReply.newBuilder().setCanCommit(canCommit).build();
+        return serializedMessage;
     }
 
     public static CanCommitTransactionReply fromSerializable(final Object message) {
-        return new CanCommitTransactionReply(
-                ((ThreePhaseCommitCohortMessages.CanCommitTransactionReply) message).getCanCommit());
+        ThreePhaseCommitCohortMessages.CanCommitTransactionReply serialized =
+                (ThreePhaseCommitCohortMessages.CanCommitTransactionReply) message;
+        return serialized.getCanCommit() ? YES : NO;
     }
 }
index c73111f2dbde517989214c7bd5a17145df63bc2c..ef1aac8d4b81ae4b6f1f4bee125b7dfb130d14e6 100644 (file)
@@ -11,10 +11,16 @@ package org.opendaylight.controller.cluster.datastore.messages;
 import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages;
 
 public class CloseTransaction implements SerializableMessage{
-  public static final Class<ShardTransactionMessages.CloseTransaction> SERIALIZABLE_CLASS =
-          ShardTransactionMessages.CloseTransaction.class;
-  @Override
-  public Object toSerializable() {
-    return ShardTransactionMessages.CloseTransaction.newBuilder().build();
-  }
+    public static final Class<ShardTransactionMessages.CloseTransaction> SERIALIZABLE_CLASS =
+            ShardTransactionMessages.CloseTransaction.class;
+
+    private static final Object SERIALIZED_INSTANCE =
+            ShardTransactionMessages.CloseTransaction.newBuilder().build();
+
+    public static final CloseTransaction INSTANCE = new CloseTransaction();
+
+    @Override
+    public Object toSerializable() {
+        return SERIALIZED_INSTANCE;
+    }
 }
index c001ae185a6dc9641f08f2440d7825624936cb1d..b4673e8a08042668105091223b7723844037edd7 100644 (file)
@@ -11,11 +11,16 @@ package org.opendaylight.controller.cluster.datastore.messages;
 import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionChainMessages;
 
 public class CloseTransactionChainReply implements SerializableMessage {
-  public static final Class<ShardTransactionChainMessages.CloseTransactionChainReply> SERIALIZABLE_CLASS =
-          ShardTransactionChainMessages.CloseTransactionChainReply.class;
-  @Override
-  public Object toSerializable() {
-    return ShardTransactionChainMessages.CloseTransactionChainReply.newBuilder().build();
-  }
+    public static final Class<ShardTransactionChainMessages.CloseTransactionChainReply> SERIALIZABLE_CLASS =
+            ShardTransactionChainMessages.CloseTransactionChainReply.class;
 
+    private static final Object SERIALIZED_INSTANCE =
+            ShardTransactionChainMessages.CloseTransactionChainReply.newBuilder().build();
+
+    public static final CloseTransactionChainReply INSTANCE = new CloseTransactionChainReply();
+
+    @Override
+    public Object toSerializable() {
+        return SERIALIZED_INSTANCE;
+    }
 }
index 124eeb2235f04d64e30dd33c581166a1edb347fc..1c47a1827f11f86f26ee13c2d81237adcdd55f6b 100644 (file)
@@ -11,10 +11,16 @@ package org.opendaylight.controller.cluster.datastore.messages;
 import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages;
 
 public class CloseTransactionReply implements SerializableMessage {
-  public static final Class<ShardTransactionMessages.CloseTransactionReply> SERIALIZABLE_CLASS =
-          ShardTransactionMessages.CloseTransactionReply.class;
-  @Override
-  public Object toSerializable() {
-    return ShardTransactionMessages.CloseTransactionReply.newBuilder().build();
-  }
+    public static final Class<ShardTransactionMessages.CloseTransactionReply> SERIALIZABLE_CLASS =
+            ShardTransactionMessages.CloseTransactionReply.class;
+
+    private static final Object SERIALIZED_INSTANCE =
+            ShardTransactionMessages.CloseTransactionReply.newBuilder().build();
+
+    public static final CloseTransactionReply INSTANCE = new CloseTransactionReply();
+
+    @Override
+    public Object toSerializable() {
+        return SERIALIZED_INSTANCE;
+    }
 }
index 3d4a168450bbf4ef07b23a1071e7a6678fba3c13..47adea5ea0305678de89c8b11c4c87a6ea91bedc 100644 (file)
@@ -14,8 +14,13 @@ public class CommitTransactionReply implements SerializableMessage {
     public static final Class<ThreePhaseCommitCohortMessages.CommitTransactionReply> SERIALIZABLE_CLASS =
             ThreePhaseCommitCohortMessages.CommitTransactionReply.class;
 
+    private static final Object SERIALIZED_INSTANCE =
+            ThreePhaseCommitCohortMessages.CommitTransactionReply.newBuilder().build();
+
+    public static final CommitTransactionReply INSTANCE = new CommitTransactionReply();
+
     @Override
     public Object toSerializable() {
-        return ThreePhaseCommitCohortMessages.CommitTransactionReply.newBuilder().build();
+        return SERIALIZED_INSTANCE;
     }
 }
index bf82e660369c8d4f89e5cf48b22f6555f89bb65e..ea3caef093320ab1b9c4aa7d27e74df2f144b102 100644 (file)
@@ -9,6 +9,7 @@
 package org.opendaylight.controller.cluster.datastore.messages;
 
 
+import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
 import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages;
 
 
@@ -16,24 +17,21 @@ public class CreateTransaction implements SerializableMessage {
     public static final Class<ShardTransactionMessages.CreateTransaction> SERIALIZABLE_CLASS =
             ShardTransactionMessages.CreateTransaction.class;
 
-    public static final int HELIUM_1_VERSION = 1;
-    public static final int CURRENT_VERSION = HELIUM_1_VERSION;
-
     private final String transactionId;
     private final int transactionType;
     private final String transactionChainId;
-    private final int version;
+    private final short version;
 
     public CreateTransaction(String transactionId, int transactionType) {
         this(transactionId, transactionType, "");
     }
 
     public CreateTransaction(String transactionId, int transactionType, String transactionChainId) {
-        this(transactionId, transactionType, transactionChainId, CURRENT_VERSION);
+        this(transactionId, transactionType, transactionChainId, DataStoreVersions.CURRENT_VERSION);
     }
 
     private CreateTransaction(String transactionId, int transactionType, String transactionChainId,
-            int version) {
+            short version) {
         this.transactionId = transactionId;
         this.transactionType = transactionType;
         this.transactionChainId = transactionChainId;
@@ -48,7 +46,7 @@ public class CreateTransaction implements SerializableMessage {
         return transactionType;
     }
 
-    public int getVersion() {
+    public short getVersion() {
         return version;
     }
 
@@ -66,7 +64,7 @@ public class CreateTransaction implements SerializableMessage {
             (ShardTransactionMessages.CreateTransaction) message;
         return new CreateTransaction(createTransaction.getTransactionId(),
             createTransaction.getTransactionType(), createTransaction.getTransactionChainId(),
-            createTransaction.getMessageVersion());
+            (short)createTransaction.getMessageVersion());
     }
 
     public String getTransactionChainId() {
index 83e68c9cb41882c2b7c377f96aaecb5f5a07f328..3fde6cc7fc2477eba6c6f466cd376d6ff132d1e1 100644 (file)
@@ -8,23 +8,22 @@
 
 package org.opendaylight.controller.cluster.datastore.messages;
 
+import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
 import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages;
 
 public class CreateTransactionReply implements SerializableMessage {
 
-    public static final Class<ShardTransactionMessages.CreateTransactionReply> SERIALIZABLE_CLASS =
-            ShardTransactionMessages.CreateTransactionReply.class;
+    public static final Class SERIALIZABLE_CLASS = ShardTransactionMessages.CreateTransactionReply.class;
     private final String transactionPath;
     private final String transactionId;
-    private final int version;
+    private final short version;
 
-    public CreateTransactionReply(final String transactionPath,
-        final String transactionId) {
-        this(transactionPath, transactionId, CreateTransaction.CURRENT_VERSION);
+    public CreateTransactionReply(String transactionPath, String transactionId) {
+        this(transactionPath, transactionId, DataStoreVersions.CURRENT_VERSION);
     }
 
     public CreateTransactionReply(final String transactionPath,
-                                  final String transactionId, final int version) {
+                                  final String transactionId, final short version) {
         this.transactionPath = transactionPath;
         this.transactionId = transactionId;
         this.version = version;
@@ -39,7 +38,7 @@ public class CreateTransactionReply implements SerializableMessage {
         return transactionId;
     }
 
-    public int getVersion() {
+    public short getVersion() {
         return version;
     }
 
@@ -52,9 +51,10 @@ public class CreateTransactionReply implements SerializableMessage {
             .build();
     }
 
-    public static CreateTransactionReply fromSerializable(final Object serializable){
+    public static CreateTransactionReply fromSerializable(Object serializable){
         ShardTransactionMessages.CreateTransactionReply o = (ShardTransactionMessages.CreateTransactionReply) serializable;
-        return new CreateTransactionReply(o.getTransactionActorPath(), o.getTransactionId(), o.getMessageVersion());
+        return new CreateTransactionReply(o.getTransactionActorPath(), o.getTransactionId(),
+                (short)o.getMessageVersion());
     }
 
 }
index 5b5f076d43713b48e6245a35a6d21684ecf909cb..fe81e27e3dcdec117efc637f8dd392c37e0c8c13 100644 (file)
 
 package org.opendaylight.controller.cluster.datastore.messages;
 
-import org.opendaylight.controller.cluster.datastore.node.NormalizedNodeToNodeCodec;
-import org.opendaylight.controller.cluster.datastore.util.InstanceIdentifierUtils;
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.util.Map;
+import java.util.Set;
+import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
+import org.opendaylight.controller.cluster.datastore.node.utils.stream.NormalizedNodeInputStreamReader;
+import org.opendaylight.controller.cluster.datastore.node.utils.stream.NormalizedNodeOutputStreamWriter;
+import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
-import org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages;
-import org.opendaylight.controller.protobuff.messages.datachange.notification.DataChangeListenerMessages;
+import org.opendaylight.controller.md.sal.dom.store.impl.DOMImmutableDataChangeEvent;
+import org.opendaylight.controller.md.sal.dom.store.impl.DOMImmutableDataChangeEvent.Builder;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
-import org.opendaylight.yangtools.yang.model.api.SchemaContext;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-public class DataChanged implements SerializableMessage {
-    public static final Class<DataChangeListenerMessages.DataChanged> SERIALIZABLE_CLASS =
-        DataChangeListenerMessages.DataChanged.class;
+import org.opendaylight.yangtools.yang.data.api.schema.stream.NormalizedNodeWriter;
 
-    final private SchemaContext schemaContext;
-    private final AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>>
-        change;
+public class DataChanged implements Externalizable {
+    private static final long serialVersionUID = 1L;
 
+    private AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change;
 
+    public DataChanged() {
+    }
 
-    public DataChanged(SchemaContext schemaContext,
-        AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change) {
+    public DataChanged(AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change) {
         this.change = change;
-        this.schemaContext = schemaContext;
     }
 
-
     public AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> getChange() {
         return change;
     }
 
+    @Override
+    public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+        in.readShort(); // Read the version
 
-    private NormalizedNodeMessages.Node convertToNodeTree(
-        NormalizedNode<?, ?> normalizedNode) {
+        NormalizedNodeInputStreamReader streamReader = new NormalizedNodeInputStreamReader(in);
 
-        return new NormalizedNodeToNodeCodec(schemaContext)
-            .encode(normalizedNode)
-            .getNormalizedNode();
+        // Note: the scope passed to builder is not actually used.
+        Builder builder = DOMImmutableDataChangeEvent.builder(DataChangeScope.SUBTREE);
 
-    }
+        // Read created data
 
-    private Iterable<NormalizedNodeMessages.InstanceIdentifier> convertToRemovePaths(
-        Set<YangInstanceIdentifier> removedPaths) {
-        final Set<NormalizedNodeMessages.InstanceIdentifier> removedPathInstanceIds = new HashSet<>();
-        for (YangInstanceIdentifier id : removedPaths) {
-            removedPathInstanceIds.add(InstanceIdentifierUtils.toSerializable(id));
+        int size = in.readInt();
+        for(int i = 0; i < size; i++) {
+            YangInstanceIdentifier path = streamReader.readYangInstanceIdentifier();
+            NormalizedNode<?, ?> node = streamReader.readNormalizedNode();
+            builder.addCreated(path, node);
         }
-        return new Iterable<NormalizedNodeMessages.InstanceIdentifier>() {
-            @Override
-            public Iterator<NormalizedNodeMessages.InstanceIdentifier> iterator() {
-                return removedPathInstanceIds.iterator();
-            }
-        };
 
-    }
+        // Read updated data
 
-    private NormalizedNodeMessages.NodeMap convertToNodeMap(
-        Map<YangInstanceIdentifier, NormalizedNode<?, ?>> data) {
-        NormalizedNodeToNodeCodec normalizedNodeToNodeCodec =
-            new NormalizedNodeToNodeCodec(schemaContext);
-        NormalizedNodeMessages.NodeMap.Builder nodeMapBuilder =
-            NormalizedNodeMessages.NodeMap.newBuilder();
-        NormalizedNodeMessages.NodeMapEntry.Builder builder =
-            NormalizedNodeMessages.NodeMapEntry.newBuilder();
-        for (Map.Entry<YangInstanceIdentifier, NormalizedNode<?, ?>> entry : data
-            .entrySet()) {
-
-
-            NormalizedNodeMessages.InstanceIdentifier instanceIdentifier =
-                InstanceIdentifierUtils.toSerializable(entry.getKey());
-
-            builder.setInstanceIdentifierPath(instanceIdentifier)
-                .setNormalizedNode(normalizedNodeToNodeCodec
-                    .encode(entry.getValue())
-                    .getNormalizedNode());
-            nodeMapBuilder.addMapEntries(builder.build());
+        size = in.readInt();
+        for(int i = 0; i < size; i++) {
+            YangInstanceIdentifier path = streamReader.readYangInstanceIdentifier();
+            NormalizedNode<?, ?> before = streamReader.readNormalizedNode();
+            NormalizedNode<?, ?> after = streamReader.readNormalizedNode();
+            builder.addUpdated(path, before, after);
         }
-        return nodeMapBuilder.build();
-    }
-
 
-    @Override
-    public Object toSerializable() {
-        return DataChangeListenerMessages.DataChanged.newBuilder()
-            .addAllRemovedPaths(convertToRemovePaths(change.getRemovedPaths()))
-            .setCreatedData(convertToNodeMap(change.getCreatedData()))
-            .setOriginalData(convertToNodeMap(change.getOriginalData()))
-            .setUpdatedData(convertToNodeMap(change.getUpdatedData()))
-            .setOriginalSubTree(convertToNodeTree(change.getOriginalSubtree()))
-            .setUpdatedSubTree(convertToNodeTree(change.getUpdatedSubtree()))
-            .build();
-    }
+        // Read removed data
 
-    public static DataChanged fromSerialize(SchemaContext sc, Object message,
-        YangInstanceIdentifier pathId) {
-        DataChangeListenerMessages.DataChanged dataChanged =
-            (DataChangeListenerMessages.DataChanged) message;
-        DataChangedEvent event = new DataChangedEvent(sc);
-        if (dataChanged.getCreatedData() != null && dataChanged.getCreatedData()
-            .isInitialized()) {
-            event.setCreatedData(dataChanged.getCreatedData());
-        }
-        if (dataChanged.getOriginalData() != null && dataChanged
-            .getOriginalData().isInitialized()) {
-            event.setOriginalData(dataChanged.getOriginalData());
+        size = in.readInt();
+        for(int i = 0; i < size; i++) {
+            YangInstanceIdentifier path = streamReader.readYangInstanceIdentifier();
+            NormalizedNode<?, ?> node = streamReader.readNormalizedNode();
+            builder.addRemoved(path, node);
         }
 
-        if (dataChanged.getUpdatedData() != null && dataChanged.getUpdatedData()
-            .isInitialized()) {
-            event.setUpdateData(dataChanged.getUpdatedData());
-        }
+        // Read original subtree
 
-        if (dataChanged.getOriginalSubTree() != null && dataChanged
-            .getOriginalSubTree().isInitialized()) {
-            event.setOriginalSubtree(dataChanged.getOriginalSubTree(), pathId);
+        boolean present = in.readBoolean();
+        if(present) {
+            builder.setBefore(streamReader.readNormalizedNode());
         }
 
-        if (dataChanged.getUpdatedSubTree() != null && dataChanged
-            .getUpdatedSubTree().isInitialized()) {
-            event.setUpdatedSubtree(dataChanged.getOriginalSubTree(), pathId);
-        }
+        // Read updated subtree
 
-        if (dataChanged.getRemovedPathsList() != null && !dataChanged
-            .getRemovedPathsList().isEmpty()) {
-            event.setRemovedPaths(dataChanged.getRemovedPathsList());
+        present = in.readBoolean();
+        if(present) {
+            builder.setAfter(streamReader.readNormalizedNode());
         }
 
-        return new DataChanged(sc, event);
-
+        change = builder.build();
     }
 
-    static class DataChangedEvent implements
-        AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> {
-        private Map<YangInstanceIdentifier, NormalizedNode<?, ?>> createdData;
-        private final NormalizedNodeToNodeCodec nodeCodec;
-        private Map<YangInstanceIdentifier, NormalizedNode<?, ?>> updatedData;
-        private Map<YangInstanceIdentifier, NormalizedNode<?, ?>> originalData;
-        private NormalizedNode<?, ?> originalSubTree;
-        private NormalizedNode<?, ?> updatedSubTree;
-        private Set<YangInstanceIdentifier> removedPathIds;
-
-        DataChangedEvent(SchemaContext schemaContext) {
-            nodeCodec = new NormalizedNodeToNodeCodec(schemaContext);
-        }
-
-        @Override
-        public Map<YangInstanceIdentifier, NormalizedNode<?, ?>> getCreatedData() {
-            if(createdData == null){
-                return Collections.emptyMap();
-            }
-            return createdData;
-        }
-
-        DataChangedEvent setCreatedData(
-            NormalizedNodeMessages.NodeMap nodeMap) {
-            this.createdData = convertNodeMapToMap(nodeMap);
-            return this;
-        }
-
-        private Map<YangInstanceIdentifier, NormalizedNode<?, ?>> convertNodeMapToMap(
-            NormalizedNodeMessages.NodeMap nodeMap) {
-            Map<YangInstanceIdentifier, NormalizedNode<?, ?>> mapEntries =
-                new HashMap<YangInstanceIdentifier, NormalizedNode<?, ?>>();
-            for (NormalizedNodeMessages.NodeMapEntry nodeMapEntry : nodeMap
-                .getMapEntriesList()) {
-                YangInstanceIdentifier id = InstanceIdentifierUtils
-                    .fromSerializable(nodeMapEntry.getInstanceIdentifierPath());
-                mapEntries.put(id,
-                    nodeCodec.decode(nodeMapEntry.getNormalizedNode()));
-            }
-            return mapEntries;
-        }
+    @Override
+    public void writeExternal(ObjectOutput out) throws IOException {
+        out.writeShort(DataStoreVersions.CURRENT_VERSION);
 
+        NormalizedNodeOutputStreamWriter streamWriter = new NormalizedNodeOutputStreamWriter(out);
+        NormalizedNodeWriter nodeWriter = NormalizedNodeWriter.forStreamWriter(streamWriter);
 
-        @Override
-        public Map<YangInstanceIdentifier, NormalizedNode<?, ?>> getUpdatedData() {
-            if(updatedData == null){
-                return Collections.emptyMap();
-            }
-            return updatedData;
-        }
+        // Write created data
 
-        DataChangedEvent setUpdateData(NormalizedNodeMessages.NodeMap nodeMap) {
-            this.updatedData = convertNodeMapToMap(nodeMap);
-            return this;
+        Map<YangInstanceIdentifier, NormalizedNode<?, ?>> createdData = change.getCreatedData();
+        out.writeInt(createdData.size());
+        for(Map.Entry<YangInstanceIdentifier, NormalizedNode<?, ?>> e: createdData.entrySet()) {
+            streamWriter.writeYangInstanceIdentifier(e.getKey());
+            nodeWriter.write(e.getValue());
         }
 
-        @Override
-        public Set<YangInstanceIdentifier> getRemovedPaths() {
-            if (removedPathIds == null) {
-                return Collections.emptySet();
-            }
-            return removedPathIds;
-        }
+        // Write updated data
 
-        public DataChangedEvent setRemovedPaths(List<NormalizedNodeMessages.InstanceIdentifier> removedPaths) {
-            Set<YangInstanceIdentifier> removedIds = new HashSet<>();
-            for (NormalizedNodeMessages.InstanceIdentifier path : removedPaths) {
-                removedIds.add(InstanceIdentifierUtils.fromSerializable(path));
-            }
-            this.removedPathIds = removedIds;
-            return this;
+        Map<YangInstanceIdentifier, NormalizedNode<?, ?>> originalData = change.getOriginalData();
+        Map<YangInstanceIdentifier, NormalizedNode<?, ?>> updatedData = change.getUpdatedData();
+        out.writeInt(updatedData.size());
+        for(Map.Entry<YangInstanceIdentifier, NormalizedNode<?, ?>> e: updatedData.entrySet()) {
+            streamWriter.writeYangInstanceIdentifier(e.getKey());
+            nodeWriter.write(originalData.get(e.getKey()));
+            nodeWriter.write(e.getValue());
         }
 
-        @Override
-        public Map<YangInstanceIdentifier, NormalizedNode<?, ?>> getOriginalData() {
-            if (originalData == null) {
-                Collections.emptyMap();
-            }
-            return originalData;
-        }
+        // Write removed data
 
-        DataChangedEvent setOriginalData(
-            NormalizedNodeMessages.NodeMap nodeMap) {
-            this.originalData = convertNodeMapToMap(nodeMap);
-            return this;
+        Set<YangInstanceIdentifier> removed = change.getRemovedPaths();
+        out.writeInt(removed.size());
+        for(YangInstanceIdentifier path: removed) {
+            streamWriter.writeYangInstanceIdentifier(path);
+            nodeWriter.write(originalData.get(path));
         }
 
-        @Override
-        public NormalizedNode<?, ?> getOriginalSubtree() {
-            return originalSubTree;
-        }
+        // Write original subtree
 
-        DataChangedEvent setOriginalSubtree(NormalizedNodeMessages.Node node,
-            YangInstanceIdentifier instanceIdentifierPath) {
-            originalSubTree = nodeCodec.decode(node);
-            return this;
+        NormalizedNode<?, ?> originalSubtree = change.getOriginalSubtree();
+        out.writeBoolean(originalSubtree != null);
+        if(originalSubtree != null) {
+            nodeWriter.write(originalSubtree);
         }
 
-        @Override
-        public NormalizedNode<?, ?> getUpdatedSubtree() {
-            return updatedSubTree;
-        }
+        // Write original subtree
 
-        DataChangedEvent setUpdatedSubtree(NormalizedNodeMessages.Node node,
-            YangInstanceIdentifier instanceIdentifierPath) {
-            updatedSubTree = nodeCodec.decode(node);
-            return this;
+        NormalizedNode<?, ?> updatedSubtree = change.getUpdatedSubtree();
+        out.writeBoolean(updatedSubtree != null);
+        if(updatedSubtree != null) {
+            nodeWriter.write(updatedSubtree);
         }
-
-
     }
-
-
-
 }
index e10a40729220941761a925350b5ac25288b0e90c..2db03446d9145abb664ec1e1b379ba0d918205c1 100644 (file)
@@ -11,10 +11,16 @@ package org.opendaylight.controller.cluster.datastore.messages;
 import org.opendaylight.controller.protobuff.messages.datachange.notification.DataChangeListenerMessages;
 
 public class DataChangedReply implements SerializableMessage {
-  public static final Class<DataChangeListenerMessages.DataChangedReply> SERIALIZABLE_CLASS =
-          DataChangeListenerMessages.DataChangedReply.class;
-  @Override
-  public Object toSerializable() {
-    return DataChangeListenerMessages.DataChangedReply.newBuilder().build();
-  }
+    public static final Class<DataChangeListenerMessages.DataChangedReply> SERIALIZABLE_CLASS =
+            DataChangeListenerMessages.DataChangedReply.class;
+
+    private static final Object SERIALIZED_INSTANCE =
+            DataChangeListenerMessages.DataChangedReply.newBuilder().build();
+
+    public static final DataChangedReply INSTANCE = new DataChangedReply();
+
+    @Override
+    public Object toSerializable() {
+        return SERIALIZED_INSTANCE;
+    }
 }
index 2e02664e1dcd41bc032fb2be2a2913baf0e000f5..eba9c39170b4eef45509eb0de9f488172c5f8bfb 100644 (file)
@@ -11,10 +11,15 @@ package org.opendaylight.controller.cluster.datastore.messages;
 import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages;
 
 public class DeleteDataReply implements SerializableMessage{
-  public static final Class<ShardTransactionMessages.DeleteDataReply> SERIALIZABLE_CLASS =
-          ShardTransactionMessages.DeleteDataReply.class;
-  @Override
-  public Object toSerializable() {
-    return ShardTransactionMessages.DeleteDataReply.newBuilder().build();
-  }
+    public static final Class<ShardTransactionMessages.DeleteDataReply> SERIALIZABLE_CLASS =
+            ShardTransactionMessages.DeleteDataReply.class;
+
+    private static final Object SERIALIZED_INSTANCE = ShardTransactionMessages.DeleteDataReply.newBuilder().build();
+
+    public static final DeleteDataReply INSTANCE = new DeleteDataReply();
+
+    @Override
+    public Object toSerializable() {
+        return SERIALIZED_INSTANCE;
+    }
 }
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/EmptyExternalizable.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/EmptyExternalizable.java
new file mode 100644 (file)
index 0000000..0b7b262
--- /dev/null
@@ -0,0 +1,29 @@
+/*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore.messages;
+
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+
+/**
+ * Externalizable with no data.
+ *
+ * @author Thomas Pantelis
+ */
+public class EmptyExternalizable implements Externalizable {
+
+    @Override
+    public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+    }
+
+    @Override
+    public void writeExternal(ObjectOutput out) throws IOException {
+    }
+}
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/EmptyReply.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/EmptyReply.java
new file mode 100644 (file)
index 0000000..284c6ef
--- /dev/null
@@ -0,0 +1,30 @@
+/*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore.messages;
+
+import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
+
+/**
+ * A reply with no data.
+ *
+ * @author Thomas Pantelis
+ */
+public abstract class EmptyReply extends EmptyExternalizable implements VersionedSerializableMessage {
+
+    private final Object legacySerializedInstance;
+
+    protected EmptyReply(Object legacySerializedInstance) {
+        super();
+        this.legacySerializedInstance = legacySerializedInstance;
+    }
+
+    @Override
+    public Object toSerializable(short toVersion) {
+        return toVersion >= DataStoreVersions.LITHIUM_VERSION ? this : legacySerializedInstance;
+    }
+}
index eb1f3495bdf2c04328ad5336eb44b248149bd242..9234385b3536bfbe8ccaf528075efcdedc1be792 100644 (file)
@@ -8,36 +8,54 @@
 
 package org.opendaylight.controller.cluster.datastore.messages;
 
+import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
 import org.opendaylight.controller.cluster.datastore.node.NormalizedNodeToNodeCodec;
 import org.opendaylight.controller.cluster.datastore.node.NormalizedNodeToNodeCodec.Decoded;
 import org.opendaylight.controller.cluster.datastore.node.NormalizedNodeToNodeCodec.Encoded;
 import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
-import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 
-public class MergeData extends ModifyData{
+public class MergeData extends ModifyData implements VersionedSerializableMessage {
+    private static final long serialVersionUID = 1L;
 
-    public static final Class<ShardTransactionMessages.MergeData> SERIALIZABLE_CLASS =
-            ShardTransactionMessages.MergeData.class;
+    public static final Class<MergeData> SERIALIZABLE_CLASS = MergeData.class;
 
-    public MergeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data,
-        SchemaContext context) {
-        super(path, data, context);
+    public MergeData() {
+    }
+
+    public MergeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
+        super(path, data);
     }
 
     @Override
-    public Object toSerializable() {
-        Encoded encoded = new NormalizedNodeToNodeCodec(schemaContext).encode(path, data);
-        return ShardTransactionMessages.MergeData.newBuilder()
-            .setInstanceIdentifierPathArguments(encoded.getEncodedPath())
-            .setNormalizedNode(encoded.getEncodedNode().getNormalizedNode()).build();
+    public Object toSerializable(short toVersion) {
+        if(toVersion >= DataStoreVersions.LITHIUM_VERSION) {
+            setVersion(toVersion);
+            return this;
+        } else {
+            // To base or R1 Helium version
+            Encoded encoded = new NormalizedNodeToNodeCodec(null).encode(getPath(), getData());
+            return ShardTransactionMessages.MergeData.newBuilder()
+                    .setInstanceIdentifierPathArguments(encoded.getEncodedPath())
+                    .setNormalizedNode(encoded.getEncodedNode().getNormalizedNode()).build();
+        }
+    }
+
+    public static MergeData fromSerializable(Object serializable){
+        if(serializable instanceof MergeData) {
+            return (MergeData) serializable;
+        } else {
+            // From base or R1 Helium version
+            ShardTransactionMessages.MergeData o = (ShardTransactionMessages.MergeData) serializable;
+            Decoded decoded = new NormalizedNodeToNodeCodec(null).decode(
+                    o.getInstanceIdentifierPathArguments(), o.getNormalizedNode());
+            return new MergeData(decoded.getDecodedPath(), decoded.getDecodedNode());
+        }
     }
 
-    public static MergeData fromSerializable(Object serializable, SchemaContext schemaContext){
-        ShardTransactionMessages.MergeData o = (ShardTransactionMessages.MergeData) serializable;
-        Decoded decoded = new NormalizedNodeToNodeCodec(schemaContext).decode(
-                o.getInstanceIdentifierPathArguments(), o.getNormalizedNode());
-        return new MergeData(decoded.getDecodedPath(), decoded.getDecodedNode(), schemaContext);
+    public static boolean isSerializedType(Object message) {
+        return SERIALIZABLE_CLASS.isAssignableFrom(message.getClass()) ||
+               message instanceof ShardTransactionMessages.MergeData;
     }
 }
index 92d6d728477aac50dfb68975db69c299249537dc..a4c514bdbf0751b7399ba5928c367089a8210e82 100644 (file)
@@ -10,12 +10,15 @@ package org.opendaylight.controller.cluster.datastore.messages;
 
 import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages;
 
-public class MergeDataReply implements SerializableMessage{
-  public static final Class<ShardTransactionMessages.MergeDataReply> SERIALIZABLE_CLASS =
-          ShardTransactionMessages.MergeDataReply.class;
+public class MergeDataReply extends EmptyReply {
+    private static final long serialVersionUID = 1L;
 
-  @Override
-  public Object toSerializable() {
-    return ShardTransactionMessages.MergeDataReply.newBuilder().build();
-  }
+    private static final Object LEGACY_SERIALIZED_INSTANCE =
+            ShardTransactionMessages.MergeDataReply.newBuilder().build();
+
+    public static final MergeDataReply INSTANCE = new MergeDataReply();
+
+    public MergeDataReply() {
+        super(LEGACY_SERIALIZED_INSTANCE);
+    }
 }
index b5c39d1c3fa630a674efb1302d3441f8dd8403ed..69c41c2a5663f5021a7a22401aef48ca9b3986d2 100644 (file)
@@ -8,25 +8,28 @@
 
 package org.opendaylight.controller.cluster.datastore.messages;
 
-import com.google.common.base.Preconditions;
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import org.opendaylight.controller.cluster.datastore.utils.SerializationUtils;
+import org.opendaylight.controller.cluster.datastore.utils.SerializationUtils.Applier;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
-import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 
-public abstract class ModifyData implements SerializableMessage {
-    protected final YangInstanceIdentifier path;
-    protected final NormalizedNode<?, ?> data;
-    protected final SchemaContext schemaContext;
+public abstract class ModifyData implements Externalizable {
+    private static final long serialVersionUID = 1L;
 
-    public ModifyData(YangInstanceIdentifier path, NormalizedNode<?, ?> data,
-        SchemaContext context) {
-        Preconditions.checkNotNull(context,
-            "Cannot serialize an object which does not have a schema schemaContext");
+    private YangInstanceIdentifier path;
+    private NormalizedNode<?, ?> data;
+    private short version;
 
+    protected ModifyData() {
+    }
 
+    protected ModifyData(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
         this.path = path;
         this.data = data;
-        this.schemaContext = context;
     }
 
     public YangInstanceIdentifier getPath() {
@@ -37,4 +40,31 @@ public abstract class ModifyData implements SerializableMessage {
         return data;
     }
 
+    public short getVersion() {
+        return version;
+    }
+
+    protected void setVersion(short version) {
+        this.version = version;
+    }
+
+    @Override
+    public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+        version = in.readShort();
+        SerializationUtils.deserializePathAndNode(in, this, APPLIER);
+    }
+
+    @Override
+    public void writeExternal(ObjectOutput out) throws IOException {
+        out.writeShort(version);
+        SerializationUtils.serializePathAndNode(path, data, out);
+    }
+
+    private static final Applier<ModifyData> APPLIER = new Applier<ModifyData>() {
+        @Override
+        public void apply(ModifyData instance, YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
+            instance.path = path;
+            instance.data = data;
+        }
+    };
 }
index 43dd81252c3a52f35da645e6993d716e2a0d7715..8ac6e1b1494a68fd3e7f2f04e0081a4d7f538a10 100644 (file)
@@ -9,23 +9,29 @@
 package org.opendaylight.controller.cluster.datastore.messages;
 
 import com.google.protobuf.ByteString;
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
 import org.opendaylight.controller.cluster.datastore.node.NormalizedNodeToNodeCodec;
+import org.opendaylight.controller.cluster.datastore.utils.SerializationUtils;
 import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages;
-import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
-import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 
-public class ReadDataReply implements SerializableMessage {
-    public static final Class<ShardTransactionMessages.ReadDataReply> SERIALIZABLE_CLASS =
-            ShardTransactionMessages.ReadDataReply.class;
+public class ReadDataReply implements VersionedSerializableMessage, Externalizable {
+    private static final long serialVersionUID = 1L;
 
-    private final NormalizedNode<?, ?> normalizedNode;
-    private final SchemaContext schemaContext;
+    public static final Class<ReadDataReply> SERIALIZABLE_CLASS = ReadDataReply.class;
 
-    public ReadDataReply(SchemaContext context,NormalizedNode<?, ?> normalizedNode){
+    private NormalizedNode<?, ?> normalizedNode;
+    private short version;
 
+    public ReadDataReply() {
+    }
+
+    public ReadDataReply(NormalizedNode<?, ?> normalizedNode) {
         this.normalizedNode = normalizedNode;
-        this.schemaContext = context;
     }
 
     public NormalizedNode<?, ?> getNormalizedNode() {
@@ -33,26 +39,62 @@ public class ReadDataReply implements SerializableMessage {
     }
 
     @Override
-    public Object toSerializable(){
+    public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+        version = in.readShort();
+        normalizedNode = SerializationUtils.deserializeNormalizedNode(in);
+    }
+
+    @Override
+    public void writeExternal(ObjectOutput out) throws IOException {
+        out.writeShort(version);
+        SerializationUtils.serializeNormalizedNode(normalizedNode, out);
+    }
+
+    @Override
+    public Object toSerializable(short toVersion) {
+        if(toVersion >= DataStoreVersions.LITHIUM_VERSION) {
+            version = toVersion;
+            return this;
+        } else {
+            return toSerializableReadDataReply(normalizedNode);
+        }
+    }
+
+    private static ShardTransactionMessages.ReadDataReply toSerializableReadDataReply(
+            NormalizedNode<?, ?> normalizedNode) {
         if(normalizedNode != null) {
             return ShardTransactionMessages.ReadDataReply.newBuilder()
-                    .setNormalizedNode(new NormalizedNodeToNodeCodec(schemaContext)
-                        .encode(normalizedNode).getNormalizedNode()).build();
+                    .setNormalizedNode(new NormalizedNodeToNodeCodec(null)
+                    .encode(normalizedNode).getNormalizedNode()).build();
         } else {
             return ShardTransactionMessages.ReadDataReply.newBuilder().build();
 
         }
     }
 
-    public static ReadDataReply fromSerializable(SchemaContext schemaContext,
-            YangInstanceIdentifier id, Object serializable) {
-        ShardTransactionMessages.ReadDataReply o = (ShardTransactionMessages.ReadDataReply) serializable;
-        return new ReadDataReply(schemaContext, new NormalizedNodeToNodeCodec(schemaContext).decode(
-                o.getNormalizedNode()));
+    public static ReadDataReply fromSerializable(Object serializable) {
+        if(serializable instanceof ReadDataReply) {
+            return (ReadDataReply) serializable;
+        } else {
+            ShardTransactionMessages.ReadDataReply o =
+                    (ShardTransactionMessages.ReadDataReply) serializable;
+            return new ReadDataReply(new NormalizedNodeToNodeCodec(null).decode(o.getNormalizedNode()));
+        }
+    }
+
+    public static ByteString fromSerializableAsByteString(Object serializable) {
+        if(serializable instanceof ReadDataReply) {
+            ReadDataReply r = (ReadDataReply)serializable;
+            return toSerializableReadDataReply(r.getNormalizedNode()).toByteString();
+        } else {
+            ShardTransactionMessages.ReadDataReply o =
+                    (ShardTransactionMessages.ReadDataReply) serializable;
+            return o.getNormalizedNode().toByteString();
+        }
     }
 
-    public static ByteString getNormalizedNodeByteString(Object serializable){
-        ShardTransactionMessages.ReadDataReply o = (ShardTransactionMessages.ReadDataReply) serializable;
-        return ((ShardTransactionMessages.ReadDataReply) serializable).getNormalizedNode().toByteString();
+    public static boolean isSerializedType(Object message) {
+        return SERIALIZABLE_CLASS.isAssignableFrom(message.getClass()) ||
+               message instanceof ShardTransactionMessages.ReadDataReply;
     }
 }
index 581caefd04e9fb83263a06cba9b6014f3b92109d..09617abde9b370351a45207d8ea42f60c4c6d5fc 100644 (file)
@@ -11,12 +11,15 @@ package org.opendaylight.controller.cluster.datastore.messages;
 import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages;
 
 public class ReadyTransaction implements SerializableMessage{
-  public static final Class<ShardTransactionMessages.ReadyTransaction> SERIALIZABLE_CLASS =
-          ShardTransactionMessages.ReadyTransaction.class;
+    public static final Class<ShardTransactionMessages.ReadyTransaction> SERIALIZABLE_CLASS =
+            ShardTransactionMessages.ReadyTransaction.class;
 
-  @Override
-  public Object toSerializable() {
-    return ShardTransactionMessages.ReadyTransaction.newBuilder().build();
-  }
+    private static final Object SERIALIZED_INSTANCE = ShardTransactionMessages.ReadyTransaction.newBuilder().build();
 
+    public static final ReadyTransaction INSTANCE = new ReadyTransaction();
+
+    @Override
+    public Object toSerializable() {
+        return SERIALIZED_INSTANCE;
+    }
 }
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/VersionedSerializableMessage.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/VersionedSerializableMessage.java
new file mode 100644 (file)
index 0000000..5c30b10
--- /dev/null
@@ -0,0 +1,17 @@
+/*
+ * Copyright (c) 2014 Brocade Communications Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore.messages;
+
+/**
+ * Interface for a Serializable message with versioning.
+ *
+ * @author Thomas Pantelis
+ */
+public interface VersionedSerializableMessage {
+    Object toSerializable(short toVersion);
+}
index 8aa63ef262a1366b63de5844979ed0caafb27208..c5e3a6b05966c9c30f12c5f6f7dab93a78fbebf7 100644 (file)
@@ -8,35 +8,54 @@
 
 package org.opendaylight.controller.cluster.datastore.messages;
 
+import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
 import org.opendaylight.controller.cluster.datastore.node.NormalizedNodeToNodeCodec;
 import org.opendaylight.controller.cluster.datastore.node.NormalizedNodeToNodeCodec.Decoded;
 import org.opendaylight.controller.cluster.datastore.node.NormalizedNodeToNodeCodec.Encoded;
 import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
-import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 
-public class WriteData extends ModifyData {
+public class WriteData extends ModifyData implements VersionedSerializableMessage {
+    private static final long serialVersionUID = 1L;
 
-    public static final Class<ShardTransactionMessages.WriteData> SERIALIZABLE_CLASS =
-            ShardTransactionMessages.WriteData.class;
+    public static final Class<WriteData> SERIALIZABLE_CLASS = WriteData.class;
 
-    public WriteData(YangInstanceIdentifier path, NormalizedNode<?, ?> data, SchemaContext schemaContext) {
-        super(path, data, schemaContext);
+    public WriteData() {
+    }
+
+    public WriteData(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
+        super(path, data);
     }
 
     @Override
-    public Object toSerializable() {
-        Encoded encoded = new NormalizedNodeToNodeCodec(schemaContext).encode(path, data);
-        return ShardTransactionMessages.WriteData.newBuilder()
-                .setInstanceIdentifierPathArguments(encoded.getEncodedPath())
-                .setNormalizedNode(encoded.getEncodedNode().getNormalizedNode()).build();
+    public Object toSerializable(short toVersion) {
+        if(toVersion >= DataStoreVersions.LITHIUM_VERSION) {
+            setVersion(toVersion);
+            return this;
+        } else {
+            // To base or R1 Helium version
+            Encoded encoded = new NormalizedNodeToNodeCodec(null).encode(getPath(), getData());
+            return ShardTransactionMessages.WriteData.newBuilder()
+                    .setInstanceIdentifierPathArguments(encoded.getEncodedPath())
+                    .setNormalizedNode(encoded.getEncodedNode().getNormalizedNode()).build();
+        }
+    }
+
+    public static WriteData fromSerializable(Object serializable) {
+        if(serializable instanceof WriteData) {
+            return (WriteData) serializable;
+        } else {
+            // From base or R1 Helium version
+            ShardTransactionMessages.WriteData o = (ShardTransactionMessages.WriteData) serializable;
+            Decoded decoded = new NormalizedNodeToNodeCodec(null).decode(
+                    o.getInstanceIdentifierPathArguments(), o.getNormalizedNode());
+            return new WriteData(decoded.getDecodedPath(), decoded.getDecodedNode());
+        }
     }
 
-    public static WriteData fromSerializable(Object serializable, SchemaContext schemaContext){
-        ShardTransactionMessages.WriteData o = (ShardTransactionMessages.WriteData) serializable;
-        Decoded decoded = new NormalizedNodeToNodeCodec(schemaContext).decode(
-                o.getInstanceIdentifierPathArguments(), o.getNormalizedNode());
-        return new WriteData(decoded.getDecodedPath(), decoded.getDecodedNode(), schemaContext);
+    public static boolean isSerializedType(Object message) {
+        return SERIALIZABLE_CLASS.isAssignableFrom(message.getClass()) ||
+               message instanceof ShardTransactionMessages.WriteData;
     }
 }
index 876105de18abf7c4beee7b1c7afca342fc4086b2..8255828819cd494a93fc61dacd32bc48ba55edba 100644 (file)
@@ -10,11 +10,15 @@ package org.opendaylight.controller.cluster.datastore.messages;
 
 import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages;
 
-public class WriteDataReply implements SerializableMessage{
-  public static final Class<ShardTransactionMessages.WriteDataReply> SERIALIZABLE_CLASS =
-          ShardTransactionMessages.WriteDataReply.class;
-  @Override
-  public Object toSerializable() {
-    return ShardTransactionMessages.WriteDataReply.newBuilder().build();
-  }
+public class WriteDataReply extends EmptyReply {
+    private static final long serialVersionUID = 1L;
+
+    private static final Object LEGACY_SERIALIZED_INSTANCE =
+            ShardTransactionMessages.WriteDataReply.newBuilder().build();
+
+    public static final WriteDataReply INSTANCE = new WriteDataReply();
+
+    public WriteDataReply() {
+        super(LEGACY_SERIALIZED_INSTANCE);
+    }
 }
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/SerializationUtils.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/SerializationUtils.java
new file mode 100644 (file)
index 0000000..8404a6e
--- /dev/null
@@ -0,0 +1,81 @@
+/*
+ * Copyright (c) 2014 Brocade Communications Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore.utils;
+
+import com.google.common.base.Preconditions;
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import org.opendaylight.controller.cluster.datastore.node.utils.stream.NormalizedNodeInputStreamReader;
+import org.opendaylight.controller.cluster.datastore.node.utils.stream.NormalizedNodeOutputStreamWriter;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.opendaylight.yangtools.yang.data.api.schema.stream.NormalizedNodeWriter;
+
+/**
+ * Provides various utility methods for serialization and de-serialization.
+ *
+ * @author Thomas Pantelis
+ */
+public final class SerializationUtils {
+    public static interface Applier<T> {
+        void apply(T instance, YangInstanceIdentifier path, NormalizedNode<?, ?> node);
+    }
+
+    public static void serializePathAndNode(YangInstanceIdentifier path, NormalizedNode<?, ?> node,
+            DataOutput out) {
+        Preconditions.checkNotNull(path);
+        Preconditions.checkNotNull(node);
+        try {
+            NormalizedNodeOutputStreamWriter streamWriter = new NormalizedNodeOutputStreamWriter(out);
+            NormalizedNodeWriter.forStreamWriter(streamWriter).write(node);
+            streamWriter.writeYangInstanceIdentifier(path);
+        } catch (IOException e) {
+            throw new IllegalArgumentException(String.format("Error serializing path {} and Node {}",
+                    path, node), e);
+        }
+    }
+
+    public static <T> void deserializePathAndNode(DataInput in, T instance, Applier<T> applier) {
+        try {
+            NormalizedNodeInputStreamReader streamReader = new NormalizedNodeInputStreamReader(in);
+            NormalizedNode<?, ?> node = streamReader.readNormalizedNode();
+            YangInstanceIdentifier path = streamReader.readYangInstanceIdentifier();
+            applier.apply(instance, path, node);
+        } catch (IOException e) {
+            throw new IllegalArgumentException("Error deserializing path and Node", e);
+        }
+    }
+
+    public static void serializeNormalizedNode(NormalizedNode<?, ?> node, DataOutput out) {
+        try {
+            out.writeBoolean(node != null);
+            if(node != null) {
+                NormalizedNodeOutputStreamWriter streamWriter = new NormalizedNodeOutputStreamWriter(out);
+                NormalizedNodeWriter.forStreamWriter(streamWriter).write(node);
+            }
+        } catch (IOException e) {
+            throw new IllegalArgumentException(String.format("Error serializing NormalizedNode {}",
+                    node), e);
+        }
+    }
+
+    public static NormalizedNode<?, ?> deserializeNormalizedNode(DataInput in) {
+            try {
+                boolean present = in.readBoolean();
+                if(present) {
+                    NormalizedNodeInputStreamReader streamReader = new NormalizedNodeInputStreamReader(in);
+                    return streamReader.readNormalizedNode();
+                }
+            } catch (IOException e) {
+                throw new IllegalArgumentException("Error deserializing NormalizedNode", e);
+            }
+
+        return null;
+    }
+}
index 55250dd5e9cd3ea71c0356f5313c09d930700c00..5485c57fd6dabd6c8cc0e32dcf757dddfabaa8ec 100644 (file)
@@ -17,11 +17,9 @@ import org.opendaylight.controller.cluster.datastore.utils.MessageCollectorActor
 import org.opendaylight.controller.cluster.datastore.utils.MockClusterWrapper;
 import org.opendaylight.controller.cluster.datastore.utils.MockConfiguration;
 import org.opendaylight.controller.md.cluster.datastore.model.CompositeModel;
-import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
-
 public class DataChangeListenerProxyTest extends AbstractActorTest {
 
   private static class MockDataChangedEvent implements AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> {
@@ -73,7 +71,7 @@ public class DataChangeListenerProxyTest extends AbstractActorTest {
         final ActorRef actorRef = getSystem().actorOf(props);
 
         DataChangeListenerProxy dataChangeListenerProxy = new DataChangeListenerProxy(
-                TestModel.createTestContext(), getSystem().actorSelection(actorRef.path()));
+                getSystem().actorSelection(actorRef.path()));
 
         dataChangeListenerProxy.onDataChanged(new MockDataChangedEvent());
 
index 25d47388fe49579fdb66271e4f33ca7effeef314..19f0f8c5514663224f1bdb83c083bf202a4a4c93 100644 (file)
@@ -29,7 +29,7 @@ public class DataChangeListenerTest extends AbstractActorTest {
             // Let the DataChangeListener know that notifications should be enabled
             subject.tell(new EnableNotification(true), getRef());
 
-            subject.tell(new DataChanged(CompositeModel.createTestContext(), mockChangeEvent),
+            subject.tell(new DataChanged(mockChangeEvent),
                     getRef());
 
             expectMsgClass(DataChangedReply.class);
@@ -48,7 +48,7 @@ public class DataChangeListenerTest extends AbstractActorTest {
             final ActorRef subject =
                 getSystem().actorOf(props, "testDataChangedNotificationsDisabled");
 
-            subject.tell(new DataChanged(CompositeModel.createTestContext(), mockChangeEvent),
+            subject.tell(new DataChanged(mockChangeEvent),
                     getRef());
 
             new Within(duration("1 seconds")) {
@@ -74,8 +74,7 @@ public class DataChangeListenerTest extends AbstractActorTest {
 
             getSystem().eventStream().subscribe(getRef(), DeadLetter.class);
 
-            subject.tell(new DataChanged(CompositeModel.createTestContext(), mockChangeEvent),
-                    ActorRef.noSender());
+            subject.tell(new DataChanged(mockChangeEvent), ActorRef.noSender());
 
             // Make sure no DataChangedReply is sent to DeadLetters.
             while(true) {
@@ -113,13 +112,13 @@ public class DataChangeListenerTest extends AbstractActorTest {
 
             SchemaContext schemaContext = CompositeModel.createTestContext();
 
-            subject.tell(new DataChanged(schemaContext, mockChangeEvent1),getRef());
+            subject.tell(new DataChanged(mockChangeEvent1),getRef());
             expectMsgClass(DataChangedReply.class);
 
-            subject.tell(new DataChanged(schemaContext, mockChangeEvent2),getRef());
+            subject.tell(new DataChanged(mockChangeEvent2),getRef());
             expectMsgClass(DataChangedReply.class);
 
-            subject.tell(new DataChanged(schemaContext, mockChangeEvent3),getRef());
+            subject.tell(new DataChanged(mockChangeEvent3),getRef());
             expectMsgClass(DataChangedReply.class);
 
             Mockito.verify(mockListener).onDataChanged(mockChangeEvent1);
index ed842b2021475b3fef53d95244a37e03ac053d6b..42f30437c9061b916169365e2cfa506a65e8de9a 100644 (file)
@@ -10,7 +10,7 @@ import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.inOrder;
 import static org.mockito.Mockito.mock;
-import static org.opendaylight.controller.cluster.datastore.messages.CreateTransaction.CURRENT_VERSION;
+import static org.opendaylight.controller.cluster.datastore.DataStoreVersions.CURRENT_VERSION;
 import akka.actor.ActorRef;
 import akka.actor.PoisonPill;
 import akka.actor.Props;
index 9f57359429bf5938721f269604109ce5cb49981e..09a4532b53906ab1b803aae880c1e69660537f07 100644 (file)
@@ -21,7 +21,6 @@ import org.junit.BeforeClass;
 import org.junit.Test;
 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
-import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
 import org.opendaylight.controller.cluster.datastore.node.utils.serialization.NormalizedNodeSerializer;
 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
@@ -72,7 +71,7 @@ public class ShardTransactionFailureTest extends AbstractActorTest {
         final ActorRef shard = createShard();
         final Props props = ShardTransaction.props(store.newReadOnlyTransaction(), shard,
                 testSchemaContext, datastoreContext, shardStats, "txn",
-                CreateTransaction.CURRENT_VERSION);
+                DataStoreVersions.CURRENT_VERSION);
 
         final TestActorRef<ShardTransaction> subject = TestActorRef
             .create(getSystem(), props,
@@ -102,7 +101,7 @@ public class ShardTransactionFailureTest extends AbstractActorTest {
         final ActorRef shard = createShard();
         final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
                 testSchemaContext, datastoreContext, shardStats, "txn",
-                CreateTransaction.CURRENT_VERSION);
+                DataStoreVersions.CURRENT_VERSION);
 
         final TestActorRef<ShardTransaction> subject = TestActorRef
             .create(getSystem(), props,
@@ -132,7 +131,7 @@ public class ShardTransactionFailureTest extends AbstractActorTest {
         final ActorRef shard = createShard();
         final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
                 testSchemaContext, datastoreContext, shardStats, "txn",
-                CreateTransaction.CURRENT_VERSION);
+                DataStoreVersions.CURRENT_VERSION);
 
         final TestActorRef<ShardTransaction> subject = TestActorRef
             .create(getSystem(), props,
@@ -162,7 +161,7 @@ public class ShardTransactionFailureTest extends AbstractActorTest {
         final ActorRef shard = createShard();
         final Props props = ShardTransaction.props(store.newWriteOnlyTransaction(), shard,
                 testSchemaContext, datastoreContext, shardStats, "txn",
-                CreateTransaction.CURRENT_VERSION);
+                DataStoreVersions.CURRENT_VERSION);
 
         final TestActorRef<ShardTransaction> subject = TestActorRef
             .create(getSystem(), props,
@@ -195,7 +194,7 @@ public class ShardTransactionFailureTest extends AbstractActorTest {
         final ActorRef shard = createShard();
         final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
                 testSchemaContext, datastoreContext, shardStats, "txn",
-                CreateTransaction.CURRENT_VERSION);
+                DataStoreVersions.CURRENT_VERSION);
 
         final TestActorRef<ShardTransaction> subject = TestActorRef
             .create(getSystem(), props,
@@ -233,7 +232,7 @@ public class ShardTransactionFailureTest extends AbstractActorTest {
         final ActorRef shard = createShard();
         final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
                 testSchemaContext, datastoreContext, shardStats, "txn",
-                CreateTransaction.CURRENT_VERSION);
+                DataStoreVersions.CURRENT_VERSION);
 
         final TestActorRef<ShardTransaction> subject = TestActorRef
             .create(getSystem(), props, "testNegativeMergeTransactionReady");
@@ -266,7 +265,7 @@ public class ShardTransactionFailureTest extends AbstractActorTest {
         final ActorRef shard = createShard();
         final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
                 testSchemaContext, datastoreContext, shardStats, "txn",
-                CreateTransaction.CURRENT_VERSION);
+                DataStoreVersions.CURRENT_VERSION);
 
         final TestActorRef<ShardTransaction> subject = TestActorRef
             .create(getSystem(), props,
index af07aeebcf0ce324014812d986391a438ffd2076..58cec67a2d6cce1f30e3cdf41fdaf7b7d23185a7 100644 (file)
@@ -7,6 +7,12 @@
  */
 package org.opendaylight.controller.cluster.datastore;
 
+import akka.actor.ActorRef;
+import akka.actor.ActorSelection;
+import akka.actor.PoisonPill;
+import akka.actor.Props;
+import akka.dispatch.Dispatchers;
+import akka.testkit.TestActorRef;
 import java.util.Collections;
 import org.junit.Assert;
 import org.junit.Test;
@@ -27,12 +33,6 @@ import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 import scala.concurrent.duration.FiniteDuration;
-import akka.actor.ActorRef;
-import akka.actor.ActorSelection;
-import akka.actor.PoisonPill;
-import akka.actor.Props;
-import akka.dispatch.Dispatchers;
-import akka.testkit.TestActorRef;
 
 /**
  * Tests backwards compatibility support from Helium-1 to Helium.
@@ -46,6 +46,7 @@ import akka.testkit.TestActorRef;
  */
 public class ShardTransactionHeliumBackwardsCompatibilityTest extends AbstractActorTest {
 
+    @SuppressWarnings("unchecked")
     @Test
     public void testTransactionCommit() throws Exception {
         new ShardTestKit(getSystem()) {{
@@ -78,9 +79,10 @@ public class ShardTransactionHeliumBackwardsCompatibilityTest extends AbstractAc
             // Write data to the Tx
 
             txActor.tell(new WriteData(TestModel.TEST_PATH,
-                    ImmutableNodes.containerNode(TestModel.TEST_QNAME), schemaContext), getRef());
+                    ImmutableNodes.containerNode(TestModel.TEST_QNAME)).toSerializable(
+                            DataStoreVersions.BASE_HELIUM_VERSION), getRef());
 
-            expectMsgClass(duration, WriteDataReply.class);
+            expectMsgClass(duration, ShardTransactionMessages.WriteDataReply.class);
 
             // Ready the Tx
 
@@ -151,7 +153,7 @@ public class ShardTransactionHeliumBackwardsCompatibilityTest extends AbstractAc
             // Write data to the Tx
 
             txActor.tell(new WriteData(TestModel.TEST_PATH,
-                    ImmutableNodes.containerNode(TestModel.TEST_QNAME), schemaContext), getRef());
+                    ImmutableNodes.containerNode(TestModel.TEST_QNAME)), getRef());
 
             expectMsgClass(duration, WriteDataReply.class);
 
index f5af93d584ce4ff62678f9f499cd629c96ccc80b..79480ce5926dbce0be66580eb602ed8592c30bc0 100644 (file)
@@ -20,7 +20,6 @@ import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier
 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
 import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.CloseTransactionReply;
-import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.DataExists;
 import org.opendaylight.controller.cluster.datastore.messages.DataExistsReply;
 import org.opendaylight.controller.cluster.datastore.messages.DeleteData;
@@ -38,6 +37,8 @@ import org.opendaylight.controller.cluster.datastore.modification.DeleteModifica
 import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
 import org.opendaylight.controller.cluster.datastore.modification.Modification;
 import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
+import org.opendaylight.controller.cluster.datastore.node.NormalizedNodeToNodeCodec;
+import org.opendaylight.controller.cluster.datastore.node.NormalizedNodeToNodeCodec.Encoded;
 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
 import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
 import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages;
@@ -76,13 +77,13 @@ public class ShardTransactionTest extends AbstractActorTest {
             final ActorRef shard = createShard();
             Props props = ShardTransaction.props(store.newReadOnlyTransaction(), shard,
                     testSchemaContext, datastoreContext, shardStats, "txn",
-                    CreateTransaction.CURRENT_VERSION);
+                    DataStoreVersions.CURRENT_VERSION);
 
             testOnReceiveReadData(getSystem().actorOf(props, "testReadDataRO"));
 
             props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
                     testSchemaContext, datastoreContext, shardStats, "txn",
-                    CreateTransaction.CURRENT_VERSION);
+                    DataStoreVersions.CURRENT_VERSION);
 
             testOnReceiveReadData(getSystem().actorOf(props, "testReadDataRW"));
         }
@@ -92,12 +93,10 @@ public class ShardTransactionTest extends AbstractActorTest {
             transaction.tell(new ReadData(YangInstanceIdentifier.builder().build()).toSerializable(),
                 getRef());
 
-            ShardTransactionMessages.ReadDataReply replySerialized =
-                expectMsgClass(duration("5 seconds"), ReadDataReply.SERIALIZABLE_CLASS);
+            Object replySerialized =
+                    expectMsgClass(duration("5 seconds"), ReadDataReply.SERIALIZABLE_CLASS);
 
-            assertNotNull(ReadDataReply.fromSerializable(
-                testSchemaContext,YangInstanceIdentifier.builder().build(), replySerialized)
-                .getNormalizedNode());
+            assertNotNull(ReadDataReply.fromSerializable(replySerialized).getNormalizedNode());
 
             // unserialized read
             transaction.tell(new ReadData(YangInstanceIdentifier.builder().build()),getRef());
@@ -114,14 +113,14 @@ public class ShardTransactionTest extends AbstractActorTest {
             final ActorRef shard = createShard();
             Props props = ShardTransaction.props( store.newReadOnlyTransaction(), shard,
                     testSchemaContext, datastoreContext, shardStats, "txn",
-                    CreateTransaction.CURRENT_VERSION);
+                    DataStoreVersions.CURRENT_VERSION);
 
             testOnReceiveReadDataWhenDataNotFound(getSystem().actorOf(
                     props, "testReadDataWhenDataNotFoundRO"));
 
             props = ShardTransaction.props( store.newReadWriteTransaction(), shard,
                     testSchemaContext, datastoreContext, shardStats, "txn",
-                    CreateTransaction.CURRENT_VERSION);
+                    DataStoreVersions.CURRENT_VERSION);
 
             testOnReceiveReadDataWhenDataNotFound(getSystem().actorOf(
                     props, "testReadDataWhenDataNotFoundRW"));
@@ -131,11 +130,10 @@ public class ShardTransactionTest extends AbstractActorTest {
             // serialized read
             transaction.tell(new ReadData(TestModel.TEST_PATH).toSerializable(), getRef());
 
-            ShardTransactionMessages.ReadDataReply replySerialized =
-                expectMsgClass(duration("5 seconds"), ReadDataReply.SERIALIZABLE_CLASS);
+            Object replySerialized =
+                    expectMsgClass(duration("5 seconds"), ReadDataReply.SERIALIZABLE_CLASS);
 
-            assertTrue(ReadDataReply.fromSerializable(
-                testSchemaContext, TestModel.TEST_PATH, replySerialized).getNormalizedNode() == null);
+            assertTrue(ReadDataReply.fromSerializable(replySerialized).getNormalizedNode() == null);
 
             // unserialized read
             transaction.tell(new ReadData(TestModel.TEST_PATH),getRef());
@@ -146,19 +144,39 @@ public class ShardTransactionTest extends AbstractActorTest {
         }};
     }
 
+    @Test
+    public void testOnReceiveReadDataHeliumR1() throws Exception {
+        new JavaTestKit(getSystem()) {{
+            final ActorRef shard = createShard();
+            Props props = ShardTransaction.props(store.newReadOnlyTransaction(), shard,
+                    testSchemaContext, datastoreContext, shardStats, "txn",
+                    DataStoreVersions.HELIUM_1_VERSION);
+
+            ActorRef transaction = getSystem().actorOf(props, "testOnReceiveReadDataHeliumR1");
+
+            transaction.tell(new ReadData(YangInstanceIdentifier.builder().build()).toSerializable(),
+                    getRef());
+
+            ShardTransactionMessages.ReadDataReply replySerialized =
+                    expectMsgClass(duration("5 seconds"), ShardTransactionMessages.ReadDataReply.class);
+
+            assertNotNull(ReadDataReply.fromSerializable(replySerialized).getNormalizedNode());
+        }};
+    }
+
     @Test
     public void testOnReceiveDataExistsPositive() throws Exception {
         new JavaTestKit(getSystem()) {{
             final ActorRef shard = createShard();
             Props props = ShardTransaction.props(store.newReadOnlyTransaction(), shard,
                     testSchemaContext, datastoreContext, shardStats, "txn",
-                    CreateTransaction.CURRENT_VERSION);
+                    DataStoreVersions.CURRENT_VERSION);
 
             testOnReceiveDataExistsPositive(getSystem().actorOf(props, "testDataExistsPositiveRO"));
 
             props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
                     testSchemaContext, datastoreContext, shardStats, "txn",
-                    CreateTransaction.CURRENT_VERSION);
+                    DataStoreVersions.CURRENT_VERSION);
 
             testOnReceiveDataExistsPositive(getSystem().actorOf(props, "testDataExistsPositiveRW"));
         }
@@ -187,13 +205,13 @@ public class ShardTransactionTest extends AbstractActorTest {
             final ActorRef shard = createShard();
             Props props = ShardTransaction.props(store.newReadOnlyTransaction(), shard,
                     testSchemaContext, datastoreContext, shardStats, "txn",
-                    CreateTransaction.CURRENT_VERSION);
+                    DataStoreVersions.CURRENT_VERSION);
 
             testOnReceiveDataExistsNegative(getSystem().actorOf(props, "testDataExistsNegativeRO"));
 
             props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
                     testSchemaContext, datastoreContext, shardStats, "txn",
-                    CreateTransaction.CURRENT_VERSION);
+                    DataStoreVersions.CURRENT_VERSION);
 
             testOnReceiveDataExistsNegative(getSystem().actorOf(props, "testDataExistsNegativeRW"));
         }
@@ -234,39 +252,61 @@ public class ShardTransactionTest extends AbstractActorTest {
             final ActorRef shard = createShard();
             final Props props = ShardTransaction.props(store.newWriteOnlyTransaction(), shard,
                     testSchemaContext, datastoreContext, shardStats, "txn",
-                    CreateTransaction.CURRENT_VERSION);
-            final ActorRef transaction = getSystem().actorOf(props, "testWriteData");
+                    DataStoreVersions.CURRENT_VERSION);
+            final ActorRef transaction = getSystem().actorOf(props, "testOnReceiveWriteData");
 
             transaction.tell(new WriteData(TestModel.TEST_PATH,
-                ImmutableNodes.containerNode(TestModel.TEST_QNAME), TestModel.createTestContext()).toSerializable(),
-                getRef());
+                    ImmutableNodes.containerNode(TestModel.TEST_QNAME)).toSerializable(
+                            DataStoreVersions.HELIUM_2_VERSION), getRef());
 
             expectMsgClass(duration("5 seconds"), ShardTransactionMessages.WriteDataReply.class);
 
             assertModification(transaction, WriteModification.class);
 
-            //unserialized write
+            // unserialized write
             transaction.tell(new WriteData(TestModel.TEST_PATH,
-                ImmutableNodes.containerNode(TestModel.TEST_QNAME),
-                TestModel.createTestContext()),
+                ImmutableNodes.containerNode(TestModel.TEST_QNAME)),
                 getRef());
 
             expectMsgClass(duration("5 seconds"), WriteDataReply.class);
         }};
     }
 
+    @Test
+    public void testOnReceiveHeliumR1WriteData() throws Exception {
+        new JavaTestKit(getSystem()) {{
+            final ActorRef shard = createShard();
+            final Props props = ShardTransaction.props(store.newWriteOnlyTransaction(), shard,
+                    testSchemaContext, datastoreContext, shardStats, "txn",
+                    DataStoreVersions.HELIUM_1_VERSION);
+            final ActorRef transaction = getSystem().actorOf(props, "testOnReceiveHeliumR1WriteData");
+
+            Encoded encoded = new NormalizedNodeToNodeCodec(null).encode(TestModel.TEST_PATH,
+                    ImmutableNodes.containerNode(TestModel.TEST_QNAME));
+            ShardTransactionMessages.WriteData serialized = ShardTransactionMessages.WriteData.newBuilder()
+                    .setInstanceIdentifierPathArguments(encoded.getEncodedPath())
+                    .setNormalizedNode(encoded.getEncodedNode().getNormalizedNode()).build();
+
+            transaction.tell(serialized, getRef());
+
+            expectMsgClass(duration("5 seconds"), ShardTransactionMessages.WriteDataReply.class);
+
+            assertModification(transaction, WriteModification.class);
+        }};
+    }
+
     @Test
     public void testOnReceiveMergeData() throws Exception {
         new JavaTestKit(getSystem()) {{
             final ActorRef shard = createShard();
             final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
                     testSchemaContext, datastoreContext, shardStats, "txn",
-                    CreateTransaction.CURRENT_VERSION);
+                    DataStoreVersions.CURRENT_VERSION);
             final ActorRef transaction = getSystem().actorOf(props, "testMergeData");
 
             transaction.tell(new MergeData(TestModel.TEST_PATH,
-                ImmutableNodes.containerNode(TestModel.TEST_QNAME), testSchemaContext).toSerializable(),
-                getRef());
+                    ImmutableNodes.containerNode(TestModel.TEST_QNAME)).toSerializable(
+                            DataStoreVersions.HELIUM_2_VERSION), getRef());
 
             expectMsgClass(duration("5 seconds"), ShardTransactionMessages.MergeDataReply.class);
 
@@ -274,20 +314,43 @@ public class ShardTransactionTest extends AbstractActorTest {
 
             //unserialized merge
             transaction.tell(new MergeData(TestModel.TEST_PATH,
-                ImmutableNodes.containerNode(TestModel.TEST_QNAME), testSchemaContext),
+                ImmutableNodes.containerNode(TestModel.TEST_QNAME)),
                 getRef());
 
             expectMsgClass(duration("5 seconds"), MergeDataReply.class);
         }};
     }
 
+    @Test
+    public void testOnReceiveHeliumR1MergeData() throws Exception {
+        new JavaTestKit(getSystem()) {{
+            final ActorRef shard = createShard();
+            final Props props = ShardTransaction.props(store.newWriteOnlyTransaction(), shard,
+                    testSchemaContext, datastoreContext, shardStats, "txn",
+                    DataStoreVersions.HELIUM_1_VERSION);
+            final ActorRef transaction = getSystem().actorOf(props, "testOnReceiveHeliumR1MergeData");
+
+            Encoded encoded = new NormalizedNodeToNodeCodec(null).encode(TestModel.TEST_PATH,
+                    ImmutableNodes.containerNode(TestModel.TEST_QNAME));
+            ShardTransactionMessages.MergeData serialized = ShardTransactionMessages.MergeData.newBuilder()
+                    .setInstanceIdentifierPathArguments(encoded.getEncodedPath())
+                    .setNormalizedNode(encoded.getEncodedNode().getNormalizedNode()).build();
+
+            transaction.tell(serialized, getRef());
+
+            expectMsgClass(duration("5 seconds"), ShardTransactionMessages.MergeDataReply.class);
+
+            assertModification(transaction, MergeModification.class);
+        }};
+    }
+
     @Test
     public void testOnReceiveDeleteData() throws Exception {
         new JavaTestKit(getSystem()) {{
             final ActorRef shard = createShard();
             final Props props = ShardTransaction.props( store.newWriteOnlyTransaction(), shard,
                     testSchemaContext, datastoreContext, shardStats, "txn",
-                    CreateTransaction.CURRENT_VERSION);
+                    DataStoreVersions.CURRENT_VERSION);
             final ActorRef transaction = getSystem().actorOf(props, "testDeleteData");
 
             transaction.tell(new DeleteData(TestModel.TEST_PATH).toSerializable(), getRef());
@@ -310,7 +373,7 @@ public class ShardTransactionTest extends AbstractActorTest {
             final ActorRef shard = createShard();
             final Props props = ShardTransaction.props( store.newReadWriteTransaction(), shard,
                     testSchemaContext, datastoreContext, shardStats, "txn",
-                    CreateTransaction.CURRENT_VERSION);
+                    DataStoreVersions.CURRENT_VERSION);
             final ActorRef transaction = getSystem().actorOf(props, "testReadyTransaction");
 
             watch(transaction);
@@ -328,7 +391,7 @@ public class ShardTransactionTest extends AbstractActorTest {
             final ActorRef shard = createShard();
             final Props props = ShardTransaction.props( store.newReadWriteTransaction(), shard,
                 testSchemaContext, datastoreContext, shardStats, "txn",
-                CreateTransaction.CURRENT_VERSION);
+                DataStoreVersions.CURRENT_VERSION);
             final ActorRef transaction = getSystem().actorOf(props, "testReadyTransaction2");
 
             watch(transaction);
@@ -343,13 +406,14 @@ public class ShardTransactionTest extends AbstractActorTest {
 
     }
 
+    @SuppressWarnings("unchecked")
     @Test
     public void testOnReceiveCloseTransaction() throws Exception {
         new JavaTestKit(getSystem()) {{
             final ActorRef shard = createShard();
             final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
                     testSchemaContext, datastoreContext, shardStats, "txn",
-                    CreateTransaction.CURRENT_VERSION);
+                    DataStoreVersions.CURRENT_VERSION);
             final ActorRef transaction = getSystem().actorOf(props, "testCloseTransaction");
 
             watch(transaction);
@@ -366,7 +430,7 @@ public class ShardTransactionTest extends AbstractActorTest {
         final ActorRef shard = createShard();
         final Props props = ShardTransaction.props(store.newReadOnlyTransaction(), shard,
                 testSchemaContext, datastoreContext, shardStats, "txn",
-                CreateTransaction.CURRENT_VERSION);
+                DataStoreVersions.CURRENT_VERSION);
         final TestActorRef<ShardTransaction> transaction = TestActorRef.apply(props,getSystem());
 
         transaction.receive(new DeleteData(TestModel.TEST_PATH).toSerializable(), ActorRef.noSender());
@@ -382,7 +446,7 @@ public class ShardTransactionTest extends AbstractActorTest {
             final ActorRef shard = createShard();
             final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
                     testSchemaContext, datastoreContext, shardStats, "txn",
-                    CreateTransaction.CURRENT_VERSION);
+                    DataStoreVersions.CURRENT_VERSION);
             final ActorRef transaction =
                 getSystem().actorOf(props, "testShardTransactionInactivity");
 
index 46060dda2a9bdd1cf1ec94e98d24c53ec97ed5a3..75c93dd5d2fd2de9521d7540da7169fe656c71e9 100644 (file)
@@ -1,11 +1,21 @@
 package org.opendaylight.controller.cluster.datastore;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.isA;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
 import akka.actor.ActorPath;
 import akka.actor.ActorSelection;
 import akka.actor.Props;
 import akka.dispatch.Futures;
 import com.google.common.collect.Lists;
 import com.google.common.util.concurrent.ListenableFuture;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
 import org.junit.Before;
 import org.junit.Test;
 import org.mockito.Mock;
@@ -24,18 +34,6 @@ import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
 import org.opendaylight.controller.cluster.datastore.utils.DoNothingActor;
 import scala.concurrent.Future;
 
-import java.util.List;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.fail;
-import static org.mockito.Mockito.any;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.isA;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-
 public class ThreePhaseCommitCohortProxyTest extends AbstractActorTest {
 
     @SuppressWarnings("serial")
@@ -112,14 +110,14 @@ public class ThreePhaseCommitCohortProxyTest extends AbstractActorTest {
         ThreePhaseCommitCohortProxy proxy = setupProxy(1);
 
         setupMockActorContext(CanCommitTransaction.SERIALIZABLE_CLASS,
-                new CanCommitTransactionReply(true));
+                CanCommitTransactionReply.YES);
 
         ListenableFuture<Boolean> future = proxy.canCommit();
 
         assertEquals("canCommit", true, future.get(5, TimeUnit.SECONDS));
 
         setupMockActorContext(CanCommitTransaction.SERIALIZABLE_CLASS,
-                new CanCommitTransactionReply(false));
+                CanCommitTransactionReply.NO);
 
         future = proxy.canCommit();
 
@@ -134,7 +132,7 @@ public class ThreePhaseCommitCohortProxyTest extends AbstractActorTest {
         ThreePhaseCommitCohortProxy proxy = setupProxy(2);
 
         setupMockActorContext(CanCommitTransaction.SERIALIZABLE_CLASS,
-                new CanCommitTransactionReply(true), new CanCommitTransactionReply(true));
+                CanCommitTransactionReply.YES, CanCommitTransactionReply.YES);
 
         ListenableFuture<Boolean> future = proxy.canCommit();
 
@@ -149,8 +147,7 @@ public class ThreePhaseCommitCohortProxyTest extends AbstractActorTest {
         ThreePhaseCommitCohortProxy proxy = setupProxy(3);
 
         setupMockActorContext(CanCommitTransaction.SERIALIZABLE_CLASS,
-                new CanCommitTransactionReply(true), new CanCommitTransactionReply(false),
-                new CanCommitTransactionReply(true));
+                CanCommitTransactionReply.YES, CanCommitTransactionReply.NO, CanCommitTransactionReply.YES);
 
         ListenableFuture<Boolean> future = proxy.canCommit();
 
@@ -289,7 +286,7 @@ public class ThreePhaseCommitCohortProxyTest extends AbstractActorTest {
         ThreePhaseCommitCohortProxy proxy = setupProxy(2);
 
         setupMockActorContext(CanCommitTransaction.SERIALIZABLE_CLASS,
-                new CanCommitTransactionReply(true), new CanCommitTransactionReply(true));
+                CanCommitTransactionReply.YES, CanCommitTransactionReply.YES);
 
         setupMockActorContext(PreCommitTransaction.SERIALIZABLE_CLASS,
                 new PreCommitTransactionReply(), new PreCommitTransactionReply());
index 7407897dfac504872e37d35c374397369a8c5576..5e53b29db13f7fff0accf1397dc691a1f071d8a6 100644 (file)
@@ -1,5 +1,20 @@
 package org.opendaylight.controller.cluster.datastore;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Matchers.argThat;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Matchers.isA;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType.READ_ONLY;
+import static org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType.READ_WRITE;
+import static org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType.WRITE_ONLY;
 import akka.actor.ActorRef;
 import akka.actor.ActorSelection;
 import akka.actor.ActorSystem;
@@ -11,12 +26,16 @@ import com.google.common.collect.ImmutableMap;
 import com.google.common.util.concurrent.CheckedFuture;
 import com.typesafe.config.Config;
 import com.typesafe.config.ConfigFactory;
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
 import org.junit.AfterClass;
 import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
 import org.mockito.ArgumentMatcher;
 import org.mockito.Mock;
+import org.mockito.Mockito;
 import org.mockito.MockitoAnnotations;
 import org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType;
 import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
@@ -42,6 +61,7 @@ import org.opendaylight.controller.cluster.datastore.utils.DoNothingActor;
 import org.opendaylight.controller.cluster.datastore.utils.MockConfiguration;
 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
+import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages;
 import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages.CreateTransactionReply;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
@@ -50,24 +70,6 @@ import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 import scala.concurrent.Await;
 import scala.concurrent.Future;
 import scala.concurrent.duration.Duration;
-import java.io.IOException;
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyString;
-import static org.mockito.Mockito.argThat;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.eq;
-import static org.mockito.Mockito.isA;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType.READ_ONLY;
-import static org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType.READ_WRITE;
-import static org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType.WRITE_ONLY;
 
 @SuppressWarnings("resource")
 public class TransactionProxyTest {
@@ -137,9 +139,13 @@ public class TransactionProxyTest {
         ArgumentMatcher<CreateTransaction> matcher = new ArgumentMatcher<CreateTransaction>() {
             @Override
             public boolean matches(Object argument) {
-                CreateTransaction obj = CreateTransaction.fromSerializable(argument);
-                return obj.getTransactionId().startsWith(memberName) &&
-                       obj.getTransactionType() == type.ordinal();
+                if(CreateTransaction.SERIALIZABLE_CLASS.equals(argument.getClass())) {
+                    CreateTransaction obj = CreateTransaction.fromSerializable(argument);
+                    return obj.getTransactionId().startsWith(memberName) &&
+                            obj.getTransactionType() == type.ordinal();
+                }
+
+                return false;
             }
         };
 
@@ -195,16 +201,25 @@ public class TransactionProxyTest {
     }
 
     private WriteData eqSerializedWriteData(final NormalizedNode<?, ?> nodeToWrite) {
+        return eqSerializedWriteData(nodeToWrite, DataStoreVersions.CURRENT_VERSION);
+    }
+
+    private WriteData eqSerializedWriteData(final NormalizedNode<?, ?> nodeToWrite,
+            final int transactionVersion) {
         ArgumentMatcher<WriteData> matcher = new ArgumentMatcher<WriteData>() {
             @Override
             public boolean matches(Object argument) {
-                if(!WriteData.SERIALIZABLE_CLASS.equals(argument.getClass())) {
-                    return false;
+                if((transactionVersion >= DataStoreVersions.LITHIUM_VERSION &&
+                        WriteData.SERIALIZABLE_CLASS.equals(argument.getClass())) ||
+                   (transactionVersion < DataStoreVersions.LITHIUM_VERSION &&
+                           ShardTransactionMessages.WriteData.class.equals(argument.getClass()))) {
+
+                    WriteData obj = WriteData.fromSerializable(argument);
+                    return obj.getPath().equals(TestModel.TEST_PATH) &&
+                           obj.getData().equals(nodeToWrite);
                 }
 
-                WriteData obj = WriteData.fromSerializable(argument, schemaContext);
-                return obj.getPath().equals(TestModel.TEST_PATH) &&
-                       obj.getData().equals(nodeToWrite);
+                return false;
             }
         };
 
@@ -228,16 +243,25 @@ public class TransactionProxyTest {
     }
 
     private MergeData eqSerializedMergeData(final NormalizedNode<?, ?> nodeToWrite) {
+        return eqSerializedMergeData(nodeToWrite, DataStoreVersions.CURRENT_VERSION);
+    }
+
+    private MergeData eqSerializedMergeData(final NormalizedNode<?, ?> nodeToWrite,
+            final int transactionVersion) {
         ArgumentMatcher<MergeData> matcher = new ArgumentMatcher<MergeData>() {
             @Override
             public boolean matches(Object argument) {
-                if(!MergeData.SERIALIZABLE_CLASS.equals(argument.getClass())) {
-                    return false;
+                if((transactionVersion >= DataStoreVersions.LITHIUM_VERSION &&
+                        MergeData.SERIALIZABLE_CLASS.equals(argument.getClass())) ||
+                   (transactionVersion < DataStoreVersions.LITHIUM_VERSION &&
+                           ShardTransactionMessages.MergeData.class.equals(argument.getClass()))) {
+
+                    MergeData obj = MergeData.fromSerializable(argument);
+                    return obj.getPath().equals(TestModel.TEST_PATH) &&
+                           obj.getData().equals(nodeToWrite);
                 }
 
-                MergeData obj = MergeData.fromSerializable(argument, schemaContext);
-                return obj.getPath().equals(TestModel.TEST_PATH) &&
-                       obj.getData().equals(nodeToWrite);
+                return false;
             }
         };
 
@@ -293,13 +317,17 @@ public class TransactionProxyTest {
         return Futures.successful((Object)new ReadyTransactionReply(path));
     }
 
+    private Future<Object> readSerializedDataReply(NormalizedNode<?, ?> data,
+            short transactionVersion) {
+        return Futures.successful(new ReadDataReply(data).toSerializable(transactionVersion));
+    }
 
     private Future<Object> readSerializedDataReply(NormalizedNode<?, ?> data) {
-        return Futures.successful(new ReadDataReply(schemaContext, data).toSerializable());
+        return readSerializedDataReply(data, DataStoreVersions.CURRENT_VERSION);
     }
 
     private Future<ReadDataReply> readDataReply(NormalizedNode<?, ?> data) {
-        return Futures.successful(new ReadDataReply(schemaContext, data));
+        return Futures.successful(new ReadDataReply(data));
     }
 
     private Future<Object> dataExistsSerializedReply(boolean exists) {
@@ -310,16 +338,24 @@ public class TransactionProxyTest {
         return Futures.successful(new DataExistsReply(exists));
     }
 
+    private Future<Object> writeSerializedDataReply(short version) {
+        return Futures.successful(new WriteDataReply().toSerializable(version));
+    }
+
     private Future<Object> writeSerializedDataReply() {
-        return Futures.successful(new WriteDataReply().toSerializable());
+        return writeSerializedDataReply(DataStoreVersions.CURRENT_VERSION);
     }
 
     private Future<WriteDataReply> writeDataReply() {
         return Futures.successful(new WriteDataReply());
     }
 
+    private Future<Object> mergeSerializedDataReply(short version) {
+        return Futures.successful(new MergeDataReply().toSerializable(version));
+    }
+
     private Future<Object> mergeSerializedDataReply() {
-        return Futures.successful(new MergeDataReply().toSerializable());
+        return mergeSerializedDataReply(DataStoreVersions.CURRENT_VERSION);
     }
 
     private Future<MergeDataReply> mergeDataReply() {
@@ -346,7 +382,8 @@ public class TransactionProxyTest {
             .build();
     }
 
-    private ActorRef setupActorContextWithInitialCreateTransaction(ActorSystem actorSystem, TransactionType type, int transactionVersion) {
+    private ActorRef setupActorContextWithInitialCreateTransaction(ActorSystem actorSystem,
+            TransactionType type, int transactionVersion) {
         ActorRef actorRef = actorSystem.actorOf(Props.create(DoNothingActor.class));
         doReturn(actorSystem.actorSelection(actorRef.path())).
                 when(mockActorContext).actorSelection(actorRef.path().toString());
@@ -358,13 +395,11 @@ public class TransactionProxyTest {
                 executeOperationAsync(eq(actorSystem.actorSelection(actorRef.path())),
                         eqCreateTransaction(memberName, type));
 
-        doReturn(false).when(mockActorContext).isPathLocal(actorRef.path().toString());
-
         return actorRef;
     }
 
     private ActorRef setupActorContextWithInitialCreateTransaction(ActorSystem actorSystem, TransactionType type) {
-        return setupActorContextWithInitialCreateTransaction(actorSystem, type, CreateTransaction.CURRENT_VERSION);
+        return setupActorContextWithInitialCreateTransaction(actorSystem, type, DataStoreVersions.CURRENT_VERSION);
     }
 
 
@@ -718,7 +753,7 @@ public class TransactionProxyTest {
                 eq(actorSelection(actorRef)), eqSerializedWriteData(nodeToWrite));
 
         verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
-                WriteDataReply.SERIALIZABLE_CLASS);
+                WriteDataReply.class);
     }
 
     @Test(expected=IllegalStateException.class)
@@ -760,7 +795,7 @@ public class TransactionProxyTest {
                 eq(actorSelection(actorRef)), eqSerializedMergeData(nodeToWrite));
 
         verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
-                MergeDataReply.SERIALIZABLE_CLASS);
+                MergeDataReply.class);
     }
 
     @Test
@@ -836,22 +871,25 @@ public class TransactionProxyTest {
         ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready;
 
         verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
-                WriteDataReply.SERIALIZABLE_CLASS);
+                WriteDataReply.class);
 
         verifyCohortFutures(proxy, getSystem().actorSelection(actorRef.path()));
     }
 
-    @Test
-    public void testReadyForwardCompatibility() throws Exception {
-        ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE, 0);
+    private ActorRef testCompatibilityWithHeliumVersion(short version) throws Exception {
+        ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(),
+                READ_WRITE, version);
 
-        NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
+        NormalizedNode<?, ?> testNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
 
-        doReturn(readSerializedDataReply(null)).when(mockActorContext).executeOperationAsync(
+        doReturn(readSerializedDataReply(testNode, version)).when(mockActorContext).executeOperationAsync(
                 eq(actorSelection(actorRef)), eqSerializedReadData());
 
-        doReturn(writeSerializedDataReply()).when(mockActorContext).executeOperationAsync(
-                eq(actorSelection(actorRef)), eqSerializedWriteData(nodeToWrite));
+        doReturn(writeSerializedDataReply(version)).when(mockActorContext).executeOperationAsync(
+                eq(actorSelection(actorRef)), eqSerializedWriteData(testNode, version));
+
+        doReturn(mergeSerializedDataReply(version)).when(mockActorContext).executeOperationAsync(
+                eq(actorSelection(actorRef)), eqSerializedMergeData(testNode, version));
 
         doReturn(readySerializedTxReply(actorRef.path().toString())).when(mockActorContext).executeOperationAsync(
                 eq(actorSelection(actorRef)), isA(ReadyTransaction.SERIALIZABLE_CLASS));
@@ -859,12 +897,17 @@ public class TransactionProxyTest {
         doReturn(actorRef.path().toString()).when(mockActorContext).resolvePath(eq(actorRef.path().toString()),
                 eq(actorRef.path().toString()));
 
-        TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
-                READ_WRITE);
+        TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_WRITE);
 
-        transactionProxy.read(TestModel.TEST_PATH);
+        Optional<NormalizedNode<?, ?>> readOptional = transactionProxy.read(TestModel.TEST_PATH).
+                get(5, TimeUnit.SECONDS);
 
-        transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
+        assertEquals("NormalizedNode isPresent", true, readOptional.isPresent());
+        assertEquals("Response NormalizedNode", testNode, readOptional.get());
+
+        transactionProxy.write(TestModel.TEST_PATH, testNode);
+
+        transactionProxy.merge(TestModel.TEST_PATH, testNode);
 
         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
 
@@ -873,14 +916,29 @@ public class TransactionProxyTest {
         ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready;
 
         verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
-                WriteDataReply.SERIALIZABLE_CLASS);
+                ShardTransactionMessages.WriteDataReply.class, ShardTransactionMessages.MergeDataReply.class);
 
         verifyCohortFutures(proxy, getSystem().actorSelection(actorRef.path()));
 
+        return actorRef;
+    }
+
+    @Test
+    public void testCompatibilityWithBaseHeliumVersion() throws Exception {
+        ActorRef actorRef = testCompatibilityWithHeliumVersion(DataStoreVersions.BASE_HELIUM_VERSION);
+
         verify(mockActorContext).resolvePath(eq(actorRef.path().toString()),
                 eq(actorRef.path().toString()));
     }
 
+    @Test
+    public void testCompatibilityWithHeliumR1Version() throws Exception {
+        ActorRef actorRef = testCompatibilityWithHeliumVersion(DataStoreVersions.HELIUM_1_VERSION);
+
+        verify(mockActorContext, Mockito.never()).resolvePath(eq(actorRef.path().toString()),
+                eq(actorRef.path().toString()));
+    }
+
     @Test
     public void testReadyWithRecordingOperationFailure() throws Exception {
         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
@@ -914,7 +972,7 @@ public class TransactionProxyTest {
         verifyCohortFutures(proxy, TestException.class);
 
         verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
-                MergeDataReply.SERIALIZABLE_CLASS, TestException.class);
+                MergeDataReply.class, TestException.class);
     }
 
     @Test
@@ -942,7 +1000,7 @@ public class TransactionProxyTest {
         ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready;
 
         verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
-                MergeDataReply.SERIALIZABLE_CLASS);
+                MergeDataReply.class);
 
         verifyCohortFutures(proxy, TestException.class);
     }
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/messages/DataChangedTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/messages/DataChangedTest.java
new file mode 100644 (file)
index 0000000..2c1de7f
--- /dev/null
@@ -0,0 +1,57 @@
+/*
+ * Copyright (c) 2014 Brocade Communications Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore.messages;
+
+import static org.junit.Assert.assertEquals;
+import org.apache.commons.lang.SerializationUtils;
+import org.junit.Test;
+import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
+import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
+import org.opendaylight.controller.md.sal.dom.store.impl.DOMImmutableDataChangeEvent;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
+import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder;
+
+/**
+ * Unit tests for DataChanged.
+ *
+ * @author Thomas Pantelis
+ */
+public class DataChangedTest {
+
+    @Test
+    public void testSerialization() {
+        DOMImmutableDataChangeEvent change = DOMImmutableDataChangeEvent.builder(DataChangeScope.SUBTREE).
+                addCreated(TestModel.TEST_PATH, ImmutableContainerNodeBuilder.create().withNodeIdentifier(
+                        new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME)).
+                        withChild(ImmutableNodes.leafNode(TestModel.DESC_QNAME, "foo")).build()).
+                addUpdated(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME),
+                        ImmutableContainerNodeBuilder.create().withNodeIdentifier(
+                            new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME)).
+                            withChild(ImmutableNodes.leafNode(TestModel.NAME_QNAME, "bar")).build())
+.
+                addRemoved(TestModel.OUTER_LIST_PATH,
+                       ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build()).
+                setBefore(ImmutableNodes.containerNode(TestModel.TEST_QNAME)).
+                setAfter(ImmutableContainerNodeBuilder.create().withNodeIdentifier(
+                        new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME)).
+                        withChild(ImmutableNodes.leafNode(TestModel.DESC_QNAME, "foo")).
+                        withChild(ImmutableNodes.leafNode(TestModel.NAME_QNAME, "bar")).build()).build();
+
+        DataChanged expected = new DataChanged(change);
+
+        DataChanged actual = (DataChanged) SerializationUtils.clone(expected);
+
+        assertEquals("getCreatedData", change.getCreatedData(), actual.getChange().getCreatedData());
+        assertEquals("getOriginalData", change.getOriginalData(), actual.getChange().getOriginalData());
+        assertEquals("getOriginalSubtree", change.getOriginalSubtree(), actual.getChange().getOriginalSubtree());
+        assertEquals("getRemovedPaths", change.getRemovedPaths(), actual.getChange().getRemovedPaths());
+        assertEquals("getUpdatedData", change.getUpdatedData(), actual.getChange().getUpdatedData());
+        assertEquals("getUpdatedSubtree", change.getUpdatedSubtree(), actual.getChange().getUpdatedSubtree());
+    }
+}
index 8f3ca9c535a06a1ae5a67f80c16dec2a2c5c2e48..5b40afdff8288ff714cc4f9b4a398e2f2724369b 100644 (file)
@@ -1,21 +1,70 @@
 package org.opendaylight.controller.cluster.datastore.messages;
 
-import org.junit.Assert;
+import static org.junit.Assert.assertEquals;
+import java.io.Serializable;
+import org.apache.commons.lang.SerializationUtils;
 import org.junit.Test;
+import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
+import org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages.InstanceIdentifier;
+import org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages.Node;
+import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
-import org.opendaylight.yangtools.yang.model.api.SchemaContext;
+import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder;
 
 public class MergeDataTest {
 
     @Test
     public void testSerialization() {
-        SchemaContext schemaContext = TestModel.createTestContext();
-        MergeData expected = new MergeData(TestModel.TEST_PATH, ImmutableNodes
-            .containerNode(TestModel.TEST_QNAME), schemaContext);
+        YangInstanceIdentifier path = TestModel.TEST_PATH;
+        NormalizedNode<?, ?> data = ImmutableContainerNodeBuilder.create().withNodeIdentifier(
+                new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME)).
+                withChild(ImmutableNodes.leafNode(TestModel.DESC_QNAME, "foo")).build();
 
-        MergeData actual = MergeData.fromSerializable(expected.toSerializable(), schemaContext);
-        Assert.assertEquals("getPath", expected.getPath(), actual.getPath());
-        Assert.assertEquals("getData", expected.getData(), actual.getData());
+        MergeData expected = new MergeData(path, data);
+
+        Object serialized = expected.toSerializable(DataStoreVersions.CURRENT_VERSION);
+        assertEquals("Serialized type", MergeData.class, serialized.getClass());
+        assertEquals("Version", DataStoreVersions.CURRENT_VERSION, ((MergeData)serialized).getVersion());
+
+        Object clone = SerializationUtils.clone((Serializable) serialized);
+        assertEquals("Version", DataStoreVersions.CURRENT_VERSION, ((MergeData)clone).getVersion());
+        MergeData actual = MergeData.fromSerializable(clone);
+        assertEquals("getPath", expected.getPath(), actual.getPath());
+        assertEquals("getData", expected.getData(), actual.getData());
+    }
+
+    @Test
+    public void testIsSerializedType() {
+        assertEquals("isSerializedType", true, MergeData.isSerializedType(
+                ShardTransactionMessages.MergeData.newBuilder()
+                    .setInstanceIdentifierPathArguments(InstanceIdentifier.getDefaultInstance())
+                    .setNormalizedNode(Node.getDefaultInstance()).build()));
+        assertEquals("isSerializedType", true,
+                MergeData.isSerializedType(new MergeData()));
+        assertEquals("isSerializedType", false, MergeData.isSerializedType(new Object()));
+    }
+
+    /**
+     * Tests backwards compatible serialization/deserialization of a MergeData message with the
+     * base and R1 Helium versions, which used the protobuff MergeData message.
+     */
+    @Test
+    public void testSerializationWithHeliumR1Version() throws Exception {
+        YangInstanceIdentifier path = TestModel.TEST_PATH;
+        NormalizedNode<?, ?> data = ImmutableContainerNodeBuilder.create().withNodeIdentifier(
+                new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME)).
+                withChild(ImmutableNodes.leafNode(TestModel.DESC_QNAME, "foo")).build();
+
+        MergeData expected = new MergeData(path, data);
+
+        Object serialized = expected.toSerializable(DataStoreVersions.HELIUM_1_VERSION);
+        assertEquals("Serialized type", ShardTransactionMessages.MergeData.class, serialized.getClass());
+
+        MergeData actual = MergeData.fromSerializable(SerializationUtils.clone((Serializable) serialized));
+        assertEquals("getPath", expected.getPath(), actual.getPath());
+        assertEquals("getData", expected.getData(), actual.getData());
     }
 }
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/messages/ReadDataReplyTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/messages/ReadDataReplyTest.java
new file mode 100644 (file)
index 0000000..8ce7329
--- /dev/null
@@ -0,0 +1,72 @@
+/*
+ * Copyright (c) 2014 Brocade Communications Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore.messages;
+
+import static org.junit.Assert.assertEquals;
+import java.io.Serializable;
+import org.apache.commons.lang.SerializationUtils;
+import org.junit.Test;
+import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
+import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
+import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
+import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder;
+
+/**
+ * Unit tests for ReadDataReply.
+ *
+ * @author Thomas Pantelis
+ */
+public class ReadDataReplyTest {
+
+    @Test
+    public void testSerialization() {
+        NormalizedNode<?, ?> data = ImmutableContainerNodeBuilder.create().withNodeIdentifier(
+                new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME)).
+                withChild(ImmutableNodes.leafNode(TestModel.DESC_QNAME, "foo")).build();
+
+        ReadDataReply expected = new ReadDataReply(data);
+
+        Object serialized = expected.toSerializable(DataStoreVersions.CURRENT_VERSION);
+        assertEquals("Serialized type", ReadDataReply.class, serialized.getClass());
+
+        ReadDataReply actual = ReadDataReply.fromSerializable(SerializationUtils.clone(
+                (Serializable) serialized));
+        assertEquals("getNormalizedNode", expected.getNormalizedNode(), actual.getNormalizedNode());
+    }
+
+    @Test
+    public void testIsSerializedType() {
+        assertEquals("isSerializedType", true, ReadDataReply.isSerializedType(
+                ShardTransactionMessages.ReadDataReply.newBuilder().build()));
+        assertEquals("isSerializedType", true, ReadDataReply.isSerializedType(new ReadDataReply()));
+        assertEquals("isSerializedType", false, ReadDataReply.isSerializedType(new Object()));
+    }
+
+    /**
+     * Tests backwards compatible serialization/deserialization of a ReadDataReply message with the
+     * base and R1 Helium versions, which used the protobuff ReadDataReply message.
+     */
+    @Test
+    public void testSerializationWithHeliumR1Version() throws Exception {
+        NormalizedNode<?, ?> data = ImmutableContainerNodeBuilder.create().withNodeIdentifier(
+                new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME)).
+                withChild(ImmutableNodes.leafNode(TestModel.DESC_QNAME, "foo")).build();
+
+        ReadDataReply expected = new ReadDataReply(data);
+
+        Object serialized = expected.toSerializable(DataStoreVersions.HELIUM_1_VERSION);
+        assertEquals("Serialized type", ShardTransactionMessages.ReadDataReply.class, serialized.getClass());
+
+        ReadDataReply actual = ReadDataReply.fromSerializable(SerializationUtils.clone(
+                (Serializable) serialized));
+        assertEquals("getNormalizedNode", expected.getNormalizedNode(), actual.getNormalizedNode());
+    }
+}
index 6a5d65f8daab3e3952393e8bbdb764560c355886..90a76f229e1ddd045c084e56b1b854a518b43a92 100644 (file)
@@ -7,11 +7,19 @@
  */
 package org.opendaylight.controller.cluster.datastore.messages;
 
-import org.junit.Assert;
+import static org.junit.Assert.assertEquals;
+import java.io.Serializable;
+import org.apache.commons.lang.SerializationUtils;
 import org.junit.Test;
+import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
+import org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages.InstanceIdentifier;
+import org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages.Node;
+import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
-import org.opendaylight.yangtools.yang.model.api.SchemaContext;
+import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder;
 
 /**
  * Unit tests for WriteData.
@@ -22,12 +30,52 @@ public class WriteDataTest {
 
     @Test
     public void testSerialization() {
-        SchemaContext schemaContext = TestModel.createTestContext();
-        WriteData expected = new WriteData(TestModel.TEST_PATH, ImmutableNodes
-            .containerNode(TestModel.TEST_QNAME), schemaContext);
+        YangInstanceIdentifier path = TestModel.TEST_PATH;
+        NormalizedNode<?, ?> data = ImmutableContainerNodeBuilder.create().withNodeIdentifier(
+                new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME)).
+                withChild(ImmutableNodes.leafNode(TestModel.DESC_QNAME, "foo")).build();
 
-        WriteData actual = WriteData.fromSerializable(expected.toSerializable(), schemaContext);
-        Assert.assertEquals("getPath", expected.getPath(), actual.getPath());
-        Assert.assertEquals("getData", expected.getData(), actual.getData());
+        WriteData expected = new WriteData(path, data);
+
+        Object serialized = expected.toSerializable(DataStoreVersions.CURRENT_VERSION);
+        assertEquals("Serialized type", WriteData.class, serialized.getClass());
+        assertEquals("Version", DataStoreVersions.CURRENT_VERSION, ((WriteData)serialized).getVersion());
+
+        Object clone = SerializationUtils.clone((Serializable) serialized);
+        assertEquals("Version", DataStoreVersions.CURRENT_VERSION, ((WriteData)clone).getVersion());
+        WriteData actual = WriteData.fromSerializable(clone);
+        assertEquals("getPath", expected.getPath(), actual.getPath());
+        assertEquals("getData", expected.getData(), actual.getData());
+    }
+
+    @Test
+    public void testIsSerializedType() {
+        assertEquals("isSerializedType", true, WriteData.isSerializedType(
+                ShardTransactionMessages.WriteData.newBuilder()
+                    .setInstanceIdentifierPathArguments(InstanceIdentifier.getDefaultInstance())
+                    .setNormalizedNode(Node.getDefaultInstance()).build()));
+        assertEquals("isSerializedType", true, WriteData.isSerializedType(new WriteData()));
+        assertEquals("isSerializedType", false, WriteData.isSerializedType(new Object()));
+    }
+
+    /**
+     * Tests backwards compatible serialization/deserialization of a WriteData message with the
+     * base and R1 Helium versions, which used the protobuff WriteData message.
+     */
+    @Test
+    public void testSerializationWithHeliumR1Version() throws Exception {
+        YangInstanceIdentifier path = TestModel.TEST_PATH;
+        NormalizedNode<?, ?> data = ImmutableContainerNodeBuilder.create().withNodeIdentifier(
+                new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME)).
+                withChild(ImmutableNodes.leafNode(TestModel.DESC_QNAME, "foo")).build();
+
+        WriteData expected = new WriteData(path, data);
+
+        Object serialized = expected.toSerializable(DataStoreVersions.HELIUM_1_VERSION);
+        assertEquals("Serialized type", ShardTransactionMessages.WriteData.class, serialized.getClass());
+
+        WriteData actual = WriteData.fromSerializable(SerializationUtils.clone((Serializable) serialized));
+        assertEquals("getPath", expected.getPath(), actual.getPath());
+        assertEquals("getData", expected.getData(), actual.getData());
     }
 }
index 85441eca0d290ca4968cea22e793b4897fc3ec36..e571e3a715a7b0b845fbb1ea52b7fda096da1f32 100644 (file)
@@ -7,16 +7,15 @@
  */
 package org.opendaylight.controller.md.cluster.datastore.model;
 
+import java.io.InputStream;
+import java.util.Collections;
+import java.util.Set;
 import org.opendaylight.yangtools.yang.common.QName;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.model.api.Module;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 import org.opendaylight.yangtools.yang.parser.impl.YangParserImpl;
 
-import java.io.InputStream;
-import java.util.Collections;
-import java.util.Set;
-
 public class TestModel {
 
   public static final QName TEST_QNAME = QName.create("urn:opendaylight:params:xml:ns:yang:controller:md:sal:dom:store:test", "2014-03-13",
@@ -26,6 +25,7 @@ public class TestModel {
   public static final QName OUTER_CHOICE_QNAME = QName.create(TEST_QNAME, "outer-choice");
   public static final QName ID_QNAME = QName.create(TEST_QNAME, "id");
   public static final QName NAME_QNAME = QName.create(TEST_QNAME, "name");
+  public static final QName DESC_QNAME = QName.create(TEST_QNAME, "desc");
   public static final QName VALUE_QNAME = QName.create(TEST_QNAME, "value");
   private static final String DATASTORE_TEST_YANG = "/odl-datastore-test.yang";