From cad857b425b1a0072681066b2ba37b0b0dc8c111 Mon Sep 17 00:00:00 2001 From: tpantelis Date: Sun, 18 Jan 2015 22:21:46 -0500 Subject: [PATCH] Bug 2265: Use new NormalizedNode streaming in messages MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit Utilized the new NormalizedNode streaming classes for the WriteData, MergeData, ReadDataReply and DataChanged messages. One solution was to add a bytes field to the protobuff messages and make the previous fields optional. For backwards compatibility, in the wrapper message's fromSerializable method, check whether or not the protobuff message has the bytes field and act accordingly. While this works, it results in an undesirable inefficiency. Protobuff translates the bytes field to a ByteString type. So when streaming, we need to create a ByteArrayOutputStream and pass that to the NormalizedNode stream writer. Then call ByteString.copyFrom(bos.toByteArray) to get the resulting ByteString. However this results in 2 copies of the serialized byte[]. The byte[] cannot be passed to ByteString as is as it always copies it to maintain immutability. I looked into subclassing ByteString and lazily streaming the data on demand but the ByteString ctor is package scoped so they don’t allow subclassing. So I went with an approach to make each message Externalizable instead of using protobuff. The classes writes the version number to enable us to handle compatibility in the future. So in this manner, we can get the efficient direct streaming we want and easily handle backwards and forwards compatibility. I added a VersionedSerializableMessage interface whose toSerializable method takes a version number. The version # is passed from the remote version # obtained from the CreateTransactionReply. This allows the message classes to handle backwards compatibility. So if the version is Helium-1 or less, send the protobuff message otherwise send the Externalizable instance. In the fromSerializable method, it checks if the passed Object is an instance of the Externalizable class or the protbuff message type. Change-Id: I5ebb968e70ac8ff92c29183c52e6c3fe5362ae34 Signed-off-by: tpantelis --- .../transaction/ShardTransactionMessages.java | 120 ++++++++ .../src/main/resources/ShardTransaction.proto | 12 +- .../cluster/datastore/DataChangeListener.java | 2 +- .../datastore/DataChangeListenerProxy.java | 13 +- .../cluster/datastore/DataStoreVersions.java | 21 ++ .../controller/cluster/datastore/Shard.java | 21 +- .../datastore/ShardCommitCoordinator.java | 17 +- .../datastore/ShardReadTransaction.java | 4 +- .../datastore/ShardReadWriteTransaction.java | 5 +- .../cluster/datastore/ShardTransaction.java | 26 +- .../datastore/ShardTransactionChain.java | 3 +- .../datastore/ShardWriteTransaction.java | 33 +- .../cluster/datastore/TransactionProxy.java | 32 +- .../messages/AbortTransactionReply.java | 7 +- .../messages/CanCommitTransactionReply.java | 19 +- .../datastore/messages/CloseTransaction.java | 18 +- .../messages/CloseTransactionChainReply.java | 17 +- .../messages/CloseTransactionReply.java | 18 +- .../messages/CommitTransactionReply.java | 7 +- .../datastore/messages/CreateTransaction.java | 14 +- .../messages/CreateTransactionReply.java | 20 +- .../datastore/messages/DataChanged.java | 288 ++++++------------ .../datastore/messages/DataChangedReply.java | 18 +- .../datastore/messages/DeleteDataReply.java | 17 +- .../messages/EmptyExternalizable.java | 29 ++ .../datastore/messages/EmptyReply.java | 30 ++ .../cluster/datastore/messages/MergeData.java | 52 ++-- .../datastore/messages/MergeDataReply.java | 17 +- .../datastore/messages/ModifyData.java | 52 +++- .../datastore/messages/ReadDataReply.java | 82 +++-- .../datastore/messages/ReadyTransaction.java | 15 +- .../VersionedSerializableMessage.java | 17 ++ .../cluster/datastore/messages/WriteData.java | 51 +++- .../datastore/messages/WriteDataReply.java | 18 +- .../datastore/utils/SerializationUtils.java | 81 +++++ .../DataChangeListenerProxyTest.java | 4 +- .../datastore/DataChangeListenerTest.java | 13 +- .../cluster/datastore/ShardTest.java | 2 +- .../ShardTransactionFailureTest.java | 15 +- ...ctionHeliumBackwardsCompatibilityTest.java | 20 +- .../datastore/ShardTransactionTest.java | 134 +++++--- .../ThreePhaseCommitCohortProxyTest.java | 33 +- .../datastore/TransactionProxyTest.java | 170 +++++++---- .../datastore/messages/DataChangedTest.java | 57 ++++ .../datastore/messages/MergeDataTest.java | 65 +++- .../datastore/messages/ReadDataReplyTest.java | 72 +++++ .../datastore/messages/WriteDataTest.java | 64 +++- .../md/cluster/datastore/model/TestModel.java | 8 +- 48 files changed, 1268 insertions(+), 585 deletions(-) create mode 100644 opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataStoreVersions.java create mode 100644 opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/EmptyExternalizable.java create mode 100644 opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/EmptyReply.java create mode 100644 opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/VersionedSerializableMessage.java create mode 100644 opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/SerializationUtils.java create mode 100644 opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/messages/DataChangedTest.java create mode 100644 opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/messages/ReadDataReplyTest.java diff --git a/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/protobuff/messages/transaction/ShardTransactionMessages.java b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/protobuff/messages/transaction/ShardTransactionMessages.java index 3a1cfaa443..8be6ad14dd 100644 --- a/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/protobuff/messages/transaction/ShardTransactionMessages.java +++ b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/protobuff/messages/transaction/ShardTransactionMessages.java @@ -4823,14 +4823,26 @@ public final class ShardTransactionMessages { // required .org.opendaylight.controller.mdsal.InstanceIdentifier instanceIdentifierPathArguments = 1; /** * required .org.opendaylight.controller.mdsal.InstanceIdentifier instanceIdentifierPathArguments = 1; + * + *
+     * base Helium version
+     * 
*/ boolean hasInstanceIdentifierPathArguments(); /** * required .org.opendaylight.controller.mdsal.InstanceIdentifier instanceIdentifierPathArguments = 1; + * + *
+     * base Helium version
+     * 
*/ org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages.InstanceIdentifier getInstanceIdentifierPathArguments(); /** * required .org.opendaylight.controller.mdsal.InstanceIdentifier instanceIdentifierPathArguments = 1; + * + *
+     * base Helium version
+     * 
*/ org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages.InstanceIdentifierOrBuilder getInstanceIdentifierPathArgumentsOrBuilder(); @@ -4970,18 +4982,30 @@ public final class ShardTransactionMessages { private org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages.InstanceIdentifier instanceIdentifierPathArguments_; /** * required .org.opendaylight.controller.mdsal.InstanceIdentifier instanceIdentifierPathArguments = 1; + * + *
+     * base Helium version
+     * 
*/ public boolean hasInstanceIdentifierPathArguments() { return ((bitField0_ & 0x00000001) == 0x00000001); } /** * required .org.opendaylight.controller.mdsal.InstanceIdentifier instanceIdentifierPathArguments = 1; + * + *
+     * base Helium version
+     * 
*/ public org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages.InstanceIdentifier getInstanceIdentifierPathArguments() { return instanceIdentifierPathArguments_; } /** * required .org.opendaylight.controller.mdsal.InstanceIdentifier instanceIdentifierPathArguments = 1; + * + *
+     * base Helium version
+     * 
*/ public org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages.InstanceIdentifierOrBuilder getInstanceIdentifierPathArgumentsOrBuilder() { return instanceIdentifierPathArguments_; @@ -5309,12 +5333,20 @@ public final class ShardTransactionMessages { org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages.InstanceIdentifier, org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages.InstanceIdentifier.Builder, org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages.InstanceIdentifierOrBuilder> instanceIdentifierPathArgumentsBuilder_; /** * required .org.opendaylight.controller.mdsal.InstanceIdentifier instanceIdentifierPathArguments = 1; + * + *
+       * base Helium version
+       * 
*/ public boolean hasInstanceIdentifierPathArguments() { return ((bitField0_ & 0x00000001) == 0x00000001); } /** * required .org.opendaylight.controller.mdsal.InstanceIdentifier instanceIdentifierPathArguments = 1; + * + *
+       * base Helium version
+       * 
*/ public org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages.InstanceIdentifier getInstanceIdentifierPathArguments() { if (instanceIdentifierPathArgumentsBuilder_ == null) { @@ -5325,6 +5357,10 @@ public final class ShardTransactionMessages { } /** * required .org.opendaylight.controller.mdsal.InstanceIdentifier instanceIdentifierPathArguments = 1; + * + *
+       * base Helium version
+       * 
*/ public Builder setInstanceIdentifierPathArguments(org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages.InstanceIdentifier value) { if (instanceIdentifierPathArgumentsBuilder_ == null) { @@ -5341,6 +5377,10 @@ public final class ShardTransactionMessages { } /** * required .org.opendaylight.controller.mdsal.InstanceIdentifier instanceIdentifierPathArguments = 1; + * + *
+       * base Helium version
+       * 
*/ public Builder setInstanceIdentifierPathArguments( org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages.InstanceIdentifier.Builder builderForValue) { @@ -5355,6 +5395,10 @@ public final class ShardTransactionMessages { } /** * required .org.opendaylight.controller.mdsal.InstanceIdentifier instanceIdentifierPathArguments = 1; + * + *
+       * base Helium version
+       * 
*/ public Builder mergeInstanceIdentifierPathArguments(org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages.InstanceIdentifier value) { if (instanceIdentifierPathArgumentsBuilder_ == null) { @@ -5374,6 +5418,10 @@ public final class ShardTransactionMessages { } /** * required .org.opendaylight.controller.mdsal.InstanceIdentifier instanceIdentifierPathArguments = 1; + * + *
+       * base Helium version
+       * 
*/ public Builder clearInstanceIdentifierPathArguments() { if (instanceIdentifierPathArgumentsBuilder_ == null) { @@ -5387,6 +5435,10 @@ public final class ShardTransactionMessages { } /** * required .org.opendaylight.controller.mdsal.InstanceIdentifier instanceIdentifierPathArguments = 1; + * + *
+       * base Helium version
+       * 
*/ public org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages.InstanceIdentifier.Builder getInstanceIdentifierPathArgumentsBuilder() { bitField0_ |= 0x00000001; @@ -5395,6 +5447,10 @@ public final class ShardTransactionMessages { } /** * required .org.opendaylight.controller.mdsal.InstanceIdentifier instanceIdentifierPathArguments = 1; + * + *
+       * base Helium version
+       * 
*/ public org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages.InstanceIdentifierOrBuilder getInstanceIdentifierPathArgumentsOrBuilder() { if (instanceIdentifierPathArgumentsBuilder_ != null) { @@ -5405,6 +5461,10 @@ public final class ShardTransactionMessages { } /** * required .org.opendaylight.controller.mdsal.InstanceIdentifier instanceIdentifierPathArguments = 1; + * + *
+       * base Helium version
+       * 
*/ private com.google.protobuf.SingleFieldBuilder< org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages.InstanceIdentifier, org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages.InstanceIdentifier.Builder, org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages.InstanceIdentifierOrBuilder> @@ -5863,14 +5923,26 @@ public final class ShardTransactionMessages { // required .org.opendaylight.controller.mdsal.InstanceIdentifier instanceIdentifierPathArguments = 1; /** * required .org.opendaylight.controller.mdsal.InstanceIdentifier instanceIdentifierPathArguments = 1; + * + *
+     * base Helium version
+     * 
*/ boolean hasInstanceIdentifierPathArguments(); /** * required .org.opendaylight.controller.mdsal.InstanceIdentifier instanceIdentifierPathArguments = 1; + * + *
+     * base Helium version
+     * 
*/ org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages.InstanceIdentifier getInstanceIdentifierPathArguments(); /** * required .org.opendaylight.controller.mdsal.InstanceIdentifier instanceIdentifierPathArguments = 1; + * + *
+     * base Helium version
+     * 
*/ org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages.InstanceIdentifierOrBuilder getInstanceIdentifierPathArgumentsOrBuilder(); @@ -6010,18 +6082,30 @@ public final class ShardTransactionMessages { private org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages.InstanceIdentifier instanceIdentifierPathArguments_; /** * required .org.opendaylight.controller.mdsal.InstanceIdentifier instanceIdentifierPathArguments = 1; + * + *
+     * base Helium version
+     * 
*/ public boolean hasInstanceIdentifierPathArguments() { return ((bitField0_ & 0x00000001) == 0x00000001); } /** * required .org.opendaylight.controller.mdsal.InstanceIdentifier instanceIdentifierPathArguments = 1; + * + *
+     * base Helium version
+     * 
*/ public org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages.InstanceIdentifier getInstanceIdentifierPathArguments() { return instanceIdentifierPathArguments_; } /** * required .org.opendaylight.controller.mdsal.InstanceIdentifier instanceIdentifierPathArguments = 1; + * + *
+     * base Helium version
+     * 
*/ public org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages.InstanceIdentifierOrBuilder getInstanceIdentifierPathArgumentsOrBuilder() { return instanceIdentifierPathArguments_; @@ -6349,12 +6433,20 @@ public final class ShardTransactionMessages { org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages.InstanceIdentifier, org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages.InstanceIdentifier.Builder, org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages.InstanceIdentifierOrBuilder> instanceIdentifierPathArgumentsBuilder_; /** * required .org.opendaylight.controller.mdsal.InstanceIdentifier instanceIdentifierPathArguments = 1; + * + *
+       * base Helium version
+       * 
*/ public boolean hasInstanceIdentifierPathArguments() { return ((bitField0_ & 0x00000001) == 0x00000001); } /** * required .org.opendaylight.controller.mdsal.InstanceIdentifier instanceIdentifierPathArguments = 1; + * + *
+       * base Helium version
+       * 
*/ public org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages.InstanceIdentifier getInstanceIdentifierPathArguments() { if (instanceIdentifierPathArgumentsBuilder_ == null) { @@ -6365,6 +6457,10 @@ public final class ShardTransactionMessages { } /** * required .org.opendaylight.controller.mdsal.InstanceIdentifier instanceIdentifierPathArguments = 1; + * + *
+       * base Helium version
+       * 
*/ public Builder setInstanceIdentifierPathArguments(org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages.InstanceIdentifier value) { if (instanceIdentifierPathArgumentsBuilder_ == null) { @@ -6381,6 +6477,10 @@ public final class ShardTransactionMessages { } /** * required .org.opendaylight.controller.mdsal.InstanceIdentifier instanceIdentifierPathArguments = 1; + * + *
+       * base Helium version
+       * 
*/ public Builder setInstanceIdentifierPathArguments( org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages.InstanceIdentifier.Builder builderForValue) { @@ -6395,6 +6495,10 @@ public final class ShardTransactionMessages { } /** * required .org.opendaylight.controller.mdsal.InstanceIdentifier instanceIdentifierPathArguments = 1; + * + *
+       * base Helium version
+       * 
*/ public Builder mergeInstanceIdentifierPathArguments(org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages.InstanceIdentifier value) { if (instanceIdentifierPathArgumentsBuilder_ == null) { @@ -6414,6 +6518,10 @@ public final class ShardTransactionMessages { } /** * required .org.opendaylight.controller.mdsal.InstanceIdentifier instanceIdentifierPathArguments = 1; + * + *
+       * base Helium version
+       * 
*/ public Builder clearInstanceIdentifierPathArguments() { if (instanceIdentifierPathArgumentsBuilder_ == null) { @@ -6427,6 +6535,10 @@ public final class ShardTransactionMessages { } /** * required .org.opendaylight.controller.mdsal.InstanceIdentifier instanceIdentifierPathArguments = 1; + * + *
+       * base Helium version
+       * 
*/ public org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages.InstanceIdentifier.Builder getInstanceIdentifierPathArgumentsBuilder() { bitField0_ |= 0x00000001; @@ -6435,6 +6547,10 @@ public final class ShardTransactionMessages { } /** * required .org.opendaylight.controller.mdsal.InstanceIdentifier instanceIdentifierPathArguments = 1; + * + *
+       * base Helium version
+       * 
*/ public org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages.InstanceIdentifierOrBuilder getInstanceIdentifierPathArgumentsOrBuilder() { if (instanceIdentifierPathArgumentsBuilder_ != null) { @@ -6445,6 +6561,10 @@ public final class ShardTransactionMessages { } /** * required .org.opendaylight.controller.mdsal.InstanceIdentifier instanceIdentifierPathArguments = 1; + * + *
+       * base Helium version
+       * 
*/ private com.google.protobuf.SingleFieldBuilder< org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages.InstanceIdentifier, org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages.InstanceIdentifier.Builder, org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages.InstanceIdentifierOrBuilder> diff --git a/opendaylight/md-sal/sal-clustering-commons/src/main/resources/ShardTransaction.proto b/opendaylight/md-sal/sal-clustering-commons/src/main/resources/ShardTransaction.proto index c5e4ee45c0..ac9cb22033 100644 --- a/opendaylight/md-sal/sal-clustering-commons/src/main/resources/ShardTransaction.proto +++ b/opendaylight/md-sal/sal-clustering-commons/src/main/resources/ShardTransaction.proto @@ -49,9 +49,9 @@ message ReadDataReply{ } message WriteData { - required InstanceIdentifier instanceIdentifierPathArguments = 1; -required Node normalizedNode =2; - + // base Helium version + required InstanceIdentifier instanceIdentifierPathArguments = 1; + required Node normalizedNode = 2; } message WriteDataReply{ @@ -59,9 +59,9 @@ message WriteDataReply{ } message MergeData { - required InstanceIdentifier instanceIdentifierPathArguments = 1; -required Node normalizedNode =2; - + // base Helium version + required InstanceIdentifier instanceIdentifierPathArguments = 1; + required Node normalizedNode = 2; } message MergeDataReply{ diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataChangeListener.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataChangeListener.java index 6f14af304f..1bc835f1e3 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataChangeListener.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataChangeListener.java @@ -71,7 +71,7 @@ public class DataChangeListener extends AbstractUntypedActor { // It seems the sender is never null but it doesn't hurt to check. If the caller passes in // a null sender (ActorRef.noSender()), akka translates that to the deadLetters actor. if(getSender() != null && !getContext().system().deadLetters().equals(getSender())) { - getSender().tell(new DataChangedReply(), getSelf()); + getSender().tell(DataChangedReply.INSTANCE, getSelf()); } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerProxy.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerProxy.java index 1a0ee8c2fa..afec1a07d4 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerProxy.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerProxy.java @@ -16,22 +16,21 @@ import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent; import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; -import org.opendaylight.yangtools.yang.model.api.SchemaContext; /** * DataChangeListenerProxy represents a single remote DataChangeListener */ public class DataChangeListenerProxy implements AsyncDataChangeListener>{ private final ActorSelection dataChangeListenerActor; - private final SchemaContext schemaContext; - public DataChangeListenerProxy(SchemaContext schemaContext, ActorSelection dataChangeListenerActor) { - this.dataChangeListenerActor = Preconditions.checkNotNull(dataChangeListenerActor, "dataChangeListenerActor should not be null"); - this.schemaContext = schemaContext; + public DataChangeListenerProxy(ActorSelection dataChangeListenerActor) { + this.dataChangeListenerActor = Preconditions.checkNotNull(dataChangeListenerActor, + "dataChangeListenerActor should not be null"); } - @Override public void onDataChanged( + @Override + public void onDataChanged( AsyncDataChangeEvent> change) { - dataChangeListenerActor.tell(new DataChanged(schemaContext, change), ActorRef.noSender()); + dataChangeListenerActor.tell(new DataChanged(change), ActorRef.noSender()); } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataStoreVersions.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataStoreVersions.java new file mode 100644 index 0000000000..1f22222b5b --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataStoreVersions.java @@ -0,0 +1,21 @@ +/* + * Copyright (c) 2014 Brocade Communications Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.datastore; + +/** + * Defines version numbers. + * + * @author Thomas Pantelis + */ +public interface DataStoreVersions { + short BASE_HELIUM_VERSION = 0; + short HELIUM_1_VERSION = 1; + short HELIUM_2_VERSION = 2; + short LITHIUM_VERSION = 3; + short CURRENT_VERSION = LITHIUM_VERSION; +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java index 7ef6e040a9..1661bb4b5d 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java @@ -95,8 +95,6 @@ import scala.concurrent.duration.FiniteDuration; */ public class Shard extends RaftActor { - private static final Object COMMIT_TRANSACTION_REPLY = new CommitTransactionReply().toSerializable(); - private static final Object TX_COMMIT_TIMEOUT_CHECK_MESSAGE = "txCommitTimeoutCheck"; public static final String DEFAULT_NAME = "default"; @@ -354,7 +352,7 @@ public class Shard extends RaftActor { cohortEntry = commitCoordinator.getAndRemoveCohortEntry(transactionID); if(cohortEntry != null) { commitWithNewTransaction(cohortEntry.getModification()); - sender.tell(COMMIT_TRANSACTION_REPLY, getSelf()); + sender.tell(CommitTransactionReply.INSTANCE.toSerializable(), getSelf()); } else { // This really shouldn't happen - it likely means that persistence or replication // took so long to complete such that the cohort entry was expired from the cache. @@ -376,7 +374,7 @@ public class Shard extends RaftActor { // currently uses a same thread executor anyway. cohortEntry.getCohort().commit().get(); - sender.tell(COMMIT_TRANSACTION_REPLY, getSelf()); + sender.tell(CommitTransactionReply.INSTANCE.toSerializable(), getSelf()); shardMBean.incrementCommittedTransactionCount(); shardMBean.setLastCommittedTransactionTime(System.currentTimeMillis()); @@ -412,7 +410,7 @@ public class Shard extends RaftActor { // transactionId so to maintain backwards compatibility, we create a separate cohort actor // to provide the compatible behavior. ActorRef replyActorPath = self(); - if(ready.getTxnClientVersion() < CreateTransaction.HELIUM_1_VERSION) { + if(ready.getTxnClientVersion() < DataStoreVersions.HELIUM_1_VERSION) { LOG.debug("Creating BackwardsCompatibleThreePhaseCommitCohort"); replyActorPath = getContext().actorOf(BackwardsCompatibleThreePhaseCommitCohort.props( ready.getTransactionID())); @@ -447,7 +445,7 @@ public class Shard extends RaftActor { shardMBean.incrementAbortTransactionsCount(); if(sender != null) { - sender.tell(new AbortTransactionReply().toSerializable(), self); + sender.tell(AbortTransactionReply.INSTANCE.toSerializable(), self); } } @@ -480,7 +478,7 @@ public class Shard extends RaftActor { // This must be for install snapshot. Don't want to open this up and trigger // deSerialization - self().tell(new CaptureSnapshotReply(ReadDataReply.getNormalizedNodeByteString(message)), + self().tell(new CaptureSnapshotReply(ReadDataReply.fromSerializableAsByteString(message)), self()); createSnapshotTransaction = null; @@ -500,7 +498,8 @@ public class Shard extends RaftActor { } private ActorRef createTypedTransactionActor(int transactionType, - ShardTransactionIdentifier transactionId, String transactionChainId, int clientVersion ) { + ShardTransactionIdentifier transactionId, String transactionChainId, + short clientVersion ) { DOMStoreTransactionFactory factory = store; @@ -568,7 +567,7 @@ public class Shard extends RaftActor { } private ActorRef createTransaction(int transactionType, String remoteTransactionId, - String transactionChainId, int clientVersion) { + String transactionChainId, short clientVersion) { ShardTransactionIdentifier transactionId = ShardTransactionIdentifier.builder() @@ -659,7 +658,7 @@ public class Shard extends RaftActor { dataChangeListeners.add(dataChangeListenerPath); AsyncDataChangeListener> listener = - new DataChangeListenerProxy(schemaContext, dataChangeListenerPath); + new DataChangeListenerProxy(dataChangeListenerPath); LOG.debug("Registering for path {}", registerChangeListener.getPath()); @@ -818,7 +817,7 @@ public class Shard extends RaftActor { createSnapshotTransaction = createTransaction( TransactionProxy.TransactionType.READ_ONLY.ordinal(), "createSnapshot" + ++createSnapshotTransactionCounter, "", - CreateTransaction.CURRENT_VERSION); + DataStoreVersions.CURRENT_VERSION); createSnapshotTransaction.tell( new ReadData(YangInstanceIdentifier.builder().build()).toSerializable(), self()); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardCommitCoordinator.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardCommitCoordinator.java index f3b4e41640..19fa26682e 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardCommitCoordinator.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardCommitCoordinator.java @@ -7,6 +7,10 @@ */ package org.opendaylight.controller.cluster.datastore; +import akka.actor.ActorRef; +import akka.actor.Status; +import com.google.common.cache.Cache; +import com.google.common.cache.CacheBuilder; import java.util.LinkedList; import java.util.Queue; import java.util.concurrent.ExecutionException; @@ -17,10 +21,6 @@ import org.opendaylight.controller.cluster.datastore.modification.Modification; import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import akka.actor.ActorRef; -import akka.actor.Status; -import com.google.common.cache.Cache; -import com.google.common.cache.CacheBuilder; /** * Coordinates commits for a shard ensuring only one concurrent 3-phase commit. @@ -31,12 +31,6 @@ public class ShardCommitCoordinator { private static final Logger LOG = LoggerFactory.getLogger(ShardCommitCoordinator.class); - private static final Object CAN_COMMIT_REPLY_TRUE = - new CanCommitTransactionReply(Boolean.TRUE).toSerializable(); - - private static final Object CAN_COMMIT_REPLY_FALSE = - new CanCommitTransactionReply(Boolean.FALSE).toSerializable(); - private final Cache cohortCache; private CohortEntry currentCohortEntry; @@ -138,7 +132,8 @@ public class ShardCommitCoordinator { Boolean canCommit = cohortEntry.getCohort().canCommit().get(); cohortEntry.getCanCommitSender().tell( - canCommit ? CAN_COMMIT_REPLY_TRUE : CAN_COMMIT_REPLY_FALSE, cohortEntry.getShard()); + canCommit ? CanCommitTransactionReply.YES.toSerializable() : + CanCommitTransactionReply.NO.toSerializable(), cohortEntry.getShard()); if(!canCommit) { // Remove the entry from the cache now since the Tx will be aborted. diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardReadTransaction.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardReadTransaction.java index 9d8f57252a..be9c4d80e3 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardReadTransaction.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardReadTransaction.java @@ -27,8 +27,8 @@ public class ShardReadTransaction extends ShardTransaction { public ShardReadTransaction(DOMStoreReadTransaction transaction, ActorRef shardActor, SchemaContext schemaContext, ShardStats shardStats, String transactionID, - int txnClientVersion) { - super(shardActor, schemaContext, shardStats, transactionID, txnClientVersion); + short clientTxVersion) { + super(shardActor, schemaContext, shardStats, transactionID, clientTxVersion); this.transaction = transaction; } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardReadWriteTransaction.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardReadWriteTransaction.java index e558677ebb..b394da88e8 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardReadWriteTransaction.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardReadWriteTransaction.java @@ -11,7 +11,6 @@ package org.opendaylight.controller.cluster.datastore; import akka.actor.ActorRef; - import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats; import org.opendaylight.controller.cluster.datastore.messages.DataExists; import org.opendaylight.controller.cluster.datastore.messages.ReadData; @@ -27,8 +26,8 @@ public class ShardReadWriteTransaction extends ShardWriteTransaction { public ShardReadWriteTransaction(DOMStoreReadWriteTransaction transaction, ActorRef shardActor, SchemaContext schemaContext, ShardStats shardStats, String transactionID, - int txnClientVersion) { - super(transaction, shardActor, schemaContext, shardStats, transactionID, txnClientVersion); + short clientTxVersion) { + super(transaction, shardActor, schemaContext, shardStats, transactionID, clientTxVersion); this.transaction = transaction; } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardTransaction.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardTransaction.java index 59bb4bfd77..678b781569 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardTransaction.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardTransaction.java @@ -63,21 +63,21 @@ public abstract class ShardTransaction extends AbstractUntypedActorWithMetering private final SchemaContext schemaContext; private final ShardStats shardStats; private final String transactionID; - private final int txnClientVersion; + private final short clientTxVersion; protected ShardTransaction(ActorRef shardActor, SchemaContext schemaContext, - ShardStats shardStats, String transactionID, int txnClientVersion) { + ShardStats shardStats, String transactionID, short clientTxVersion) { super("shard-tx"); //actor name override used for metering. This does not change the "real" actor name this.shardActor = shardActor; this.schemaContext = schemaContext; this.shardStats = shardStats; this.transactionID = transactionID; - this.txnClientVersion = txnClientVersion; + this.clientTxVersion = clientTxVersion; } public static Props props(DOMStoreTransaction transaction, ActorRef shardActor, SchemaContext schemaContext,DatastoreContext datastoreContext, ShardStats shardStats, - String transactionID, int txnClientVersion) { + String transactionID, short txnClientVersion) { return Props.create(new ShardTransactionCreator(transaction, shardActor, schemaContext, datastoreContext, shardStats, transactionID, txnClientVersion)); } @@ -96,8 +96,8 @@ public abstract class ShardTransaction extends AbstractUntypedActorWithMetering return schemaContext; } - protected int getTxnClientVersion() { - return txnClientVersion; + protected short getClientTxVersion() { + return clientTxVersion; } @Override @@ -118,28 +118,28 @@ public abstract class ShardTransaction extends AbstractUntypedActorWithMetering getDOMStoreTransaction().close(); if(sendReply) { - getSender().tell(new CloseTransactionReply().toSerializable(), getSelf()); + getSender().tell(CloseTransactionReply.INSTANCE.toSerializable(), getSelf()); } getSelf().tell(PoisonPill.getInstance(), getSelf()); } - protected void readData(DOMStoreReadTransaction transaction, ReadData message, final boolean returnSerialized) { + protected void readData(DOMStoreReadTransaction transaction, ReadData message, + final boolean returnSerialized) { final ActorRef sender = getSender(); final ActorRef self = getSelf(); final YangInstanceIdentifier path = message.getPath(); final CheckedFuture>, ReadFailedException> future = transaction.read(path); - future.addListener(new Runnable() { @Override public void run() { try { Optional> optional = future.checkedGet(); - ReadDataReply readDataReply = new ReadDataReply(schemaContext, optional.orNull()); + ReadDataReply readDataReply = new ReadDataReply(optional.orNull()); - sender.tell((returnSerialized ? readDataReply.toSerializable(): + sender.tell((returnSerialized ? readDataReply.toSerializable(clientTxVersion): readDataReply), self); } catch (Exception e) { @@ -176,11 +176,11 @@ public abstract class ShardTransaction extends AbstractUntypedActorWithMetering final DatastoreContext datastoreContext; final ShardStats shardStats; final String transactionID; - final int txnClientVersion; + final short txnClientVersion; ShardTransactionCreator(DOMStoreTransaction transaction, ActorRef shardActor, SchemaContext schemaContext, DatastoreContext datastoreContext, - ShardStats shardStats, String transactionID, int txnClientVersion) { + ShardStats shardStats, String transactionID, short txnClientVersion) { this.transaction = transaction; this.shardActor = shardActor; this.shardStats = shardStats; diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardTransactionChain.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardTransactionChain.java index 78c6a558f4..8ba613958a 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardTransactionChain.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardTransactionChain.java @@ -12,7 +12,6 @@ import akka.actor.ActorRef; import akka.actor.Props; import akka.japi.Creator; import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActor; - import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats; import org.opendaylight.controller.cluster.datastore.messages.CloseTransactionChain; import org.opendaylight.controller.cluster.datastore.messages.CloseTransactionChainReply; @@ -46,7 +45,7 @@ public class ShardTransactionChain extends AbstractUntypedActor { createTransaction(createTransaction); } else if (message.getClass().equals(CloseTransactionChain.SERIALIZABLE_CLASS)) { chain.close(); - getSender().tell(new CloseTransactionChainReply().toSerializable(), getSelf()); + getSender().tell(CloseTransactionChainReply.INSTANCE.toSerializable(), getSelf()); }else{ unknownMessage(message); } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardWriteTransaction.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardWriteTransaction.java index 44f2c7bd0a..2e43219523 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardWriteTransaction.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardWriteTransaction.java @@ -43,8 +43,8 @@ public class ShardWriteTransaction extends ShardTransaction { public ShardWriteTransaction(DOMStoreWriteTransaction transaction, ActorRef shardActor, SchemaContext schemaContext, ShardStats shardStats, String transactionID, - int txnClientVersion) { - super(shardActor, schemaContext, shardStats, transactionID, txnClientVersion); + short clientTxVersion) { + super(shardActor, schemaContext, shardStats, transactionID, clientTxVersion); this.transaction = transaction; } @@ -66,19 +66,19 @@ public class ShardWriteTransaction extends ShardTransaction { deleteData(transaction, (DeleteData) message, !SERIALIZED_REPLY); } else if (message instanceof ReadyTransaction) { - readyTransaction(transaction, new ReadyTransaction(), !SERIALIZED_REPLY); + readyTransaction(transaction, !SERIALIZED_REPLY); - } else if(WriteData.SERIALIZABLE_CLASS.equals(message.getClass())) { - writeData(transaction, WriteData.fromSerializable(message, getSchemaContext()), SERIALIZED_REPLY); + } else if(WriteData.isSerializedType(message)) { + writeData(transaction, WriteData.fromSerializable(message), SERIALIZED_REPLY); - } else if(MergeData.SERIALIZABLE_CLASS.equals(message.getClass())) { - mergeData(transaction, MergeData.fromSerializable(message, getSchemaContext()), SERIALIZED_REPLY); + } else if(MergeData.isSerializedType(message)) { + mergeData(transaction, MergeData.fromSerializable(message), SERIALIZED_REPLY); } else if(DeleteData.SERIALIZABLE_CLASS.equals(message.getClass())) { deleteData(transaction, DeleteData.fromSerializable(message), SERIALIZED_REPLY); } else if(ReadyTransaction.SERIALIZABLE_CLASS.equals(message.getClass())) { - readyTransaction(transaction, new ReadyTransaction(), SERIALIZED_REPLY); + readyTransaction(transaction, SERIALIZED_REPLY); } else if (message instanceof GetCompositedModification) { // This is here for testing only @@ -97,9 +97,9 @@ public class ShardWriteTransaction extends ShardTransaction { new WriteModification(message.getPath(), message.getData(), getSchemaContext())); try { transaction.write(message.getPath(), message.getData()); - WriteDataReply writeDataReply = new WriteDataReply(); - getSender().tell(returnSerialized ? writeDataReply.toSerializable() : writeDataReply, - getSelf()); + WriteDataReply writeDataReply = WriteDataReply.INSTANCE; + getSender().tell(returnSerialized ? writeDataReply.toSerializable(message.getVersion()) : + writeDataReply, getSelf()); }catch(Exception e){ getSender().tell(new akka.actor.Status.Failure(e), getSelf()); } @@ -114,9 +114,9 @@ public class ShardWriteTransaction extends ShardTransaction { try { transaction.merge(message.getPath(), message.getData()); - MergeDataReply mergeDataReply = new MergeDataReply(); - getSender().tell(returnSerialized ? mergeDataReply.toSerializable() : mergeDataReply , - getSelf()); + MergeDataReply mergeDataReply = MergeDataReply.INSTANCE; + getSender().tell(returnSerialized ? mergeDataReply.toSerializable(message.getVersion()) : + mergeDataReply, getSelf()); }catch(Exception e){ getSender().tell(new akka.actor.Status.Failure(e), getSelf()); } @@ -137,15 +137,14 @@ public class ShardWriteTransaction extends ShardTransaction { } } - private void readyTransaction(DOMStoreWriteTransaction transaction, ReadyTransaction message, - boolean returnSerialized) { + private void readyTransaction(DOMStoreWriteTransaction transaction, boolean returnSerialized) { String transactionID = getTransactionID(); LOG.debug("readyTransaction : {}", transactionID); DOMStoreThreePhaseCommitCohort cohort = transaction.ready(); - getShardActor().forward(new ForwardedReadyTransaction(transactionID, getTxnClientVersion(), + getShardActor().forward(new ForwardedReadyTransaction(transactionID, getClientTxVersion(), cohort, modification, returnSerialized), getContext()); // The shard will handle the commit from here so we're no longer needed - self-destruct. diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java index 7703f484c7..f34e88fb27 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java @@ -43,6 +43,7 @@ import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply; import org.opendaylight.controller.cluster.datastore.messages.ReadyTransaction; import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply; import org.opendaylight.controller.cluster.datastore.messages.SerializableMessage; +import org.opendaylight.controller.cluster.datastore.messages.VersionedSerializableMessage; import org.opendaylight.controller.cluster.datastore.messages.WriteData; import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategyFactory; import org.opendaylight.controller.cluster.datastore.utils.ActorContext; @@ -157,8 +158,7 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { if(remoteTransactionActorsMB.get()) { for(ActorSelection actor : remoteTransactionActors) { LOG.trace("Sending CloseTransaction to {}", actor); - actorContext.sendOperationAsync(actor, - new CloseTransaction().toSerializable()); + actorContext.sendOperationAsync(actor, CloseTransaction.INSTANCE.toSerializable()); } } } @@ -617,10 +617,6 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { } } - - - - /** * Performs a CreateTransaction try async. */ @@ -763,11 +759,11 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { private final String transactionPath; private final ActorSelection actor; private final boolean isTxActorLocal; - private final int remoteTransactionVersion; + private final short remoteTransactionVersion; private TransactionContextImpl(String transactionPath, ActorSelection actor, TransactionIdentifier identifier, ActorContext actorContext, SchemaContext schemaContext, - boolean isTxActorLocal, int remoteTransactionVersion) { + boolean isTxActorLocal, short remoteTransactionVersion) { super(identifier); this.transactionPath = transactionPath; this.actor = actor; @@ -785,11 +781,16 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { return actorContext.executeOperationAsync(getActor(), isTxActorLocal ? msg : msg.toSerializable()); } + private Future executeOperationAsync(VersionedSerializableMessage msg) { + return actorContext.executeOperationAsync(getActor(), isTxActorLocal ? msg : + msg.toSerializable(remoteTransactionVersion)); + } + @Override public void closeTransaction() { LOG.debug("Tx {} closeTransaction called", identifier); - actorContext.sendOperationAsync(getActor(), new CloseTransaction().toSerializable()); + actorContext.sendOperationAsync(getActor(), CloseTransaction.INSTANCE.toSerializable()); } @Override @@ -799,7 +800,7 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { // Send the ReadyTransaction message to the Tx actor. - final Future replyFuture = executeOperationAsync(new ReadyTransaction()); + final Future replyFuture = executeOperationAsync(ReadyTransaction.INSTANCE); // Combine all the previously recorded put/merge/delete operation reply Futures and the // ReadyTransactionReply Future into one Future. If any one fails then the combined @@ -846,7 +847,8 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { // At some point in the future when upgrades from Helium are not supported // we could remove this code to resolvePath and just use the cohortPath as the // resolved cohortPath - if(TransactionContextImpl.this.remoteTransactionVersion < CreateTransaction.HELIUM_1_VERSION) { + if(TransactionContextImpl.this.remoteTransactionVersion < + DataStoreVersions.HELIUM_1_VERSION) { cohortPath = actorContext.resolvePath(transactionPath, cohortPath); } @@ -872,14 +874,14 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { public void mergeData(YangInstanceIdentifier path, NormalizedNode data) { LOG.debug("Tx {} mergeData called path = {}", identifier, path); - recordedOperationFutures.add(executeOperationAsync(new MergeData(path, data, schemaContext))); + recordedOperationFutures.add(executeOperationAsync(new MergeData(path, data))); } @Override public void writeData(YangInstanceIdentifier path, NormalizedNode data) { LOG.debug("Tx {} writeData called path = {}", identifier, path); - recordedOperationFutures.add(executeOperationAsync(new WriteData(path, data, schemaContext))); + recordedOperationFutures.add(executeOperationAsync(new WriteData(path, data))); } @Override @@ -950,8 +952,8 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { ReadDataReply reply = (ReadDataReply) readResponse; returnFuture.set(Optional.>fromNullable(reply.getNormalizedNode())); - } else if (readResponse.getClass().equals(ReadDataReply.SERIALIZABLE_CLASS)) { - ReadDataReply reply = ReadDataReply.fromSerializable(schemaContext, path, readResponse); + } else if (ReadDataReply.isSerializedType(readResponse)) { + ReadDataReply reply = ReadDataReply.fromSerializable(readResponse); returnFuture.set(Optional.>fromNullable(reply.getNormalizedNode())); } else { diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/AbortTransactionReply.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/AbortTransactionReply.java index 79c6b036fa..3680aec4f3 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/AbortTransactionReply.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/AbortTransactionReply.java @@ -14,8 +14,13 @@ public class AbortTransactionReply implements SerializableMessage { public static final Class SERIALIZABLE_CLASS = ThreePhaseCommitCohortMessages.AbortTransactionReply.class; + private static final Object SERIALIZED_INSTANCE = + ThreePhaseCommitCohortMessages.AbortTransactionReply.newBuilder().build(); + + public static final AbortTransactionReply INSTANCE = new AbortTransactionReply(); + @Override public Object toSerializable() { - return ThreePhaseCommitCohortMessages.AbortTransactionReply.newBuilder().build(); + return SERIALIZED_INSTANCE; } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/CanCommitTransactionReply.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/CanCommitTransactionReply.java index 4d121bae0a..7db4846ef4 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/CanCommitTransactionReply.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/CanCommitTransactionReply.java @@ -14,23 +14,30 @@ public class CanCommitTransactionReply implements SerializableMessage { public static final Class SERIALIZABLE_CLASS = ThreePhaseCommitCohortMessages.CanCommitTransactionReply.class; - private final Boolean canCommit; + public static final CanCommitTransactionReply YES = new CanCommitTransactionReply(true); + public static final CanCommitTransactionReply NO = new CanCommitTransactionReply(false); - public CanCommitTransactionReply(final Boolean canCommit) { + private final boolean canCommit; + private final Object serializedMessage; + + private CanCommitTransactionReply(final boolean canCommit) { this.canCommit = canCommit; + this.serializedMessage = ThreePhaseCommitCohortMessages.CanCommitTransactionReply.newBuilder(). + setCanCommit(canCommit).build(); } - public Boolean getCanCommit() { + public boolean getCanCommit() { return canCommit; } @Override public Object toSerializable() { - return ThreePhaseCommitCohortMessages.CanCommitTransactionReply.newBuilder().setCanCommit(canCommit).build(); + return serializedMessage; } public static CanCommitTransactionReply fromSerializable(final Object message) { - return new CanCommitTransactionReply( - ((ThreePhaseCommitCohortMessages.CanCommitTransactionReply) message).getCanCommit()); + ThreePhaseCommitCohortMessages.CanCommitTransactionReply serialized = + (ThreePhaseCommitCohortMessages.CanCommitTransactionReply) message; + return serialized.getCanCommit() ? YES : NO; } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/CloseTransaction.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/CloseTransaction.java index c73111f2db..ef1aac8d4b 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/CloseTransaction.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/CloseTransaction.java @@ -11,10 +11,16 @@ package org.opendaylight.controller.cluster.datastore.messages; import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages; public class CloseTransaction implements SerializableMessage{ - public static final Class SERIALIZABLE_CLASS = - ShardTransactionMessages.CloseTransaction.class; - @Override - public Object toSerializable() { - return ShardTransactionMessages.CloseTransaction.newBuilder().build(); - } + public static final Class SERIALIZABLE_CLASS = + ShardTransactionMessages.CloseTransaction.class; + + private static final Object SERIALIZED_INSTANCE = + ShardTransactionMessages.CloseTransaction.newBuilder().build(); + + public static final CloseTransaction INSTANCE = new CloseTransaction(); + + @Override + public Object toSerializable() { + return SERIALIZED_INSTANCE; + } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/CloseTransactionChainReply.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/CloseTransactionChainReply.java index c001ae185a..b4673e8a08 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/CloseTransactionChainReply.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/CloseTransactionChainReply.java @@ -11,11 +11,16 @@ package org.opendaylight.controller.cluster.datastore.messages; import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionChainMessages; public class CloseTransactionChainReply implements SerializableMessage { - public static final Class SERIALIZABLE_CLASS = - ShardTransactionChainMessages.CloseTransactionChainReply.class; - @Override - public Object toSerializable() { - return ShardTransactionChainMessages.CloseTransactionChainReply.newBuilder().build(); - } + public static final Class SERIALIZABLE_CLASS = + ShardTransactionChainMessages.CloseTransactionChainReply.class; + private static final Object SERIALIZED_INSTANCE = + ShardTransactionChainMessages.CloseTransactionChainReply.newBuilder().build(); + + public static final CloseTransactionChainReply INSTANCE = new CloseTransactionChainReply(); + + @Override + public Object toSerializable() { + return SERIALIZED_INSTANCE; + } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/CloseTransactionReply.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/CloseTransactionReply.java index 124eeb2235..1c47a1827f 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/CloseTransactionReply.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/CloseTransactionReply.java @@ -11,10 +11,16 @@ package org.opendaylight.controller.cluster.datastore.messages; import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages; public class CloseTransactionReply implements SerializableMessage { - public static final Class SERIALIZABLE_CLASS = - ShardTransactionMessages.CloseTransactionReply.class; - @Override - public Object toSerializable() { - return ShardTransactionMessages.CloseTransactionReply.newBuilder().build(); - } + public static final Class SERIALIZABLE_CLASS = + ShardTransactionMessages.CloseTransactionReply.class; + + private static final Object SERIALIZED_INSTANCE = + ShardTransactionMessages.CloseTransactionReply.newBuilder().build(); + + public static final CloseTransactionReply INSTANCE = new CloseTransactionReply(); + + @Override + public Object toSerializable() { + return SERIALIZED_INSTANCE; + } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/CommitTransactionReply.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/CommitTransactionReply.java index 3d4a168450..47adea5ea0 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/CommitTransactionReply.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/CommitTransactionReply.java @@ -14,8 +14,13 @@ public class CommitTransactionReply implements SerializableMessage { public static final Class SERIALIZABLE_CLASS = ThreePhaseCommitCohortMessages.CommitTransactionReply.class; + private static final Object SERIALIZED_INSTANCE = + ThreePhaseCommitCohortMessages.CommitTransactionReply.newBuilder().build(); + + public static final CommitTransactionReply INSTANCE = new CommitTransactionReply(); + @Override public Object toSerializable() { - return ThreePhaseCommitCohortMessages.CommitTransactionReply.newBuilder().build(); + return SERIALIZED_INSTANCE; } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/CreateTransaction.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/CreateTransaction.java index bf82e66036..ea3caef093 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/CreateTransaction.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/CreateTransaction.java @@ -9,6 +9,7 @@ package org.opendaylight.controller.cluster.datastore.messages; +import org.opendaylight.controller.cluster.datastore.DataStoreVersions; import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages; @@ -16,24 +17,21 @@ public class CreateTransaction implements SerializableMessage { public static final Class SERIALIZABLE_CLASS = ShardTransactionMessages.CreateTransaction.class; - public static final int HELIUM_1_VERSION = 1; - public static final int CURRENT_VERSION = HELIUM_1_VERSION; - private final String transactionId; private final int transactionType; private final String transactionChainId; - private final int version; + private final short version; public CreateTransaction(String transactionId, int transactionType) { this(transactionId, transactionType, ""); } public CreateTransaction(String transactionId, int transactionType, String transactionChainId) { - this(transactionId, transactionType, transactionChainId, CURRENT_VERSION); + this(transactionId, transactionType, transactionChainId, DataStoreVersions.CURRENT_VERSION); } private CreateTransaction(String transactionId, int transactionType, String transactionChainId, - int version) { + short version) { this.transactionId = transactionId; this.transactionType = transactionType; this.transactionChainId = transactionChainId; @@ -48,7 +46,7 @@ public class CreateTransaction implements SerializableMessage { return transactionType; } - public int getVersion() { + public short getVersion() { return version; } @@ -66,7 +64,7 @@ public class CreateTransaction implements SerializableMessage { (ShardTransactionMessages.CreateTransaction) message; return new CreateTransaction(createTransaction.getTransactionId(), createTransaction.getTransactionType(), createTransaction.getTransactionChainId(), - createTransaction.getMessageVersion()); + (short)createTransaction.getMessageVersion()); } public String getTransactionChainId() { diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/CreateTransactionReply.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/CreateTransactionReply.java index 83e68c9cb4..3fde6cc7fc 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/CreateTransactionReply.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/CreateTransactionReply.java @@ -8,23 +8,22 @@ package org.opendaylight.controller.cluster.datastore.messages; +import org.opendaylight.controller.cluster.datastore.DataStoreVersions; import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages; public class CreateTransactionReply implements SerializableMessage { - public static final Class SERIALIZABLE_CLASS = - ShardTransactionMessages.CreateTransactionReply.class; + public static final Class SERIALIZABLE_CLASS = ShardTransactionMessages.CreateTransactionReply.class; private final String transactionPath; private final String transactionId; - private final int version; + private final short version; - public CreateTransactionReply(final String transactionPath, - final String transactionId) { - this(transactionPath, transactionId, CreateTransaction.CURRENT_VERSION); + public CreateTransactionReply(String transactionPath, String transactionId) { + this(transactionPath, transactionId, DataStoreVersions.CURRENT_VERSION); } public CreateTransactionReply(final String transactionPath, - final String transactionId, final int version) { + final String transactionId, final short version) { this.transactionPath = transactionPath; this.transactionId = transactionId; this.version = version; @@ -39,7 +38,7 @@ public class CreateTransactionReply implements SerializableMessage { return transactionId; } - public int getVersion() { + public short getVersion() { return version; } @@ -52,9 +51,10 @@ public class CreateTransactionReply implements SerializableMessage { .build(); } - public static CreateTransactionReply fromSerializable(final Object serializable){ + public static CreateTransactionReply fromSerializable(Object serializable){ ShardTransactionMessages.CreateTransactionReply o = (ShardTransactionMessages.CreateTransactionReply) serializable; - return new CreateTransactionReply(o.getTransactionActorPath(), o.getTransactionId(), o.getMessageVersion()); + return new CreateTransactionReply(o.getTransactionActorPath(), o.getTransactionId(), + (short)o.getMessageVersion()); } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/DataChanged.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/DataChanged.java index 5b5f076d43..fe81e27e3d 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/DataChanged.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/DataChanged.java @@ -8,255 +8,143 @@ package org.opendaylight.controller.cluster.datastore.messages; -import org.opendaylight.controller.cluster.datastore.node.NormalizedNodeToNodeCodec; -import org.opendaylight.controller.cluster.datastore.util.InstanceIdentifierUtils; +import java.io.Externalizable; +import java.io.IOException; +import java.io.ObjectInput; +import java.io.ObjectOutput; +import java.util.Map; +import java.util.Set; +import org.opendaylight.controller.cluster.datastore.DataStoreVersions; +import org.opendaylight.controller.cluster.datastore.node.utils.stream.NormalizedNodeInputStreamReader; +import org.opendaylight.controller.cluster.datastore.node.utils.stream.NormalizedNodeOutputStreamWriter; +import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope; import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent; -import org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages; -import org.opendaylight.controller.protobuff.messages.datachange.notification.DataChangeListenerMessages; +import org.opendaylight.controller.md.sal.dom.store.impl.DOMImmutableDataChangeEvent; +import org.opendaylight.controller.md.sal.dom.store.impl.DOMImmutableDataChangeEvent.Builder; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; -import org.opendaylight.yangtools.yang.model.api.SchemaContext; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.Set; - -public class DataChanged implements SerializableMessage { - public static final Class SERIALIZABLE_CLASS = - DataChangeListenerMessages.DataChanged.class; +import org.opendaylight.yangtools.yang.data.api.schema.stream.NormalizedNodeWriter; - final private SchemaContext schemaContext; - private final AsyncDataChangeEvent> - change; +public class DataChanged implements Externalizable { + private static final long serialVersionUID = 1L; + private AsyncDataChangeEvent> change; + public DataChanged() { + } - public DataChanged(SchemaContext schemaContext, - AsyncDataChangeEvent> change) { + public DataChanged(AsyncDataChangeEvent> change) { this.change = change; - this.schemaContext = schemaContext; } - public AsyncDataChangeEvent> getChange() { return change; } + @Override + public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + in.readShort(); // Read the version - private NormalizedNodeMessages.Node convertToNodeTree( - NormalizedNode normalizedNode) { + NormalizedNodeInputStreamReader streamReader = new NormalizedNodeInputStreamReader(in); - return new NormalizedNodeToNodeCodec(schemaContext) - .encode(normalizedNode) - .getNormalizedNode(); + // Note: the scope passed to builder is not actually used. + Builder builder = DOMImmutableDataChangeEvent.builder(DataChangeScope.SUBTREE); - } + // Read created data - private Iterable convertToRemovePaths( - Set removedPaths) { - final Set removedPathInstanceIds = new HashSet<>(); - for (YangInstanceIdentifier id : removedPaths) { - removedPathInstanceIds.add(InstanceIdentifierUtils.toSerializable(id)); + int size = in.readInt(); + for(int i = 0; i < size; i++) { + YangInstanceIdentifier path = streamReader.readYangInstanceIdentifier(); + NormalizedNode node = streamReader.readNormalizedNode(); + builder.addCreated(path, node); } - return new Iterable() { - @Override - public Iterator iterator() { - return removedPathInstanceIds.iterator(); - } - }; - } + // Read updated data - private NormalizedNodeMessages.NodeMap convertToNodeMap( - Map> data) { - NormalizedNodeToNodeCodec normalizedNodeToNodeCodec = - new NormalizedNodeToNodeCodec(schemaContext); - NormalizedNodeMessages.NodeMap.Builder nodeMapBuilder = - NormalizedNodeMessages.NodeMap.newBuilder(); - NormalizedNodeMessages.NodeMapEntry.Builder builder = - NormalizedNodeMessages.NodeMapEntry.newBuilder(); - for (Map.Entry> entry : data - .entrySet()) { - - - NormalizedNodeMessages.InstanceIdentifier instanceIdentifier = - InstanceIdentifierUtils.toSerializable(entry.getKey()); - - builder.setInstanceIdentifierPath(instanceIdentifier) - .setNormalizedNode(normalizedNodeToNodeCodec - .encode(entry.getValue()) - .getNormalizedNode()); - nodeMapBuilder.addMapEntries(builder.build()); + size = in.readInt(); + for(int i = 0; i < size; i++) { + YangInstanceIdentifier path = streamReader.readYangInstanceIdentifier(); + NormalizedNode before = streamReader.readNormalizedNode(); + NormalizedNode after = streamReader.readNormalizedNode(); + builder.addUpdated(path, before, after); } - return nodeMapBuilder.build(); - } - - @Override - public Object toSerializable() { - return DataChangeListenerMessages.DataChanged.newBuilder() - .addAllRemovedPaths(convertToRemovePaths(change.getRemovedPaths())) - .setCreatedData(convertToNodeMap(change.getCreatedData())) - .setOriginalData(convertToNodeMap(change.getOriginalData())) - .setUpdatedData(convertToNodeMap(change.getUpdatedData())) - .setOriginalSubTree(convertToNodeTree(change.getOriginalSubtree())) - .setUpdatedSubTree(convertToNodeTree(change.getUpdatedSubtree())) - .build(); - } + // Read removed data - public static DataChanged fromSerialize(SchemaContext sc, Object message, - YangInstanceIdentifier pathId) { - DataChangeListenerMessages.DataChanged dataChanged = - (DataChangeListenerMessages.DataChanged) message; - DataChangedEvent event = new DataChangedEvent(sc); - if (dataChanged.getCreatedData() != null && dataChanged.getCreatedData() - .isInitialized()) { - event.setCreatedData(dataChanged.getCreatedData()); - } - if (dataChanged.getOriginalData() != null && dataChanged - .getOriginalData().isInitialized()) { - event.setOriginalData(dataChanged.getOriginalData()); + size = in.readInt(); + for(int i = 0; i < size; i++) { + YangInstanceIdentifier path = streamReader.readYangInstanceIdentifier(); + NormalizedNode node = streamReader.readNormalizedNode(); + builder.addRemoved(path, node); } - if (dataChanged.getUpdatedData() != null && dataChanged.getUpdatedData() - .isInitialized()) { - event.setUpdateData(dataChanged.getUpdatedData()); - } + // Read original subtree - if (dataChanged.getOriginalSubTree() != null && dataChanged - .getOriginalSubTree().isInitialized()) { - event.setOriginalSubtree(dataChanged.getOriginalSubTree(), pathId); + boolean present = in.readBoolean(); + if(present) { + builder.setBefore(streamReader.readNormalizedNode()); } - if (dataChanged.getUpdatedSubTree() != null && dataChanged - .getUpdatedSubTree().isInitialized()) { - event.setUpdatedSubtree(dataChanged.getOriginalSubTree(), pathId); - } + // Read updated subtree - if (dataChanged.getRemovedPathsList() != null && !dataChanged - .getRemovedPathsList().isEmpty()) { - event.setRemovedPaths(dataChanged.getRemovedPathsList()); + present = in.readBoolean(); + if(present) { + builder.setAfter(streamReader.readNormalizedNode()); } - return new DataChanged(sc, event); - + change = builder.build(); } - static class DataChangedEvent implements - AsyncDataChangeEvent> { - private Map> createdData; - private final NormalizedNodeToNodeCodec nodeCodec; - private Map> updatedData; - private Map> originalData; - private NormalizedNode originalSubTree; - private NormalizedNode updatedSubTree; - private Set removedPathIds; - - DataChangedEvent(SchemaContext schemaContext) { - nodeCodec = new NormalizedNodeToNodeCodec(schemaContext); - } - - @Override - public Map> getCreatedData() { - if(createdData == null){ - return Collections.emptyMap(); - } - return createdData; - } - - DataChangedEvent setCreatedData( - NormalizedNodeMessages.NodeMap nodeMap) { - this.createdData = convertNodeMapToMap(nodeMap); - return this; - } - - private Map> convertNodeMapToMap( - NormalizedNodeMessages.NodeMap nodeMap) { - Map> mapEntries = - new HashMap>(); - for (NormalizedNodeMessages.NodeMapEntry nodeMapEntry : nodeMap - .getMapEntriesList()) { - YangInstanceIdentifier id = InstanceIdentifierUtils - .fromSerializable(nodeMapEntry.getInstanceIdentifierPath()); - mapEntries.put(id, - nodeCodec.decode(nodeMapEntry.getNormalizedNode())); - } - return mapEntries; - } + @Override + public void writeExternal(ObjectOutput out) throws IOException { + out.writeShort(DataStoreVersions.CURRENT_VERSION); + NormalizedNodeOutputStreamWriter streamWriter = new NormalizedNodeOutputStreamWriter(out); + NormalizedNodeWriter nodeWriter = NormalizedNodeWriter.forStreamWriter(streamWriter); - @Override - public Map> getUpdatedData() { - if(updatedData == null){ - return Collections.emptyMap(); - } - return updatedData; - } + // Write created data - DataChangedEvent setUpdateData(NormalizedNodeMessages.NodeMap nodeMap) { - this.updatedData = convertNodeMapToMap(nodeMap); - return this; + Map> createdData = change.getCreatedData(); + out.writeInt(createdData.size()); + for(Map.Entry> e: createdData.entrySet()) { + streamWriter.writeYangInstanceIdentifier(e.getKey()); + nodeWriter.write(e.getValue()); } - @Override - public Set getRemovedPaths() { - if (removedPathIds == null) { - return Collections.emptySet(); - } - return removedPathIds; - } + // Write updated data - public DataChangedEvent setRemovedPaths(List removedPaths) { - Set removedIds = new HashSet<>(); - for (NormalizedNodeMessages.InstanceIdentifier path : removedPaths) { - removedIds.add(InstanceIdentifierUtils.fromSerializable(path)); - } - this.removedPathIds = removedIds; - return this; + Map> originalData = change.getOriginalData(); + Map> updatedData = change.getUpdatedData(); + out.writeInt(updatedData.size()); + for(Map.Entry> e: updatedData.entrySet()) { + streamWriter.writeYangInstanceIdentifier(e.getKey()); + nodeWriter.write(originalData.get(e.getKey())); + nodeWriter.write(e.getValue()); } - @Override - public Map> getOriginalData() { - if (originalData == null) { - Collections.emptyMap(); - } - return originalData; - } + // Write removed data - DataChangedEvent setOriginalData( - NormalizedNodeMessages.NodeMap nodeMap) { - this.originalData = convertNodeMapToMap(nodeMap); - return this; + Set removed = change.getRemovedPaths(); + out.writeInt(removed.size()); + for(YangInstanceIdentifier path: removed) { + streamWriter.writeYangInstanceIdentifier(path); + nodeWriter.write(originalData.get(path)); } - @Override - public NormalizedNode getOriginalSubtree() { - return originalSubTree; - } + // Write original subtree - DataChangedEvent setOriginalSubtree(NormalizedNodeMessages.Node node, - YangInstanceIdentifier instanceIdentifierPath) { - originalSubTree = nodeCodec.decode(node); - return this; + NormalizedNode originalSubtree = change.getOriginalSubtree(); + out.writeBoolean(originalSubtree != null); + if(originalSubtree != null) { + nodeWriter.write(originalSubtree); } - @Override - public NormalizedNode getUpdatedSubtree() { - return updatedSubTree; - } + // Write original subtree - DataChangedEvent setUpdatedSubtree(NormalizedNodeMessages.Node node, - YangInstanceIdentifier instanceIdentifierPath) { - updatedSubTree = nodeCodec.decode(node); - return this; + NormalizedNode updatedSubtree = change.getUpdatedSubtree(); + out.writeBoolean(updatedSubtree != null); + if(updatedSubtree != null) { + nodeWriter.write(updatedSubtree); } - - } - - - } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/DataChangedReply.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/DataChangedReply.java index e10a407292..2db03446d9 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/DataChangedReply.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/DataChangedReply.java @@ -11,10 +11,16 @@ package org.opendaylight.controller.cluster.datastore.messages; import org.opendaylight.controller.protobuff.messages.datachange.notification.DataChangeListenerMessages; public class DataChangedReply implements SerializableMessage { - public static final Class SERIALIZABLE_CLASS = - DataChangeListenerMessages.DataChangedReply.class; - @Override - public Object toSerializable() { - return DataChangeListenerMessages.DataChangedReply.newBuilder().build(); - } + public static final Class SERIALIZABLE_CLASS = + DataChangeListenerMessages.DataChangedReply.class; + + private static final Object SERIALIZED_INSTANCE = + DataChangeListenerMessages.DataChangedReply.newBuilder().build(); + + public static final DataChangedReply INSTANCE = new DataChangedReply(); + + @Override + public Object toSerializable() { + return SERIALIZED_INSTANCE; + } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/DeleteDataReply.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/DeleteDataReply.java index 2e02664e1d..eba9c39170 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/DeleteDataReply.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/DeleteDataReply.java @@ -11,10 +11,15 @@ package org.opendaylight.controller.cluster.datastore.messages; import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages; public class DeleteDataReply implements SerializableMessage{ - public static final Class SERIALIZABLE_CLASS = - ShardTransactionMessages.DeleteDataReply.class; - @Override - public Object toSerializable() { - return ShardTransactionMessages.DeleteDataReply.newBuilder().build(); - } + public static final Class SERIALIZABLE_CLASS = + ShardTransactionMessages.DeleteDataReply.class; + + private static final Object SERIALIZED_INSTANCE = ShardTransactionMessages.DeleteDataReply.newBuilder().build(); + + public static final DeleteDataReply INSTANCE = new DeleteDataReply(); + + @Override + public Object toSerializable() { + return SERIALIZED_INSTANCE; + } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/EmptyExternalizable.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/EmptyExternalizable.java new file mode 100644 index 0000000000..0b7b262a9d --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/EmptyExternalizable.java @@ -0,0 +1,29 @@ +/* + * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.datastore.messages; + +import java.io.Externalizable; +import java.io.IOException; +import java.io.ObjectInput; +import java.io.ObjectOutput; + +/** + * Externalizable with no data. + * + * @author Thomas Pantelis + */ +public class EmptyExternalizable implements Externalizable { + + @Override + public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + } + + @Override + public void writeExternal(ObjectOutput out) throws IOException { + } +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/EmptyReply.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/EmptyReply.java new file mode 100644 index 0000000000..284c6eff8d --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/EmptyReply.java @@ -0,0 +1,30 @@ +/* + * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.datastore.messages; + +import org.opendaylight.controller.cluster.datastore.DataStoreVersions; + +/** + * A reply with no data. + * + * @author Thomas Pantelis + */ +public abstract class EmptyReply extends EmptyExternalizable implements VersionedSerializableMessage { + + private final Object legacySerializedInstance; + + protected EmptyReply(Object legacySerializedInstance) { + super(); + this.legacySerializedInstance = legacySerializedInstance; + } + + @Override + public Object toSerializable(short toVersion) { + return toVersion >= DataStoreVersions.LITHIUM_VERSION ? this : legacySerializedInstance; + } +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/MergeData.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/MergeData.java index eb1f3495bd..9234385b35 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/MergeData.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/MergeData.java @@ -8,36 +8,54 @@ package org.opendaylight.controller.cluster.datastore.messages; +import org.opendaylight.controller.cluster.datastore.DataStoreVersions; import org.opendaylight.controller.cluster.datastore.node.NormalizedNodeToNodeCodec; import org.opendaylight.controller.cluster.datastore.node.NormalizedNodeToNodeCodec.Decoded; import org.opendaylight.controller.cluster.datastore.node.NormalizedNodeToNodeCodec.Encoded; import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; -import org.opendaylight.yangtools.yang.model.api.SchemaContext; -public class MergeData extends ModifyData{ +public class MergeData extends ModifyData implements VersionedSerializableMessage { + private static final long serialVersionUID = 1L; - public static final Class SERIALIZABLE_CLASS = - ShardTransactionMessages.MergeData.class; + public static final Class SERIALIZABLE_CLASS = MergeData.class; - public MergeData(YangInstanceIdentifier path, NormalizedNode data, - SchemaContext context) { - super(path, data, context); + public MergeData() { + } + + public MergeData(YangInstanceIdentifier path, NormalizedNode data) { + super(path, data); } @Override - public Object toSerializable() { - Encoded encoded = new NormalizedNodeToNodeCodec(schemaContext).encode(path, data); - return ShardTransactionMessages.MergeData.newBuilder() - .setInstanceIdentifierPathArguments(encoded.getEncodedPath()) - .setNormalizedNode(encoded.getEncodedNode().getNormalizedNode()).build(); + public Object toSerializable(short toVersion) { + if(toVersion >= DataStoreVersions.LITHIUM_VERSION) { + setVersion(toVersion); + return this; + } else { + // To base or R1 Helium version + Encoded encoded = new NormalizedNodeToNodeCodec(null).encode(getPath(), getData()); + return ShardTransactionMessages.MergeData.newBuilder() + .setInstanceIdentifierPathArguments(encoded.getEncodedPath()) + .setNormalizedNode(encoded.getEncodedNode().getNormalizedNode()).build(); + } + } + + public static MergeData fromSerializable(Object serializable){ + if(serializable instanceof MergeData) { + return (MergeData) serializable; + } else { + // From base or R1 Helium version + ShardTransactionMessages.MergeData o = (ShardTransactionMessages.MergeData) serializable; + Decoded decoded = new NormalizedNodeToNodeCodec(null).decode( + o.getInstanceIdentifierPathArguments(), o.getNormalizedNode()); + return new MergeData(decoded.getDecodedPath(), decoded.getDecodedNode()); + } } - public static MergeData fromSerializable(Object serializable, SchemaContext schemaContext){ - ShardTransactionMessages.MergeData o = (ShardTransactionMessages.MergeData) serializable; - Decoded decoded = new NormalizedNodeToNodeCodec(schemaContext).decode( - o.getInstanceIdentifierPathArguments(), o.getNormalizedNode()); - return new MergeData(decoded.getDecodedPath(), decoded.getDecodedNode(), schemaContext); + public static boolean isSerializedType(Object message) { + return SERIALIZABLE_CLASS.isAssignableFrom(message.getClass()) || + message instanceof ShardTransactionMessages.MergeData; } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/MergeDataReply.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/MergeDataReply.java index 92d6d72847..a4c514bdbf 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/MergeDataReply.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/MergeDataReply.java @@ -10,12 +10,15 @@ package org.opendaylight.controller.cluster.datastore.messages; import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages; -public class MergeDataReply implements SerializableMessage{ - public static final Class SERIALIZABLE_CLASS = - ShardTransactionMessages.MergeDataReply.class; +public class MergeDataReply extends EmptyReply { + private static final long serialVersionUID = 1L; - @Override - public Object toSerializable() { - return ShardTransactionMessages.MergeDataReply.newBuilder().build(); - } + private static final Object LEGACY_SERIALIZED_INSTANCE = + ShardTransactionMessages.MergeDataReply.newBuilder().build(); + + public static final MergeDataReply INSTANCE = new MergeDataReply(); + + public MergeDataReply() { + super(LEGACY_SERIALIZED_INSTANCE); + } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/ModifyData.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/ModifyData.java index b5c39d1c3f..69c41c2a56 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/ModifyData.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/ModifyData.java @@ -8,25 +8,28 @@ package org.opendaylight.controller.cluster.datastore.messages; -import com.google.common.base.Preconditions; +import java.io.Externalizable; +import java.io.IOException; +import java.io.ObjectInput; +import java.io.ObjectOutput; +import org.opendaylight.controller.cluster.datastore.utils.SerializationUtils; +import org.opendaylight.controller.cluster.datastore.utils.SerializationUtils.Applier; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; -import org.opendaylight.yangtools.yang.model.api.SchemaContext; -public abstract class ModifyData implements SerializableMessage { - protected final YangInstanceIdentifier path; - protected final NormalizedNode data; - protected final SchemaContext schemaContext; +public abstract class ModifyData implements Externalizable { + private static final long serialVersionUID = 1L; - public ModifyData(YangInstanceIdentifier path, NormalizedNode data, - SchemaContext context) { - Preconditions.checkNotNull(context, - "Cannot serialize an object which does not have a schema schemaContext"); + private YangInstanceIdentifier path; + private NormalizedNode data; + private short version; + protected ModifyData() { + } + protected ModifyData(YangInstanceIdentifier path, NormalizedNode data) { this.path = path; this.data = data; - this.schemaContext = context; } public YangInstanceIdentifier getPath() { @@ -37,4 +40,31 @@ public abstract class ModifyData implements SerializableMessage { return data; } + public short getVersion() { + return version; + } + + protected void setVersion(short version) { + this.version = version; + } + + @Override + public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + version = in.readShort(); + SerializationUtils.deserializePathAndNode(in, this, APPLIER); + } + + @Override + public void writeExternal(ObjectOutput out) throws IOException { + out.writeShort(version); + SerializationUtils.serializePathAndNode(path, data, out); + } + + private static final Applier APPLIER = new Applier() { + @Override + public void apply(ModifyData instance, YangInstanceIdentifier path, NormalizedNode data) { + instance.path = path; + instance.data = data; + } + }; } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/ReadDataReply.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/ReadDataReply.java index 43dd81252c..8ac6e1b149 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/ReadDataReply.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/ReadDataReply.java @@ -9,23 +9,29 @@ package org.opendaylight.controller.cluster.datastore.messages; import com.google.protobuf.ByteString; +import java.io.Externalizable; +import java.io.IOException; +import java.io.ObjectInput; +import java.io.ObjectOutput; +import org.opendaylight.controller.cluster.datastore.DataStoreVersions; import org.opendaylight.controller.cluster.datastore.node.NormalizedNodeToNodeCodec; +import org.opendaylight.controller.cluster.datastore.utils.SerializationUtils; import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages; -import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; -import org.opendaylight.yangtools.yang.model.api.SchemaContext; -public class ReadDataReply implements SerializableMessage { - public static final Class SERIALIZABLE_CLASS = - ShardTransactionMessages.ReadDataReply.class; +public class ReadDataReply implements VersionedSerializableMessage, Externalizable { + private static final long serialVersionUID = 1L; - private final NormalizedNode normalizedNode; - private final SchemaContext schemaContext; + public static final Class SERIALIZABLE_CLASS = ReadDataReply.class; - public ReadDataReply(SchemaContext context,NormalizedNode normalizedNode){ + private NormalizedNode normalizedNode; + private short version; + public ReadDataReply() { + } + + public ReadDataReply(NormalizedNode normalizedNode) { this.normalizedNode = normalizedNode; - this.schemaContext = context; } public NormalizedNode getNormalizedNode() { @@ -33,26 +39,62 @@ public class ReadDataReply implements SerializableMessage { } @Override - public Object toSerializable(){ + public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + version = in.readShort(); + normalizedNode = SerializationUtils.deserializeNormalizedNode(in); + } + + @Override + public void writeExternal(ObjectOutput out) throws IOException { + out.writeShort(version); + SerializationUtils.serializeNormalizedNode(normalizedNode, out); + } + + @Override + public Object toSerializable(short toVersion) { + if(toVersion >= DataStoreVersions.LITHIUM_VERSION) { + version = toVersion; + return this; + } else { + return toSerializableReadDataReply(normalizedNode); + } + } + + private static ShardTransactionMessages.ReadDataReply toSerializableReadDataReply( + NormalizedNode normalizedNode) { if(normalizedNode != null) { return ShardTransactionMessages.ReadDataReply.newBuilder() - .setNormalizedNode(new NormalizedNodeToNodeCodec(schemaContext) - .encode(normalizedNode).getNormalizedNode()).build(); + .setNormalizedNode(new NormalizedNodeToNodeCodec(null) + .encode(normalizedNode).getNormalizedNode()).build(); } else { return ShardTransactionMessages.ReadDataReply.newBuilder().build(); } } - public static ReadDataReply fromSerializable(SchemaContext schemaContext, - YangInstanceIdentifier id, Object serializable) { - ShardTransactionMessages.ReadDataReply o = (ShardTransactionMessages.ReadDataReply) serializable; - return new ReadDataReply(schemaContext, new NormalizedNodeToNodeCodec(schemaContext).decode( - o.getNormalizedNode())); + public static ReadDataReply fromSerializable(Object serializable) { + if(serializable instanceof ReadDataReply) { + return (ReadDataReply) serializable; + } else { + ShardTransactionMessages.ReadDataReply o = + (ShardTransactionMessages.ReadDataReply) serializable; + return new ReadDataReply(new NormalizedNodeToNodeCodec(null).decode(o.getNormalizedNode())); + } + } + + public static ByteString fromSerializableAsByteString(Object serializable) { + if(serializable instanceof ReadDataReply) { + ReadDataReply r = (ReadDataReply)serializable; + return toSerializableReadDataReply(r.getNormalizedNode()).toByteString(); + } else { + ShardTransactionMessages.ReadDataReply o = + (ShardTransactionMessages.ReadDataReply) serializable; + return o.getNormalizedNode().toByteString(); + } } - public static ByteString getNormalizedNodeByteString(Object serializable){ - ShardTransactionMessages.ReadDataReply o = (ShardTransactionMessages.ReadDataReply) serializable; - return ((ShardTransactionMessages.ReadDataReply) serializable).getNormalizedNode().toByteString(); + public static boolean isSerializedType(Object message) { + return SERIALIZABLE_CLASS.isAssignableFrom(message.getClass()) || + message instanceof ShardTransactionMessages.ReadDataReply; } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/ReadyTransaction.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/ReadyTransaction.java index 581caefd04..09617abde9 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/ReadyTransaction.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/ReadyTransaction.java @@ -11,12 +11,15 @@ package org.opendaylight.controller.cluster.datastore.messages; import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages; public class ReadyTransaction implements SerializableMessage{ - public static final Class SERIALIZABLE_CLASS = - ShardTransactionMessages.ReadyTransaction.class; + public static final Class SERIALIZABLE_CLASS = + ShardTransactionMessages.ReadyTransaction.class; - @Override - public Object toSerializable() { - return ShardTransactionMessages.ReadyTransaction.newBuilder().build(); - } + private static final Object SERIALIZED_INSTANCE = ShardTransactionMessages.ReadyTransaction.newBuilder().build(); + public static final ReadyTransaction INSTANCE = new ReadyTransaction(); + + @Override + public Object toSerializable() { + return SERIALIZED_INSTANCE; + } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/VersionedSerializableMessage.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/VersionedSerializableMessage.java new file mode 100644 index 0000000000..5c30b1078e --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/VersionedSerializableMessage.java @@ -0,0 +1,17 @@ +/* + * Copyright (c) 2014 Brocade Communications Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.datastore.messages; + +/** + * Interface for a Serializable message with versioning. + * + * @author Thomas Pantelis + */ +public interface VersionedSerializableMessage { + Object toSerializable(short toVersion); +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/WriteData.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/WriteData.java index 8aa63ef262..c5e3a6b059 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/WriteData.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/WriteData.java @@ -8,35 +8,54 @@ package org.opendaylight.controller.cluster.datastore.messages; +import org.opendaylight.controller.cluster.datastore.DataStoreVersions; import org.opendaylight.controller.cluster.datastore.node.NormalizedNodeToNodeCodec; import org.opendaylight.controller.cluster.datastore.node.NormalizedNodeToNodeCodec.Decoded; import org.opendaylight.controller.cluster.datastore.node.NormalizedNodeToNodeCodec.Encoded; import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; -import org.opendaylight.yangtools.yang.model.api.SchemaContext; -public class WriteData extends ModifyData { +public class WriteData extends ModifyData implements VersionedSerializableMessage { + private static final long serialVersionUID = 1L; - public static final Class SERIALIZABLE_CLASS = - ShardTransactionMessages.WriteData.class; + public static final Class SERIALIZABLE_CLASS = WriteData.class; - public WriteData(YangInstanceIdentifier path, NormalizedNode data, SchemaContext schemaContext) { - super(path, data, schemaContext); + public WriteData() { + } + + public WriteData(YangInstanceIdentifier path, NormalizedNode data) { + super(path, data); } @Override - public Object toSerializable() { - Encoded encoded = new NormalizedNodeToNodeCodec(schemaContext).encode(path, data); - return ShardTransactionMessages.WriteData.newBuilder() - .setInstanceIdentifierPathArguments(encoded.getEncodedPath()) - .setNormalizedNode(encoded.getEncodedNode().getNormalizedNode()).build(); + public Object toSerializable(short toVersion) { + if(toVersion >= DataStoreVersions.LITHIUM_VERSION) { + setVersion(toVersion); + return this; + } else { + // To base or R1 Helium version + Encoded encoded = new NormalizedNodeToNodeCodec(null).encode(getPath(), getData()); + return ShardTransactionMessages.WriteData.newBuilder() + .setInstanceIdentifierPathArguments(encoded.getEncodedPath()) + .setNormalizedNode(encoded.getEncodedNode().getNormalizedNode()).build(); + } + } + + public static WriteData fromSerializable(Object serializable) { + if(serializable instanceof WriteData) { + return (WriteData) serializable; + } else { + // From base or R1 Helium version + ShardTransactionMessages.WriteData o = (ShardTransactionMessages.WriteData) serializable; + Decoded decoded = new NormalizedNodeToNodeCodec(null).decode( + o.getInstanceIdentifierPathArguments(), o.getNormalizedNode()); + return new WriteData(decoded.getDecodedPath(), decoded.getDecodedNode()); + } } - public static WriteData fromSerializable(Object serializable, SchemaContext schemaContext){ - ShardTransactionMessages.WriteData o = (ShardTransactionMessages.WriteData) serializable; - Decoded decoded = new NormalizedNodeToNodeCodec(schemaContext).decode( - o.getInstanceIdentifierPathArguments(), o.getNormalizedNode()); - return new WriteData(decoded.getDecodedPath(), decoded.getDecodedNode(), schemaContext); + public static boolean isSerializedType(Object message) { + return SERIALIZABLE_CLASS.isAssignableFrom(message.getClass()) || + message instanceof ShardTransactionMessages.WriteData; } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/WriteDataReply.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/WriteDataReply.java index 876105de18..8255828819 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/WriteDataReply.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/WriteDataReply.java @@ -10,11 +10,15 @@ package org.opendaylight.controller.cluster.datastore.messages; import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages; -public class WriteDataReply implements SerializableMessage{ - public static final Class SERIALIZABLE_CLASS = - ShardTransactionMessages.WriteDataReply.class; - @Override - public Object toSerializable() { - return ShardTransactionMessages.WriteDataReply.newBuilder().build(); - } +public class WriteDataReply extends EmptyReply { + private static final long serialVersionUID = 1L; + + private static final Object LEGACY_SERIALIZED_INSTANCE = + ShardTransactionMessages.WriteDataReply.newBuilder().build(); + + public static final WriteDataReply INSTANCE = new WriteDataReply(); + + public WriteDataReply() { + super(LEGACY_SERIALIZED_INSTANCE); + } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/SerializationUtils.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/SerializationUtils.java new file mode 100644 index 0000000000..8404a6e6d8 --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/SerializationUtils.java @@ -0,0 +1,81 @@ +/* + * Copyright (c) 2014 Brocade Communications Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.datastore.utils; + +import com.google.common.base.Preconditions; +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import org.opendaylight.controller.cluster.datastore.node.utils.stream.NormalizedNodeInputStreamReader; +import org.opendaylight.controller.cluster.datastore.node.utils.stream.NormalizedNodeOutputStreamWriter; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; +import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; +import org.opendaylight.yangtools.yang.data.api.schema.stream.NormalizedNodeWriter; + +/** + * Provides various utility methods for serialization and de-serialization. + * + * @author Thomas Pantelis + */ +public final class SerializationUtils { + public static interface Applier { + void apply(T instance, YangInstanceIdentifier path, NormalizedNode node); + } + + public static void serializePathAndNode(YangInstanceIdentifier path, NormalizedNode node, + DataOutput out) { + Preconditions.checkNotNull(path); + Preconditions.checkNotNull(node); + try { + NormalizedNodeOutputStreamWriter streamWriter = new NormalizedNodeOutputStreamWriter(out); + NormalizedNodeWriter.forStreamWriter(streamWriter).write(node); + streamWriter.writeYangInstanceIdentifier(path); + } catch (IOException e) { + throw new IllegalArgumentException(String.format("Error serializing path {} and Node {}", + path, node), e); + } + } + + public static void deserializePathAndNode(DataInput in, T instance, Applier applier) { + try { + NormalizedNodeInputStreamReader streamReader = new NormalizedNodeInputStreamReader(in); + NormalizedNode node = streamReader.readNormalizedNode(); + YangInstanceIdentifier path = streamReader.readYangInstanceIdentifier(); + applier.apply(instance, path, node); + } catch (IOException e) { + throw new IllegalArgumentException("Error deserializing path and Node", e); + } + } + + public static void serializeNormalizedNode(NormalizedNode node, DataOutput out) { + try { + out.writeBoolean(node != null); + if(node != null) { + NormalizedNodeOutputStreamWriter streamWriter = new NormalizedNodeOutputStreamWriter(out); + NormalizedNodeWriter.forStreamWriter(streamWriter).write(node); + } + } catch (IOException e) { + throw new IllegalArgumentException(String.format("Error serializing NormalizedNode {}", + node), e); + } + } + + public static NormalizedNode deserializeNormalizedNode(DataInput in) { + try { + boolean present = in.readBoolean(); + if(present) { + NormalizedNodeInputStreamReader streamReader = new NormalizedNodeInputStreamReader(in); + return streamReader.readNormalizedNode(); + } + } catch (IOException e) { + throw new IllegalArgumentException("Error deserializing NormalizedNode", e); + } + + return null; + } +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerProxyTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerProxyTest.java index 55250dd5e9..5485c57fd6 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerProxyTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerProxyTest.java @@ -17,11 +17,9 @@ import org.opendaylight.controller.cluster.datastore.utils.MessageCollectorActor import org.opendaylight.controller.cluster.datastore.utils.MockClusterWrapper; import org.opendaylight.controller.cluster.datastore.utils.MockConfiguration; import org.opendaylight.controller.md.cluster.datastore.model.CompositeModel; -import org.opendaylight.controller.md.cluster.datastore.model.TestModel; import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; - public class DataChangeListenerProxyTest extends AbstractActorTest { private static class MockDataChangedEvent implements AsyncDataChangeEvent> { @@ -73,7 +71,7 @@ public class DataChangeListenerProxyTest extends AbstractActorTest { final ActorRef actorRef = getSystem().actorOf(props); DataChangeListenerProxy dataChangeListenerProxy = new DataChangeListenerProxy( - TestModel.createTestContext(), getSystem().actorSelection(actorRef.path())); + getSystem().actorSelection(actorRef.path())); dataChangeListenerProxy.onDataChanged(new MockDataChangedEvent()); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerTest.java index 25d47388fe..19f0f8c551 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerTest.java @@ -29,7 +29,7 @@ public class DataChangeListenerTest extends AbstractActorTest { // Let the DataChangeListener know that notifications should be enabled subject.tell(new EnableNotification(true), getRef()); - subject.tell(new DataChanged(CompositeModel.createTestContext(), mockChangeEvent), + subject.tell(new DataChanged(mockChangeEvent), getRef()); expectMsgClass(DataChangedReply.class); @@ -48,7 +48,7 @@ public class DataChangeListenerTest extends AbstractActorTest { final ActorRef subject = getSystem().actorOf(props, "testDataChangedNotificationsDisabled"); - subject.tell(new DataChanged(CompositeModel.createTestContext(), mockChangeEvent), + subject.tell(new DataChanged(mockChangeEvent), getRef()); new Within(duration("1 seconds")) { @@ -74,8 +74,7 @@ public class DataChangeListenerTest extends AbstractActorTest { getSystem().eventStream().subscribe(getRef(), DeadLetter.class); - subject.tell(new DataChanged(CompositeModel.createTestContext(), mockChangeEvent), - ActorRef.noSender()); + subject.tell(new DataChanged(mockChangeEvent), ActorRef.noSender()); // Make sure no DataChangedReply is sent to DeadLetters. while(true) { @@ -113,13 +112,13 @@ public class DataChangeListenerTest extends AbstractActorTest { SchemaContext schemaContext = CompositeModel.createTestContext(); - subject.tell(new DataChanged(schemaContext, mockChangeEvent1),getRef()); + subject.tell(new DataChanged(mockChangeEvent1),getRef()); expectMsgClass(DataChangedReply.class); - subject.tell(new DataChanged(schemaContext, mockChangeEvent2),getRef()); + subject.tell(new DataChanged(mockChangeEvent2),getRef()); expectMsgClass(DataChangedReply.class); - subject.tell(new DataChanged(schemaContext, mockChangeEvent3),getRef()); + subject.tell(new DataChanged(mockChangeEvent3),getRef()); expectMsgClass(DataChangedReply.class); Mockito.verify(mockListener).onDataChanged(mockChangeEvent1); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java index ed842b2021..42f30437c9 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java @@ -10,7 +10,7 @@ import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.inOrder; import static org.mockito.Mockito.mock; -import static org.opendaylight.controller.cluster.datastore.messages.CreateTransaction.CURRENT_VERSION; +import static org.opendaylight.controller.cluster.datastore.DataStoreVersions.CURRENT_VERSION; import akka.actor.ActorRef; import akka.actor.PoisonPill; import akka.actor.Props; diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionFailureTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionFailureTest.java index 9f57359429..09a4532b53 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionFailureTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionFailureTest.java @@ -21,7 +21,6 @@ import org.junit.BeforeClass; import org.junit.Test; import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier; import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats; -import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction; import org.opendaylight.controller.cluster.datastore.node.utils.serialization.NormalizedNodeSerializer; import org.opendaylight.controller.md.cluster.datastore.model.TestModel; import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException; @@ -72,7 +71,7 @@ public class ShardTransactionFailureTest extends AbstractActorTest { final ActorRef shard = createShard(); final Props props = ShardTransaction.props(store.newReadOnlyTransaction(), shard, testSchemaContext, datastoreContext, shardStats, "txn", - CreateTransaction.CURRENT_VERSION); + DataStoreVersions.CURRENT_VERSION); final TestActorRef subject = TestActorRef .create(getSystem(), props, @@ -102,7 +101,7 @@ public class ShardTransactionFailureTest extends AbstractActorTest { final ActorRef shard = createShard(); final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard, testSchemaContext, datastoreContext, shardStats, "txn", - CreateTransaction.CURRENT_VERSION); + DataStoreVersions.CURRENT_VERSION); final TestActorRef subject = TestActorRef .create(getSystem(), props, @@ -132,7 +131,7 @@ public class ShardTransactionFailureTest extends AbstractActorTest { final ActorRef shard = createShard(); final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard, testSchemaContext, datastoreContext, shardStats, "txn", - CreateTransaction.CURRENT_VERSION); + DataStoreVersions.CURRENT_VERSION); final TestActorRef subject = TestActorRef .create(getSystem(), props, @@ -162,7 +161,7 @@ public class ShardTransactionFailureTest extends AbstractActorTest { final ActorRef shard = createShard(); final Props props = ShardTransaction.props(store.newWriteOnlyTransaction(), shard, testSchemaContext, datastoreContext, shardStats, "txn", - CreateTransaction.CURRENT_VERSION); + DataStoreVersions.CURRENT_VERSION); final TestActorRef subject = TestActorRef .create(getSystem(), props, @@ -195,7 +194,7 @@ public class ShardTransactionFailureTest extends AbstractActorTest { final ActorRef shard = createShard(); final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard, testSchemaContext, datastoreContext, shardStats, "txn", - CreateTransaction.CURRENT_VERSION); + DataStoreVersions.CURRENT_VERSION); final TestActorRef subject = TestActorRef .create(getSystem(), props, @@ -233,7 +232,7 @@ public class ShardTransactionFailureTest extends AbstractActorTest { final ActorRef shard = createShard(); final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard, testSchemaContext, datastoreContext, shardStats, "txn", - CreateTransaction.CURRENT_VERSION); + DataStoreVersions.CURRENT_VERSION); final TestActorRef subject = TestActorRef .create(getSystem(), props, "testNegativeMergeTransactionReady"); @@ -266,7 +265,7 @@ public class ShardTransactionFailureTest extends AbstractActorTest { final ActorRef shard = createShard(); final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard, testSchemaContext, datastoreContext, shardStats, "txn", - CreateTransaction.CURRENT_VERSION); + DataStoreVersions.CURRENT_VERSION); final TestActorRef subject = TestActorRef .create(getSystem(), props, diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionHeliumBackwardsCompatibilityTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionHeliumBackwardsCompatibilityTest.java index af07aeebcf..58cec67a2d 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionHeliumBackwardsCompatibilityTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionHeliumBackwardsCompatibilityTest.java @@ -7,6 +7,12 @@ */ package org.opendaylight.controller.cluster.datastore; +import akka.actor.ActorRef; +import akka.actor.ActorSelection; +import akka.actor.PoisonPill; +import akka.actor.Props; +import akka.dispatch.Dispatchers; +import akka.testkit.TestActorRef; import java.util.Collections; import org.junit.Assert; import org.junit.Test; @@ -27,12 +33,6 @@ import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes; import org.opendaylight.yangtools.yang.model.api.SchemaContext; import scala.concurrent.duration.FiniteDuration; -import akka.actor.ActorRef; -import akka.actor.ActorSelection; -import akka.actor.PoisonPill; -import akka.actor.Props; -import akka.dispatch.Dispatchers; -import akka.testkit.TestActorRef; /** * Tests backwards compatibility support from Helium-1 to Helium. @@ -46,6 +46,7 @@ import akka.testkit.TestActorRef; */ public class ShardTransactionHeliumBackwardsCompatibilityTest extends AbstractActorTest { + @SuppressWarnings("unchecked") @Test public void testTransactionCommit() throws Exception { new ShardTestKit(getSystem()) {{ @@ -78,9 +79,10 @@ public class ShardTransactionHeliumBackwardsCompatibilityTest extends AbstractAc // Write data to the Tx txActor.tell(new WriteData(TestModel.TEST_PATH, - ImmutableNodes.containerNode(TestModel.TEST_QNAME), schemaContext), getRef()); + ImmutableNodes.containerNode(TestModel.TEST_QNAME)).toSerializable( + DataStoreVersions.BASE_HELIUM_VERSION), getRef()); - expectMsgClass(duration, WriteDataReply.class); + expectMsgClass(duration, ShardTransactionMessages.WriteDataReply.class); // Ready the Tx @@ -151,7 +153,7 @@ public class ShardTransactionHeliumBackwardsCompatibilityTest extends AbstractAc // Write data to the Tx txActor.tell(new WriteData(TestModel.TEST_PATH, - ImmutableNodes.containerNode(TestModel.TEST_QNAME), schemaContext), getRef()); + ImmutableNodes.containerNode(TestModel.TEST_QNAME)), getRef()); expectMsgClass(duration, WriteDataReply.class); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionTest.java index f5af93d584..79480ce592 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionTest.java @@ -20,7 +20,6 @@ import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats; import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction; import org.opendaylight.controller.cluster.datastore.messages.CloseTransactionReply; -import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction; import org.opendaylight.controller.cluster.datastore.messages.DataExists; import org.opendaylight.controller.cluster.datastore.messages.DataExistsReply; import org.opendaylight.controller.cluster.datastore.messages.DeleteData; @@ -38,6 +37,8 @@ import org.opendaylight.controller.cluster.datastore.modification.DeleteModifica import org.opendaylight.controller.cluster.datastore.modification.MergeModification; import org.opendaylight.controller.cluster.datastore.modification.Modification; import org.opendaylight.controller.cluster.datastore.modification.WriteModification; +import org.opendaylight.controller.cluster.datastore.node.NormalizedNodeToNodeCodec; +import org.opendaylight.controller.cluster.datastore.node.NormalizedNodeToNodeCodec.Encoded; import org.opendaylight.controller.md.cluster.datastore.model.TestModel; import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore; import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages; @@ -76,13 +77,13 @@ public class ShardTransactionTest extends AbstractActorTest { final ActorRef shard = createShard(); Props props = ShardTransaction.props(store.newReadOnlyTransaction(), shard, testSchemaContext, datastoreContext, shardStats, "txn", - CreateTransaction.CURRENT_VERSION); + DataStoreVersions.CURRENT_VERSION); testOnReceiveReadData(getSystem().actorOf(props, "testReadDataRO")); props = ShardTransaction.props(store.newReadWriteTransaction(), shard, testSchemaContext, datastoreContext, shardStats, "txn", - CreateTransaction.CURRENT_VERSION); + DataStoreVersions.CURRENT_VERSION); testOnReceiveReadData(getSystem().actorOf(props, "testReadDataRW")); } @@ -92,12 +93,10 @@ public class ShardTransactionTest extends AbstractActorTest { transaction.tell(new ReadData(YangInstanceIdentifier.builder().build()).toSerializable(), getRef()); - ShardTransactionMessages.ReadDataReply replySerialized = - expectMsgClass(duration("5 seconds"), ReadDataReply.SERIALIZABLE_CLASS); + Object replySerialized = + expectMsgClass(duration("5 seconds"), ReadDataReply.SERIALIZABLE_CLASS); - assertNotNull(ReadDataReply.fromSerializable( - testSchemaContext,YangInstanceIdentifier.builder().build(), replySerialized) - .getNormalizedNode()); + assertNotNull(ReadDataReply.fromSerializable(replySerialized).getNormalizedNode()); // unserialized read transaction.tell(new ReadData(YangInstanceIdentifier.builder().build()),getRef()); @@ -114,14 +113,14 @@ public class ShardTransactionTest extends AbstractActorTest { final ActorRef shard = createShard(); Props props = ShardTransaction.props( store.newReadOnlyTransaction(), shard, testSchemaContext, datastoreContext, shardStats, "txn", - CreateTransaction.CURRENT_VERSION); + DataStoreVersions.CURRENT_VERSION); testOnReceiveReadDataWhenDataNotFound(getSystem().actorOf( props, "testReadDataWhenDataNotFoundRO")); props = ShardTransaction.props( store.newReadWriteTransaction(), shard, testSchemaContext, datastoreContext, shardStats, "txn", - CreateTransaction.CURRENT_VERSION); + DataStoreVersions.CURRENT_VERSION); testOnReceiveReadDataWhenDataNotFound(getSystem().actorOf( props, "testReadDataWhenDataNotFoundRW")); @@ -131,11 +130,10 @@ public class ShardTransactionTest extends AbstractActorTest { // serialized read transaction.tell(new ReadData(TestModel.TEST_PATH).toSerializable(), getRef()); - ShardTransactionMessages.ReadDataReply replySerialized = - expectMsgClass(duration("5 seconds"), ReadDataReply.SERIALIZABLE_CLASS); + Object replySerialized = + expectMsgClass(duration("5 seconds"), ReadDataReply.SERIALIZABLE_CLASS); - assertTrue(ReadDataReply.fromSerializable( - testSchemaContext, TestModel.TEST_PATH, replySerialized).getNormalizedNode() == null); + assertTrue(ReadDataReply.fromSerializable(replySerialized).getNormalizedNode() == null); // unserialized read transaction.tell(new ReadData(TestModel.TEST_PATH),getRef()); @@ -146,19 +144,39 @@ public class ShardTransactionTest extends AbstractActorTest { }}; } + @Test + public void testOnReceiveReadDataHeliumR1() throws Exception { + new JavaTestKit(getSystem()) {{ + final ActorRef shard = createShard(); + Props props = ShardTransaction.props(store.newReadOnlyTransaction(), shard, + testSchemaContext, datastoreContext, shardStats, "txn", + DataStoreVersions.HELIUM_1_VERSION); + + ActorRef transaction = getSystem().actorOf(props, "testOnReceiveReadDataHeliumR1"); + + transaction.tell(new ReadData(YangInstanceIdentifier.builder().build()).toSerializable(), + getRef()); + + ShardTransactionMessages.ReadDataReply replySerialized = + expectMsgClass(duration("5 seconds"), ShardTransactionMessages.ReadDataReply.class); + + assertNotNull(ReadDataReply.fromSerializable(replySerialized).getNormalizedNode()); + }}; + } + @Test public void testOnReceiveDataExistsPositive() throws Exception { new JavaTestKit(getSystem()) {{ final ActorRef shard = createShard(); Props props = ShardTransaction.props(store.newReadOnlyTransaction(), shard, testSchemaContext, datastoreContext, shardStats, "txn", - CreateTransaction.CURRENT_VERSION); + DataStoreVersions.CURRENT_VERSION); testOnReceiveDataExistsPositive(getSystem().actorOf(props, "testDataExistsPositiveRO")); props = ShardTransaction.props(store.newReadWriteTransaction(), shard, testSchemaContext, datastoreContext, shardStats, "txn", - CreateTransaction.CURRENT_VERSION); + DataStoreVersions.CURRENT_VERSION); testOnReceiveDataExistsPositive(getSystem().actorOf(props, "testDataExistsPositiveRW")); } @@ -187,13 +205,13 @@ public class ShardTransactionTest extends AbstractActorTest { final ActorRef shard = createShard(); Props props = ShardTransaction.props(store.newReadOnlyTransaction(), shard, testSchemaContext, datastoreContext, shardStats, "txn", - CreateTransaction.CURRENT_VERSION); + DataStoreVersions.CURRENT_VERSION); testOnReceiveDataExistsNegative(getSystem().actorOf(props, "testDataExistsNegativeRO")); props = ShardTransaction.props(store.newReadWriteTransaction(), shard, testSchemaContext, datastoreContext, shardStats, "txn", - CreateTransaction.CURRENT_VERSION); + DataStoreVersions.CURRENT_VERSION); testOnReceiveDataExistsNegative(getSystem().actorOf(props, "testDataExistsNegativeRW")); } @@ -234,39 +252,61 @@ public class ShardTransactionTest extends AbstractActorTest { final ActorRef shard = createShard(); final Props props = ShardTransaction.props(store.newWriteOnlyTransaction(), shard, testSchemaContext, datastoreContext, shardStats, "txn", - CreateTransaction.CURRENT_VERSION); - final ActorRef transaction = getSystem().actorOf(props, "testWriteData"); + DataStoreVersions.CURRENT_VERSION); + final ActorRef transaction = getSystem().actorOf(props, "testOnReceiveWriteData"); transaction.tell(new WriteData(TestModel.TEST_PATH, - ImmutableNodes.containerNode(TestModel.TEST_QNAME), TestModel.createTestContext()).toSerializable(), - getRef()); + ImmutableNodes.containerNode(TestModel.TEST_QNAME)).toSerializable( + DataStoreVersions.HELIUM_2_VERSION), getRef()); expectMsgClass(duration("5 seconds"), ShardTransactionMessages.WriteDataReply.class); assertModification(transaction, WriteModification.class); - //unserialized write + // unserialized write transaction.tell(new WriteData(TestModel.TEST_PATH, - ImmutableNodes.containerNode(TestModel.TEST_QNAME), - TestModel.createTestContext()), + ImmutableNodes.containerNode(TestModel.TEST_QNAME)), getRef()); expectMsgClass(duration("5 seconds"), WriteDataReply.class); }}; } + @Test + public void testOnReceiveHeliumR1WriteData() throws Exception { + new JavaTestKit(getSystem()) {{ + final ActorRef shard = createShard(); + final Props props = ShardTransaction.props(store.newWriteOnlyTransaction(), shard, + testSchemaContext, datastoreContext, shardStats, "txn", + DataStoreVersions.HELIUM_1_VERSION); + final ActorRef transaction = getSystem().actorOf(props, "testOnReceiveHeliumR1WriteData"); + + Encoded encoded = new NormalizedNodeToNodeCodec(null).encode(TestModel.TEST_PATH, + ImmutableNodes.containerNode(TestModel.TEST_QNAME)); + ShardTransactionMessages.WriteData serialized = ShardTransactionMessages.WriteData.newBuilder() + .setInstanceIdentifierPathArguments(encoded.getEncodedPath()) + .setNormalizedNode(encoded.getEncodedNode().getNormalizedNode()).build(); + + transaction.tell(serialized, getRef()); + + expectMsgClass(duration("5 seconds"), ShardTransactionMessages.WriteDataReply.class); + + assertModification(transaction, WriteModification.class); + }}; + } + @Test public void testOnReceiveMergeData() throws Exception { new JavaTestKit(getSystem()) {{ final ActorRef shard = createShard(); final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard, testSchemaContext, datastoreContext, shardStats, "txn", - CreateTransaction.CURRENT_VERSION); + DataStoreVersions.CURRENT_VERSION); final ActorRef transaction = getSystem().actorOf(props, "testMergeData"); transaction.tell(new MergeData(TestModel.TEST_PATH, - ImmutableNodes.containerNode(TestModel.TEST_QNAME), testSchemaContext).toSerializable(), - getRef()); + ImmutableNodes.containerNode(TestModel.TEST_QNAME)).toSerializable( + DataStoreVersions.HELIUM_2_VERSION), getRef()); expectMsgClass(duration("5 seconds"), ShardTransactionMessages.MergeDataReply.class); @@ -274,20 +314,43 @@ public class ShardTransactionTest extends AbstractActorTest { //unserialized merge transaction.tell(new MergeData(TestModel.TEST_PATH, - ImmutableNodes.containerNode(TestModel.TEST_QNAME), testSchemaContext), + ImmutableNodes.containerNode(TestModel.TEST_QNAME)), getRef()); expectMsgClass(duration("5 seconds"), MergeDataReply.class); }}; } + @Test + public void testOnReceiveHeliumR1MergeData() throws Exception { + new JavaTestKit(getSystem()) {{ + final ActorRef shard = createShard(); + final Props props = ShardTransaction.props(store.newWriteOnlyTransaction(), shard, + testSchemaContext, datastoreContext, shardStats, "txn", + DataStoreVersions.HELIUM_1_VERSION); + final ActorRef transaction = getSystem().actorOf(props, "testOnReceiveHeliumR1MergeData"); + + Encoded encoded = new NormalizedNodeToNodeCodec(null).encode(TestModel.TEST_PATH, + ImmutableNodes.containerNode(TestModel.TEST_QNAME)); + ShardTransactionMessages.MergeData serialized = ShardTransactionMessages.MergeData.newBuilder() + .setInstanceIdentifierPathArguments(encoded.getEncodedPath()) + .setNormalizedNode(encoded.getEncodedNode().getNormalizedNode()).build(); + + transaction.tell(serialized, getRef()); + + expectMsgClass(duration("5 seconds"), ShardTransactionMessages.MergeDataReply.class); + + assertModification(transaction, MergeModification.class); + }}; + } + @Test public void testOnReceiveDeleteData() throws Exception { new JavaTestKit(getSystem()) {{ final ActorRef shard = createShard(); final Props props = ShardTransaction.props( store.newWriteOnlyTransaction(), shard, testSchemaContext, datastoreContext, shardStats, "txn", - CreateTransaction.CURRENT_VERSION); + DataStoreVersions.CURRENT_VERSION); final ActorRef transaction = getSystem().actorOf(props, "testDeleteData"); transaction.tell(new DeleteData(TestModel.TEST_PATH).toSerializable(), getRef()); @@ -310,7 +373,7 @@ public class ShardTransactionTest extends AbstractActorTest { final ActorRef shard = createShard(); final Props props = ShardTransaction.props( store.newReadWriteTransaction(), shard, testSchemaContext, datastoreContext, shardStats, "txn", - CreateTransaction.CURRENT_VERSION); + DataStoreVersions.CURRENT_VERSION); final ActorRef transaction = getSystem().actorOf(props, "testReadyTransaction"); watch(transaction); @@ -328,7 +391,7 @@ public class ShardTransactionTest extends AbstractActorTest { final ActorRef shard = createShard(); final Props props = ShardTransaction.props( store.newReadWriteTransaction(), shard, testSchemaContext, datastoreContext, shardStats, "txn", - CreateTransaction.CURRENT_VERSION); + DataStoreVersions.CURRENT_VERSION); final ActorRef transaction = getSystem().actorOf(props, "testReadyTransaction2"); watch(transaction); @@ -343,13 +406,14 @@ public class ShardTransactionTest extends AbstractActorTest { } + @SuppressWarnings("unchecked") @Test public void testOnReceiveCloseTransaction() throws Exception { new JavaTestKit(getSystem()) {{ final ActorRef shard = createShard(); final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard, testSchemaContext, datastoreContext, shardStats, "txn", - CreateTransaction.CURRENT_VERSION); + DataStoreVersions.CURRENT_VERSION); final ActorRef transaction = getSystem().actorOf(props, "testCloseTransaction"); watch(transaction); @@ -366,7 +430,7 @@ public class ShardTransactionTest extends AbstractActorTest { final ActorRef shard = createShard(); final Props props = ShardTransaction.props(store.newReadOnlyTransaction(), shard, testSchemaContext, datastoreContext, shardStats, "txn", - CreateTransaction.CURRENT_VERSION); + DataStoreVersions.CURRENT_VERSION); final TestActorRef transaction = TestActorRef.apply(props,getSystem()); transaction.receive(new DeleteData(TestModel.TEST_PATH).toSerializable(), ActorRef.noSender()); @@ -382,7 +446,7 @@ public class ShardTransactionTest extends AbstractActorTest { final ActorRef shard = createShard(); final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard, testSchemaContext, datastoreContext, shardStats, "txn", - CreateTransaction.CURRENT_VERSION); + DataStoreVersions.CURRENT_VERSION); final ActorRef transaction = getSystem().actorOf(props, "testShardTransactionInactivity"); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohortProxyTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohortProxyTest.java index 46060dda2a..75c93dd5d2 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohortProxyTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohortProxyTest.java @@ -1,11 +1,21 @@ package org.opendaylight.controller.cluster.datastore; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.isA; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; import akka.actor.ActorPath; import akka.actor.ActorSelection; import akka.actor.Props; import akka.dispatch.Futures; import com.google.common.collect.Lists; import com.google.common.util.concurrent.ListenableFuture; +import java.util.List; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; import org.junit.Before; import org.junit.Test; import org.mockito.Mock; @@ -24,18 +34,6 @@ import org.opendaylight.controller.cluster.datastore.utils.ActorContext; import org.opendaylight.controller.cluster.datastore.utils.DoNothingActor; import scala.concurrent.Future; -import java.util.List; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.TimeUnit; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.fail; -import static org.mockito.Mockito.any; -import static org.mockito.Mockito.doReturn; -import static org.mockito.Mockito.isA; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; - public class ThreePhaseCommitCohortProxyTest extends AbstractActorTest { @SuppressWarnings("serial") @@ -112,14 +110,14 @@ public class ThreePhaseCommitCohortProxyTest extends AbstractActorTest { ThreePhaseCommitCohortProxy proxy = setupProxy(1); setupMockActorContext(CanCommitTransaction.SERIALIZABLE_CLASS, - new CanCommitTransactionReply(true)); + CanCommitTransactionReply.YES); ListenableFuture future = proxy.canCommit(); assertEquals("canCommit", true, future.get(5, TimeUnit.SECONDS)); setupMockActorContext(CanCommitTransaction.SERIALIZABLE_CLASS, - new CanCommitTransactionReply(false)); + CanCommitTransactionReply.NO); future = proxy.canCommit(); @@ -134,7 +132,7 @@ public class ThreePhaseCommitCohortProxyTest extends AbstractActorTest { ThreePhaseCommitCohortProxy proxy = setupProxy(2); setupMockActorContext(CanCommitTransaction.SERIALIZABLE_CLASS, - new CanCommitTransactionReply(true), new CanCommitTransactionReply(true)); + CanCommitTransactionReply.YES, CanCommitTransactionReply.YES); ListenableFuture future = proxy.canCommit(); @@ -149,8 +147,7 @@ public class ThreePhaseCommitCohortProxyTest extends AbstractActorTest { ThreePhaseCommitCohortProxy proxy = setupProxy(3); setupMockActorContext(CanCommitTransaction.SERIALIZABLE_CLASS, - new CanCommitTransactionReply(true), new CanCommitTransactionReply(false), - new CanCommitTransactionReply(true)); + CanCommitTransactionReply.YES, CanCommitTransactionReply.NO, CanCommitTransactionReply.YES); ListenableFuture future = proxy.canCommit(); @@ -289,7 +286,7 @@ public class ThreePhaseCommitCohortProxyTest extends AbstractActorTest { ThreePhaseCommitCohortProxy proxy = setupProxy(2); setupMockActorContext(CanCommitTransaction.SERIALIZABLE_CLASS, - new CanCommitTransactionReply(true), new CanCommitTransactionReply(true)); + CanCommitTransactionReply.YES, CanCommitTransactionReply.YES); setupMockActorContext(PreCommitTransaction.SERIALIZABLE_CLASS, new PreCommitTransactionReply(), new PreCommitTransactionReply()); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionProxyTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionProxyTest.java index 7407897dfa..5e53b29db1 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionProxyTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionProxyTest.java @@ -1,5 +1,20 @@ package org.opendaylight.controller.cluster.datastore; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyString; +import static org.mockito.Matchers.argThat; +import static org.mockito.Matchers.eq; +import static org.mockito.Matchers.isA; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType.READ_ONLY; +import static org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType.READ_WRITE; +import static org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType.WRITE_ONLY; import akka.actor.ActorRef; import akka.actor.ActorSelection; import akka.actor.ActorSystem; @@ -11,12 +26,16 @@ import com.google.common.collect.ImmutableMap; import com.google.common.util.concurrent.CheckedFuture; import com.typesafe.config.Config; import com.typesafe.config.ConfigFactory; +import java.io.IOException; +import java.util.List; +import java.util.concurrent.TimeUnit; import org.junit.AfterClass; import org.junit.Before; import org.junit.BeforeClass; import org.junit.Test; import org.mockito.ArgumentMatcher; import org.mockito.Mock; +import org.mockito.Mockito; import org.mockito.MockitoAnnotations; import org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType; import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException; @@ -42,6 +61,7 @@ import org.opendaylight.controller.cluster.datastore.utils.DoNothingActor; import org.opendaylight.controller.cluster.datastore.utils.MockConfiguration; import org.opendaylight.controller.md.cluster.datastore.model.TestModel; import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException; +import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages; import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages.CreateTransactionReply; import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; @@ -50,24 +70,6 @@ import org.opendaylight.yangtools.yang.model.api.SchemaContext; import scala.concurrent.Await; import scala.concurrent.Future; import scala.concurrent.duration.Duration; -import java.io.IOException; -import java.util.List; -import java.util.concurrent.TimeUnit; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; -import static org.mockito.Matchers.any; -import static org.mockito.Matchers.anyString; -import static org.mockito.Mockito.argThat; -import static org.mockito.Mockito.doReturn; -import static org.mockito.Mockito.eq; -import static org.mockito.Mockito.isA; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; -import static org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType.READ_ONLY; -import static org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType.READ_WRITE; -import static org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType.WRITE_ONLY; @SuppressWarnings("resource") public class TransactionProxyTest { @@ -137,9 +139,13 @@ public class TransactionProxyTest { ArgumentMatcher matcher = new ArgumentMatcher() { @Override public boolean matches(Object argument) { - CreateTransaction obj = CreateTransaction.fromSerializable(argument); - return obj.getTransactionId().startsWith(memberName) && - obj.getTransactionType() == type.ordinal(); + if(CreateTransaction.SERIALIZABLE_CLASS.equals(argument.getClass())) { + CreateTransaction obj = CreateTransaction.fromSerializable(argument); + return obj.getTransactionId().startsWith(memberName) && + obj.getTransactionType() == type.ordinal(); + } + + return false; } }; @@ -195,16 +201,25 @@ public class TransactionProxyTest { } private WriteData eqSerializedWriteData(final NormalizedNode nodeToWrite) { + return eqSerializedWriteData(nodeToWrite, DataStoreVersions.CURRENT_VERSION); + } + + private WriteData eqSerializedWriteData(final NormalizedNode nodeToWrite, + final int transactionVersion) { ArgumentMatcher matcher = new ArgumentMatcher() { @Override public boolean matches(Object argument) { - if(!WriteData.SERIALIZABLE_CLASS.equals(argument.getClass())) { - return false; + if((transactionVersion >= DataStoreVersions.LITHIUM_VERSION && + WriteData.SERIALIZABLE_CLASS.equals(argument.getClass())) || + (transactionVersion < DataStoreVersions.LITHIUM_VERSION && + ShardTransactionMessages.WriteData.class.equals(argument.getClass()))) { + + WriteData obj = WriteData.fromSerializable(argument); + return obj.getPath().equals(TestModel.TEST_PATH) && + obj.getData().equals(nodeToWrite); } - WriteData obj = WriteData.fromSerializable(argument, schemaContext); - return obj.getPath().equals(TestModel.TEST_PATH) && - obj.getData().equals(nodeToWrite); + return false; } }; @@ -228,16 +243,25 @@ public class TransactionProxyTest { } private MergeData eqSerializedMergeData(final NormalizedNode nodeToWrite) { + return eqSerializedMergeData(nodeToWrite, DataStoreVersions.CURRENT_VERSION); + } + + private MergeData eqSerializedMergeData(final NormalizedNode nodeToWrite, + final int transactionVersion) { ArgumentMatcher matcher = new ArgumentMatcher() { @Override public boolean matches(Object argument) { - if(!MergeData.SERIALIZABLE_CLASS.equals(argument.getClass())) { - return false; + if((transactionVersion >= DataStoreVersions.LITHIUM_VERSION && + MergeData.SERIALIZABLE_CLASS.equals(argument.getClass())) || + (transactionVersion < DataStoreVersions.LITHIUM_VERSION && + ShardTransactionMessages.MergeData.class.equals(argument.getClass()))) { + + MergeData obj = MergeData.fromSerializable(argument); + return obj.getPath().equals(TestModel.TEST_PATH) && + obj.getData().equals(nodeToWrite); } - MergeData obj = MergeData.fromSerializable(argument, schemaContext); - return obj.getPath().equals(TestModel.TEST_PATH) && - obj.getData().equals(nodeToWrite); + return false; } }; @@ -293,13 +317,17 @@ public class TransactionProxyTest { return Futures.successful((Object)new ReadyTransactionReply(path)); } + private Future readSerializedDataReply(NormalizedNode data, + short transactionVersion) { + return Futures.successful(new ReadDataReply(data).toSerializable(transactionVersion)); + } private Future readSerializedDataReply(NormalizedNode data) { - return Futures.successful(new ReadDataReply(schemaContext, data).toSerializable()); + return readSerializedDataReply(data, DataStoreVersions.CURRENT_VERSION); } private Future readDataReply(NormalizedNode data) { - return Futures.successful(new ReadDataReply(schemaContext, data)); + return Futures.successful(new ReadDataReply(data)); } private Future dataExistsSerializedReply(boolean exists) { @@ -310,16 +338,24 @@ public class TransactionProxyTest { return Futures.successful(new DataExistsReply(exists)); } + private Future writeSerializedDataReply(short version) { + return Futures.successful(new WriteDataReply().toSerializable(version)); + } + private Future writeSerializedDataReply() { - return Futures.successful(new WriteDataReply().toSerializable()); + return writeSerializedDataReply(DataStoreVersions.CURRENT_VERSION); } private Future writeDataReply() { return Futures.successful(new WriteDataReply()); } + private Future mergeSerializedDataReply(short version) { + return Futures.successful(new MergeDataReply().toSerializable(version)); + } + private Future mergeSerializedDataReply() { - return Futures.successful(new MergeDataReply().toSerializable()); + return mergeSerializedDataReply(DataStoreVersions.CURRENT_VERSION); } private Future mergeDataReply() { @@ -346,7 +382,8 @@ public class TransactionProxyTest { .build(); } - private ActorRef setupActorContextWithInitialCreateTransaction(ActorSystem actorSystem, TransactionType type, int transactionVersion) { + private ActorRef setupActorContextWithInitialCreateTransaction(ActorSystem actorSystem, + TransactionType type, int transactionVersion) { ActorRef actorRef = actorSystem.actorOf(Props.create(DoNothingActor.class)); doReturn(actorSystem.actorSelection(actorRef.path())). when(mockActorContext).actorSelection(actorRef.path().toString()); @@ -358,13 +395,11 @@ public class TransactionProxyTest { executeOperationAsync(eq(actorSystem.actorSelection(actorRef.path())), eqCreateTransaction(memberName, type)); - doReturn(false).when(mockActorContext).isPathLocal(actorRef.path().toString()); - return actorRef; } private ActorRef setupActorContextWithInitialCreateTransaction(ActorSystem actorSystem, TransactionType type) { - return setupActorContextWithInitialCreateTransaction(actorSystem, type, CreateTransaction.CURRENT_VERSION); + return setupActorContextWithInitialCreateTransaction(actorSystem, type, DataStoreVersions.CURRENT_VERSION); } @@ -718,7 +753,7 @@ public class TransactionProxyTest { eq(actorSelection(actorRef)), eqSerializedWriteData(nodeToWrite)); verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(), - WriteDataReply.SERIALIZABLE_CLASS); + WriteDataReply.class); } @Test(expected=IllegalStateException.class) @@ -760,7 +795,7 @@ public class TransactionProxyTest { eq(actorSelection(actorRef)), eqSerializedMergeData(nodeToWrite)); verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(), - MergeDataReply.SERIALIZABLE_CLASS); + MergeDataReply.class); } @Test @@ -836,22 +871,25 @@ public class TransactionProxyTest { ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready; verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(), - WriteDataReply.SERIALIZABLE_CLASS); + WriteDataReply.class); verifyCohortFutures(proxy, getSystem().actorSelection(actorRef.path())); } - @Test - public void testReadyForwardCompatibility() throws Exception { - ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE, 0); + private ActorRef testCompatibilityWithHeliumVersion(short version) throws Exception { + ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), + READ_WRITE, version); - NormalizedNode nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME); + NormalizedNode testNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME); - doReturn(readSerializedDataReply(null)).when(mockActorContext).executeOperationAsync( + doReturn(readSerializedDataReply(testNode, version)).when(mockActorContext).executeOperationAsync( eq(actorSelection(actorRef)), eqSerializedReadData()); - doReturn(writeSerializedDataReply()).when(mockActorContext).executeOperationAsync( - eq(actorSelection(actorRef)), eqSerializedWriteData(nodeToWrite)); + doReturn(writeSerializedDataReply(version)).when(mockActorContext).executeOperationAsync( + eq(actorSelection(actorRef)), eqSerializedWriteData(testNode, version)); + + doReturn(mergeSerializedDataReply(version)).when(mockActorContext).executeOperationAsync( + eq(actorSelection(actorRef)), eqSerializedMergeData(testNode, version)); doReturn(readySerializedTxReply(actorRef.path().toString())).when(mockActorContext).executeOperationAsync( eq(actorSelection(actorRef)), isA(ReadyTransaction.SERIALIZABLE_CLASS)); @@ -859,12 +897,17 @@ public class TransactionProxyTest { doReturn(actorRef.path().toString()).when(mockActorContext).resolvePath(eq(actorRef.path().toString()), eq(actorRef.path().toString())); - TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, - READ_WRITE); + TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_WRITE); - transactionProxy.read(TestModel.TEST_PATH); + Optional> readOptional = transactionProxy.read(TestModel.TEST_PATH). + get(5, TimeUnit.SECONDS); - transactionProxy.write(TestModel.TEST_PATH, nodeToWrite); + assertEquals("NormalizedNode isPresent", true, readOptional.isPresent()); + assertEquals("Response NormalizedNode", testNode, readOptional.get()); + + transactionProxy.write(TestModel.TEST_PATH, testNode); + + transactionProxy.merge(TestModel.TEST_PATH, testNode); DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready(); @@ -873,14 +916,29 @@ public class TransactionProxyTest { ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready; verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(), - WriteDataReply.SERIALIZABLE_CLASS); + ShardTransactionMessages.WriteDataReply.class, ShardTransactionMessages.MergeDataReply.class); verifyCohortFutures(proxy, getSystem().actorSelection(actorRef.path())); + return actorRef; + } + + @Test + public void testCompatibilityWithBaseHeliumVersion() throws Exception { + ActorRef actorRef = testCompatibilityWithHeliumVersion(DataStoreVersions.BASE_HELIUM_VERSION); + verify(mockActorContext).resolvePath(eq(actorRef.path().toString()), eq(actorRef.path().toString())); } + @Test + public void testCompatibilityWithHeliumR1Version() throws Exception { + ActorRef actorRef = testCompatibilityWithHeliumVersion(DataStoreVersions.HELIUM_1_VERSION); + + verify(mockActorContext, Mockito.never()).resolvePath(eq(actorRef.path().toString()), + eq(actorRef.path().toString())); + } + @Test public void testReadyWithRecordingOperationFailure() throws Exception { ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY); @@ -914,7 +972,7 @@ public class TransactionProxyTest { verifyCohortFutures(proxy, TestException.class); verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(), - MergeDataReply.SERIALIZABLE_CLASS, TestException.class); + MergeDataReply.class, TestException.class); } @Test @@ -942,7 +1000,7 @@ public class TransactionProxyTest { ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready; verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(), - MergeDataReply.SERIALIZABLE_CLASS); + MergeDataReply.class); verifyCohortFutures(proxy, TestException.class); } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/messages/DataChangedTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/messages/DataChangedTest.java new file mode 100644 index 0000000000..2c1de7f1e3 --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/messages/DataChangedTest.java @@ -0,0 +1,57 @@ +/* + * Copyright (c) 2014 Brocade Communications Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.datastore.messages; + +import static org.junit.Assert.assertEquals; +import org.apache.commons.lang.SerializationUtils; +import org.junit.Test; +import org.opendaylight.controller.md.cluster.datastore.model.TestModel; +import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope; +import org.opendaylight.controller.md.sal.dom.store.impl.DOMImmutableDataChangeEvent; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; +import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes; +import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder; + +/** + * Unit tests for DataChanged. + * + * @author Thomas Pantelis + */ +public class DataChangedTest { + + @Test + public void testSerialization() { + DOMImmutableDataChangeEvent change = DOMImmutableDataChangeEvent.builder(DataChangeScope.SUBTREE). + addCreated(TestModel.TEST_PATH, ImmutableContainerNodeBuilder.create().withNodeIdentifier( + new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME)). + withChild(ImmutableNodes.leafNode(TestModel.DESC_QNAME, "foo")).build()). + addUpdated(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), + ImmutableContainerNodeBuilder.create().withNodeIdentifier( + new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME)). + withChild(ImmutableNodes.leafNode(TestModel.NAME_QNAME, "bar")).build()) +. + addRemoved(TestModel.OUTER_LIST_PATH, + ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build()). + setBefore(ImmutableNodes.containerNode(TestModel.TEST_QNAME)). + setAfter(ImmutableContainerNodeBuilder.create().withNodeIdentifier( + new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME)). + withChild(ImmutableNodes.leafNode(TestModel.DESC_QNAME, "foo")). + withChild(ImmutableNodes.leafNode(TestModel.NAME_QNAME, "bar")).build()).build(); + + DataChanged expected = new DataChanged(change); + + DataChanged actual = (DataChanged) SerializationUtils.clone(expected); + + assertEquals("getCreatedData", change.getCreatedData(), actual.getChange().getCreatedData()); + assertEquals("getOriginalData", change.getOriginalData(), actual.getChange().getOriginalData()); + assertEquals("getOriginalSubtree", change.getOriginalSubtree(), actual.getChange().getOriginalSubtree()); + assertEquals("getRemovedPaths", change.getRemovedPaths(), actual.getChange().getRemovedPaths()); + assertEquals("getUpdatedData", change.getUpdatedData(), actual.getChange().getUpdatedData()); + assertEquals("getUpdatedSubtree", change.getUpdatedSubtree(), actual.getChange().getUpdatedSubtree()); + } +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/messages/MergeDataTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/messages/MergeDataTest.java index 8f3ca9c535..5b40afdff8 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/messages/MergeDataTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/messages/MergeDataTest.java @@ -1,21 +1,70 @@ package org.opendaylight.controller.cluster.datastore.messages; -import org.junit.Assert; +import static org.junit.Assert.assertEquals; +import java.io.Serializable; +import org.apache.commons.lang.SerializationUtils; import org.junit.Test; +import org.opendaylight.controller.cluster.datastore.DataStoreVersions; import org.opendaylight.controller.md.cluster.datastore.model.TestModel; +import org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages.InstanceIdentifier; +import org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages.Node; +import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; +import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes; -import org.opendaylight.yangtools.yang.model.api.SchemaContext; +import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder; public class MergeDataTest { @Test public void testSerialization() { - SchemaContext schemaContext = TestModel.createTestContext(); - MergeData expected = new MergeData(TestModel.TEST_PATH, ImmutableNodes - .containerNode(TestModel.TEST_QNAME), schemaContext); + YangInstanceIdentifier path = TestModel.TEST_PATH; + NormalizedNode data = ImmutableContainerNodeBuilder.create().withNodeIdentifier( + new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME)). + withChild(ImmutableNodes.leafNode(TestModel.DESC_QNAME, "foo")).build(); - MergeData actual = MergeData.fromSerializable(expected.toSerializable(), schemaContext); - Assert.assertEquals("getPath", expected.getPath(), actual.getPath()); - Assert.assertEquals("getData", expected.getData(), actual.getData()); + MergeData expected = new MergeData(path, data); + + Object serialized = expected.toSerializable(DataStoreVersions.CURRENT_VERSION); + assertEquals("Serialized type", MergeData.class, serialized.getClass()); + assertEquals("Version", DataStoreVersions.CURRENT_VERSION, ((MergeData)serialized).getVersion()); + + Object clone = SerializationUtils.clone((Serializable) serialized); + assertEquals("Version", DataStoreVersions.CURRENT_VERSION, ((MergeData)clone).getVersion()); + MergeData actual = MergeData.fromSerializable(clone); + assertEquals("getPath", expected.getPath(), actual.getPath()); + assertEquals("getData", expected.getData(), actual.getData()); + } + + @Test + public void testIsSerializedType() { + assertEquals("isSerializedType", true, MergeData.isSerializedType( + ShardTransactionMessages.MergeData.newBuilder() + .setInstanceIdentifierPathArguments(InstanceIdentifier.getDefaultInstance()) + .setNormalizedNode(Node.getDefaultInstance()).build())); + assertEquals("isSerializedType", true, + MergeData.isSerializedType(new MergeData())); + assertEquals("isSerializedType", false, MergeData.isSerializedType(new Object())); + } + + /** + * Tests backwards compatible serialization/deserialization of a MergeData message with the + * base and R1 Helium versions, which used the protobuff MergeData message. + */ + @Test + public void testSerializationWithHeliumR1Version() throws Exception { + YangInstanceIdentifier path = TestModel.TEST_PATH; + NormalizedNode data = ImmutableContainerNodeBuilder.create().withNodeIdentifier( + new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME)). + withChild(ImmutableNodes.leafNode(TestModel.DESC_QNAME, "foo")).build(); + + MergeData expected = new MergeData(path, data); + + Object serialized = expected.toSerializable(DataStoreVersions.HELIUM_1_VERSION); + assertEquals("Serialized type", ShardTransactionMessages.MergeData.class, serialized.getClass()); + + MergeData actual = MergeData.fromSerializable(SerializationUtils.clone((Serializable) serialized)); + assertEquals("getPath", expected.getPath(), actual.getPath()); + assertEquals("getData", expected.getData(), actual.getData()); } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/messages/ReadDataReplyTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/messages/ReadDataReplyTest.java new file mode 100644 index 0000000000..8ce73296c1 --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/messages/ReadDataReplyTest.java @@ -0,0 +1,72 @@ +/* + * Copyright (c) 2014 Brocade Communications Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.datastore.messages; + +import static org.junit.Assert.assertEquals; +import java.io.Serializable; +import org.apache.commons.lang.SerializationUtils; +import org.junit.Test; +import org.opendaylight.controller.cluster.datastore.DataStoreVersions; +import org.opendaylight.controller.md.cluster.datastore.model.TestModel; +import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; +import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; +import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes; +import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder; + +/** + * Unit tests for ReadDataReply. + * + * @author Thomas Pantelis + */ +public class ReadDataReplyTest { + + @Test + public void testSerialization() { + NormalizedNode data = ImmutableContainerNodeBuilder.create().withNodeIdentifier( + new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME)). + withChild(ImmutableNodes.leafNode(TestModel.DESC_QNAME, "foo")).build(); + + ReadDataReply expected = new ReadDataReply(data); + + Object serialized = expected.toSerializable(DataStoreVersions.CURRENT_VERSION); + assertEquals("Serialized type", ReadDataReply.class, serialized.getClass()); + + ReadDataReply actual = ReadDataReply.fromSerializable(SerializationUtils.clone( + (Serializable) serialized)); + assertEquals("getNormalizedNode", expected.getNormalizedNode(), actual.getNormalizedNode()); + } + + @Test + public void testIsSerializedType() { + assertEquals("isSerializedType", true, ReadDataReply.isSerializedType( + ShardTransactionMessages.ReadDataReply.newBuilder().build())); + assertEquals("isSerializedType", true, ReadDataReply.isSerializedType(new ReadDataReply())); + assertEquals("isSerializedType", false, ReadDataReply.isSerializedType(new Object())); + } + + /** + * Tests backwards compatible serialization/deserialization of a ReadDataReply message with the + * base and R1 Helium versions, which used the protobuff ReadDataReply message. + */ + @Test + public void testSerializationWithHeliumR1Version() throws Exception { + NormalizedNode data = ImmutableContainerNodeBuilder.create().withNodeIdentifier( + new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME)). + withChild(ImmutableNodes.leafNode(TestModel.DESC_QNAME, "foo")).build(); + + ReadDataReply expected = new ReadDataReply(data); + + Object serialized = expected.toSerializable(DataStoreVersions.HELIUM_1_VERSION); + assertEquals("Serialized type", ShardTransactionMessages.ReadDataReply.class, serialized.getClass()); + + ReadDataReply actual = ReadDataReply.fromSerializable(SerializationUtils.clone( + (Serializable) serialized)); + assertEquals("getNormalizedNode", expected.getNormalizedNode(), actual.getNormalizedNode()); + } +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/messages/WriteDataTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/messages/WriteDataTest.java index 6a5d65f8da..90a76f229e 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/messages/WriteDataTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/messages/WriteDataTest.java @@ -7,11 +7,19 @@ */ package org.opendaylight.controller.cluster.datastore.messages; -import org.junit.Assert; +import static org.junit.Assert.assertEquals; +import java.io.Serializable; +import org.apache.commons.lang.SerializationUtils; import org.junit.Test; +import org.opendaylight.controller.cluster.datastore.DataStoreVersions; import org.opendaylight.controller.md.cluster.datastore.model.TestModel; +import org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages.InstanceIdentifier; +import org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages.Node; +import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; +import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes; -import org.opendaylight.yangtools.yang.model.api.SchemaContext; +import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder; /** * Unit tests for WriteData. @@ -22,12 +30,52 @@ public class WriteDataTest { @Test public void testSerialization() { - SchemaContext schemaContext = TestModel.createTestContext(); - WriteData expected = new WriteData(TestModel.TEST_PATH, ImmutableNodes - .containerNode(TestModel.TEST_QNAME), schemaContext); + YangInstanceIdentifier path = TestModel.TEST_PATH; + NormalizedNode data = ImmutableContainerNodeBuilder.create().withNodeIdentifier( + new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME)). + withChild(ImmutableNodes.leafNode(TestModel.DESC_QNAME, "foo")).build(); - WriteData actual = WriteData.fromSerializable(expected.toSerializable(), schemaContext); - Assert.assertEquals("getPath", expected.getPath(), actual.getPath()); - Assert.assertEquals("getData", expected.getData(), actual.getData()); + WriteData expected = new WriteData(path, data); + + Object serialized = expected.toSerializable(DataStoreVersions.CURRENT_VERSION); + assertEquals("Serialized type", WriteData.class, serialized.getClass()); + assertEquals("Version", DataStoreVersions.CURRENT_VERSION, ((WriteData)serialized).getVersion()); + + Object clone = SerializationUtils.clone((Serializable) serialized); + assertEquals("Version", DataStoreVersions.CURRENT_VERSION, ((WriteData)clone).getVersion()); + WriteData actual = WriteData.fromSerializable(clone); + assertEquals("getPath", expected.getPath(), actual.getPath()); + assertEquals("getData", expected.getData(), actual.getData()); + } + + @Test + public void testIsSerializedType() { + assertEquals("isSerializedType", true, WriteData.isSerializedType( + ShardTransactionMessages.WriteData.newBuilder() + .setInstanceIdentifierPathArguments(InstanceIdentifier.getDefaultInstance()) + .setNormalizedNode(Node.getDefaultInstance()).build())); + assertEquals("isSerializedType", true, WriteData.isSerializedType(new WriteData())); + assertEquals("isSerializedType", false, WriteData.isSerializedType(new Object())); + } + + /** + * Tests backwards compatible serialization/deserialization of a WriteData message with the + * base and R1 Helium versions, which used the protobuff WriteData message. + */ + @Test + public void testSerializationWithHeliumR1Version() throws Exception { + YangInstanceIdentifier path = TestModel.TEST_PATH; + NormalizedNode data = ImmutableContainerNodeBuilder.create().withNodeIdentifier( + new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME)). + withChild(ImmutableNodes.leafNode(TestModel.DESC_QNAME, "foo")).build(); + + WriteData expected = new WriteData(path, data); + + Object serialized = expected.toSerializable(DataStoreVersions.HELIUM_1_VERSION); + assertEquals("Serialized type", ShardTransactionMessages.WriteData.class, serialized.getClass()); + + WriteData actual = WriteData.fromSerializable(SerializationUtils.clone((Serializable) serialized)); + assertEquals("getPath", expected.getPath(), actual.getPath()); + assertEquals("getData", expected.getData(), actual.getData()); } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/md/cluster/datastore/model/TestModel.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/md/cluster/datastore/model/TestModel.java index 85441eca0d..e571e3a715 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/md/cluster/datastore/model/TestModel.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/md/cluster/datastore/model/TestModel.java @@ -7,16 +7,15 @@ */ package org.opendaylight.controller.md.cluster.datastore.model; +import java.io.InputStream; +import java.util.Collections; +import java.util.Set; import org.opendaylight.yangtools.yang.common.QName; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import org.opendaylight.yangtools.yang.model.api.Module; import org.opendaylight.yangtools.yang.model.api.SchemaContext; import org.opendaylight.yangtools.yang.parser.impl.YangParserImpl; -import java.io.InputStream; -import java.util.Collections; -import java.util.Set; - public class TestModel { public static final QName TEST_QNAME = QName.create("urn:opendaylight:params:xml:ns:yang:controller:md:sal:dom:store:test", "2014-03-13", @@ -26,6 +25,7 @@ public class TestModel { public static final QName OUTER_CHOICE_QNAME = QName.create(TEST_QNAME, "outer-choice"); public static final QName ID_QNAME = QName.create(TEST_QNAME, "id"); public static final QName NAME_QNAME = QName.create(TEST_QNAME, "name"); + public static final QName DESC_QNAME = QName.create(TEST_QNAME, "desc"); public static final QName VALUE_QNAME = QName.create(TEST_QNAME, "value"); private static final String DATASTORE_TEST_YANG = "/odl-datastore-test.yang"; -- 2.36.6