From: tpantelis Date: Sun, 8 Feb 2015 17:17:16 +0000 (-0500) Subject: Bug 2597: Batch modification operations in TransactionProxy X-Git-Tag: release/lithium~476^2 X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=commitdiff_plain;h=dc5eceede07e499e5c5e0ab60d7ce42bc596fcc0 Bug 2597: Batch modification operations in TransactionProxy Modified TransactionContextImpl to batch write, merge, delete modification operations into a single BatchedModifications message and send the batch when a count threshold is reached. Instead of using the current WriteData, MergeData, DeleteData message classes in BatchedModifications, I reused the Modification classes that are used as the payload for peristence and replication. BatchedModifications derives from MutableCompositeModification. The ShardWriteTransaction now simply transfers the Modification instances in the BatchedModifications instance to its internal MutableCompositeModification and applies the Modifications to its transaction. The Modification classes were refactored a little wrt to versioning. The Write/Merge/DeleteModifications no longer read/write the version from the stream. The version is read/written by MutableCompositeModification so it's redundant to also do so in the data Modification classes. The WriteData, MergeData, DeleteData and associated reply classes were deprecated. I did refactor them a little (along with ReadDataReply) as I was originally going to use them. The VersionedSerializableMessage interface was removed as I realized it makes more sense to pass the version in the WriteData, MergeData, DeleteData constructors instead of passing the version via toSerializable. This made it easier in TransactionContextImpl to transition it to use BatchedModifications. I created a VersionedExternalizableMessage base class that reads/write the version. To handle backwards compatibility with Helium, I derived a LegacyTransactionContextImpl class from TransactionContextImpl that overrides writeData, mergeData and deleteData to send WriteData, MergeData, DeleteData messages. TransactionProxy creates this instance if the remote tx versions is < Lithium version. Change-Id: I28df1f89e97667eaca114b991355a6e9d0160a59 Signed-off-by: tpantelis --- diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DatastoreContext.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DatastoreContext.java index cee781fb88..20708335f3 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DatastoreContext.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DatastoreContext.java @@ -42,6 +42,7 @@ public class DatastoreContext { public static final int DEFAULT_SHARD_ELECTION_TIMEOUT_FACTOR = 2; public static final int DEFAULT_TX_CREATION_INITIAL_RATE_LIMIT = 100; public static final String UNKNOWN_DATA_STORE_TYPE = "unknown"; + public static final int DEFAULT_SHARD_BATCHED_MODIFICATION_COUNT= 100; private InMemoryDOMDataStoreConfigProperties dataStoreProperties; private Duration shardTransactionIdleTimeout = DatastoreContext.DEFAULT_SHARD_TRANSACTION_IDLE_TIMEOUT; @@ -54,8 +55,9 @@ public class DatastoreContext { private boolean persistent = DEFAULT_PERSISTENT; private ConfigurationReader configurationReader = DEFAULT_CONFIGURATION_READER; private long transactionCreationInitialRateLimit = DEFAULT_TX_CREATION_INITIAL_RATE_LIMIT; - private DefaultConfigParamsImpl raftConfig = new DefaultConfigParamsImpl(); + private final DefaultConfigParamsImpl raftConfig = new DefaultConfigParamsImpl(); private String dataStoreType = UNKNOWN_DATA_STORE_TYPE; + private int shardBatchedModificationCount = DEFAULT_SHARD_BATCHED_MODIFICATION_COUNT; private DatastoreContext(){ setShardJournalRecoveryLogBatchSize(DEFAULT_JOURNAL_RECOVERY_BATCH_SIZE); @@ -152,8 +154,12 @@ public class DatastoreContext { raftConfig.setSnapshotBatchCount(shardSnapshotBatchCount); } + public int getShardBatchedModificationCount() { + return shardBatchedModificationCount; + } + public static class Builder { - private DatastoreContext datastoreContext = new DatastoreContext(); + private final DatastoreContext datastoreContext = new DatastoreContext(); public Builder shardTransactionIdleTimeout(Duration shardTransactionIdleTimeout) { datastoreContext.shardTransactionIdleTimeout = shardTransactionIdleTimeout; @@ -246,6 +252,11 @@ public class DatastoreContext { return this; } + public Builder shardBatchedModificationCount(int shardBatchedModificationCount) { + datastoreContext.shardBatchedModificationCount = shardBatchedModificationCount; + return this; + } + public DatastoreContext build() { return datastoreContext; } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/LegacyTransactionContextImpl.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/LegacyTransactionContextImpl.java new file mode 100644 index 0000000000..65d82b73d9 --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/LegacyTransactionContextImpl.java @@ -0,0 +1,52 @@ +/* + * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.datastore; + +import akka.actor.ActorSelection; +import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier; +import org.opendaylight.controller.cluster.datastore.messages.DeleteData; +import org.opendaylight.controller.cluster.datastore.messages.MergeData; +import org.opendaylight.controller.cluster.datastore.messages.WriteData; +import org.opendaylight.controller.cluster.datastore.utils.ActorContext; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; +import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; +import org.opendaylight.yangtools.yang.model.api.SchemaContext; + +/** + * Implementation of TransactionContextImpl used when talking to a pre-Lithium controller that doesn't + * support the BatchedModifications message. + * + * @author Thomas Pantelis + */ +class LegacyTransactionContextImpl extends TransactionContextImpl { + + LegacyTransactionContextImpl(String transactionPath, ActorSelection actor, TransactionIdentifier identifier, + ActorContext actorContext, SchemaContext schemaContext, boolean isTxActorLocal, + short remoteTransactionVersion, OperationCompleter operationCompleter) { + super(transactionPath, actor, identifier, actorContext, schemaContext, isTxActorLocal, + remoteTransactionVersion, operationCompleter); + } + + @Override + public void deleteData(YangInstanceIdentifier path) { + recordedOperationFutures.add(executeOperationAsync( + new DeleteData(path, getRemoteTransactionVersion()))); + } + + @Override + public void mergeData(YangInstanceIdentifier path, NormalizedNode data) { + recordedOperationFutures.add(executeOperationAsync( + new MergeData(path, data, getRemoteTransactionVersion()))); + } + + @Override + public void writeData(YangInstanceIdentifier path, NormalizedNode data) { + recordedOperationFutures.add(executeOperationAsync( + new WriteData(path, data, getRemoteTransactionVersion()))); + } +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/OperationCompleter.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/OperationCompleter.java index 09fa61b570..80aa3793c1 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/OperationCompleter.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/OperationCompleter.java @@ -10,6 +10,7 @@ package org.opendaylight.controller.cluster.datastore; import akka.dispatch.OnComplete; import com.google.common.base.Preconditions; import java.util.concurrent.Semaphore; +import org.opendaylight.controller.cluster.datastore.messages.BatchedModificationsReply; final class OperationCompleter extends OnComplete { private final Semaphore operationLimiter; @@ -19,7 +20,11 @@ final class OperationCompleter extends OnComplete { } @Override - public void onComplete(Throwable throwable, Object o){ - this.operationLimiter.release(); + public void onComplete(Throwable throwable, Object message) { + if(message instanceof BatchedModificationsReply) { + this.operationLimiter.release(((BatchedModificationsReply)message).getNumBatched()); + } else { + this.operationLimiter.release(); + } } } \ No newline at end of file diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardTransaction.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardTransaction.java index af25df13d2..613b3749e0 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardTransaction.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardTransaction.java @@ -129,9 +129,9 @@ public abstract class ShardTransaction extends AbstractUntypedActorWithMetering try { final CheckedFuture>, ReadFailedException> future = transaction.read(path); Optional> optional = future.checkedGet(); - ReadDataReply readDataReply = new ReadDataReply(optional.orNull()); + ReadDataReply readDataReply = new ReadDataReply(optional.orNull(), clientTxVersion); - sender().tell((returnSerialized ? readDataReply.toSerializable(clientTxVersion): readDataReply), self()); + sender().tell((returnSerialized ? readDataReply.toSerializable(): readDataReply), self()); } catch (Exception e) { LOG.debug(String.format("Unexpected error reading path %s", path), e); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardWriteTransaction.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardWriteTransaction.java index a4a2f45fdb..d5dcfde803 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardWriteTransaction.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardWriteTransaction.java @@ -13,6 +13,8 @@ package org.opendaylight.controller.cluster.datastore; import akka.actor.ActorRef; import akka.actor.PoisonPill; import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats; +import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications; +import org.opendaylight.controller.cluster.datastore.messages.BatchedModificationsReply; import org.opendaylight.controller.cluster.datastore.messages.DeleteData; import org.opendaylight.controller.cluster.datastore.messages.DeleteDataReply; import org.opendaylight.controller.cluster.datastore.messages.ForwardedReadyTransaction; @@ -24,6 +26,7 @@ import org.opendaylight.controller.cluster.datastore.messages.WriteDataReply; import org.opendaylight.controller.cluster.datastore.modification.CompositeModification; import org.opendaylight.controller.cluster.datastore.modification.DeleteModification; import org.opendaylight.controller.cluster.datastore.modification.MergeModification; +import org.opendaylight.controller.cluster.datastore.modification.Modification; import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification; import org.opendaylight.controller.cluster.datastore.modification.WriteModification; import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort; @@ -37,7 +40,7 @@ import org.opendaylight.yangtools.yang.model.api.SchemaContext; */ public class ShardWriteTransaction extends ShardTransaction { - private final MutableCompositeModification modification = new MutableCompositeModification(); + private final MutableCompositeModification compositeModification = new MutableCompositeModification(); private final DOMStoreWriteTransaction transaction; public ShardWriteTransaction(DOMStoreWriteTransaction transaction, ActorRef shardActor, @@ -55,18 +58,12 @@ public class ShardWriteTransaction extends ShardTransaction { @Override public void handleReceive(Object message) throws Exception { - if (message instanceof WriteData) { - writeData(transaction, (WriteData) message, !SERIALIZED_REPLY); - - } else if (message instanceof MergeData) { - mergeData(transaction, (MergeData) message, !SERIALIZED_REPLY); - - } else if (message instanceof DeleteData) { - deleteData(transaction, (DeleteData) message, !SERIALIZED_REPLY); - + if (message instanceof BatchedModifications) { + batchedModifications((BatchedModifications)message); } else if (message instanceof ReadyTransaction) { readyTransaction(transaction, !SERIALIZED_REPLY); - + } else if(ReadyTransaction.SERIALIZABLE_CLASS.equals(message.getClass())) { + readyTransaction(transaction, SERIALIZED_REPLY); } else if(WriteData.isSerializedType(message)) { writeData(transaction, WriteData.fromSerializable(message), SERIALIZED_REPLY); @@ -76,22 +73,32 @@ public class ShardWriteTransaction extends ShardTransaction { } else if(DeleteData.isSerializedType(message)) { deleteData(transaction, DeleteData.fromSerializable(message), SERIALIZED_REPLY); - } else if(ReadyTransaction.SERIALIZABLE_CLASS.equals(message.getClass())) { - readyTransaction(transaction, SERIALIZED_REPLY); - } else if (message instanceof GetCompositedModification) { // This is here for testing only - getSender().tell(new GetCompositeModificationReply(modification), getSelf()); + getSender().tell(new GetCompositeModificationReply(compositeModification), getSelf()); } else { super.handleReceive(message); } } + private void batchedModifications(BatchedModifications batched) { + try { + for(Modification modification: batched.getModifications()) { + compositeModification.addModification(modification); + modification.apply(transaction); + } + + getSender().tell(new BatchedModificationsReply(batched.getModifications().size()), getSelf()); + } catch (Exception e) { + getSender().tell(new akka.actor.Status.Failure(e), getSelf()); + } + } + private void writeData(DOMStoreWriteTransaction transaction, WriteData message, boolean returnSerialized) { LOG.debug("writeData at path : {}", message.getPath()); - modification.addModification( + compositeModification.addModification( new WriteModification(message.getPath(), message.getData())); try { transaction.write(message.getPath(), message.getData()); @@ -107,7 +114,7 @@ public class ShardWriteTransaction extends ShardTransaction { boolean returnSerialized) { LOG.debug("mergeData at path : {}", message.getPath()); - modification.addModification( + compositeModification.addModification( new MergeModification(message.getPath(), message.getData())); try { @@ -124,7 +131,7 @@ public class ShardWriteTransaction extends ShardTransaction { boolean returnSerialized) { LOG.debug("deleteData at path : {}", message.getPath()); - modification.addModification(new DeleteModification(message.getPath())); + compositeModification.addModification(new DeleteModification(message.getPath())); try { transaction.delete(message.getPath()); DeleteDataReply deleteDataReply = DeleteDataReply.INSTANCE; @@ -143,7 +150,7 @@ public class ShardWriteTransaction extends ShardTransaction { DOMStoreThreePhaseCommitCohort cohort = transaction.ready(); getShardActor().forward(new ForwardedReadyTransaction(transactionID, getClientTxVersion(), - cohort, modification, returnSerialized), getContext()); + cohort, compositeModification, returnSerialized), getContext()); // The shard will handle the commit from here so we're no longer needed - self-destruct. getSelf().tell(PoisonPill.getInstance(), getSelf()); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionContextImpl.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionContextImpl.java index 03d1b3a6d7..1e222e4c0a 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionContextImpl.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionContextImpl.java @@ -15,18 +15,19 @@ import com.google.common.collect.Lists; import com.google.common.util.concurrent.SettableFuture; import java.util.List; import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier; +import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications; import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction; import org.opendaylight.controller.cluster.datastore.messages.DataExists; import org.opendaylight.controller.cluster.datastore.messages.DataExistsReply; -import org.opendaylight.controller.cluster.datastore.messages.DeleteData; -import org.opendaylight.controller.cluster.datastore.messages.MergeData; import org.opendaylight.controller.cluster.datastore.messages.ReadData; import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply; import org.opendaylight.controller.cluster.datastore.messages.ReadyTransaction; import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply; import org.opendaylight.controller.cluster.datastore.messages.SerializableMessage; -import org.opendaylight.controller.cluster.datastore.messages.VersionedSerializableMessage; -import org.opendaylight.controller.cluster.datastore.messages.WriteData; +import org.opendaylight.controller.cluster.datastore.modification.DeleteModification; +import org.opendaylight.controller.cluster.datastore.modification.MergeModification; +import org.opendaylight.controller.cluster.datastore.modification.Modification; +import org.opendaylight.controller.cluster.datastore.modification.WriteModification; import org.opendaylight.controller.cluster.datastore.utils.ActorContext; import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; @@ -36,7 +37,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import scala.concurrent.Future; -final class TransactionContextImpl extends AbstractTransactionContext { +class TransactionContextImpl extends AbstractTransactionContext { private static final Logger LOG = LoggerFactory.getLogger(TransactionContextImpl.class); private final ActorContext actorContext; @@ -44,8 +45,9 @@ final class TransactionContextImpl extends AbstractTransactionContext { private final ActorSelection actor; private final boolean isTxActorLocal; private final short remoteTransactionVersion; - private final OperationCompleter operationCompleter; + private final OperationCompleter operationCompleter; + private BatchedModifications batchedModifications; TransactionContextImpl(String transactionPath, ActorSelection actor, TransactionIdentifier identifier, ActorContext actorContext, SchemaContext schemaContext, @@ -69,13 +71,12 @@ final class TransactionContextImpl extends AbstractTransactionContext { return actor; } - private Future executeOperationAsync(SerializableMessage msg) { - return completeOperation(actorContext.executeOperationAsync(getActor(), isTxActorLocal ? msg : msg.toSerializable())); + protected short getRemoteTransactionVersion() { + return remoteTransactionVersion; } - private Future executeOperationAsync(VersionedSerializableMessage msg) { - return completeOperation(actorContext.executeOperationAsync(getActor(), isTxActorLocal ? msg : - msg.toSerializable(remoteTransactionVersion))); + protected Future executeOperationAsync(SerializableMessage msg) { + return completeOperation(actorContext.executeOperationAsync(getActor(), isTxActorLocal ? msg : msg.toSerializable())); } @Override @@ -90,6 +91,10 @@ final class TransactionContextImpl extends AbstractTransactionContext { LOG.debug("Tx {} readyTransaction called with {} previous recorded operations pending", identifier, recordedOperationFutures.size()); + // Send the remaining batched modifications if any. + + sendBatchedModifications(); + // Send the ReadyTransaction message to the Tx actor. final Future replyFuture = executeOperationAsync(ReadyTransaction.INSTANCE); @@ -155,25 +160,48 @@ final class TransactionContextImpl extends AbstractTransactionContext { }, TransactionProxy.SAME_FAILURE_TRANSFORMER, actorContext.getClientDispatcher()); } + private void batchModification(Modification modification) { + if(batchedModifications == null) { + batchedModifications = new BatchedModifications(remoteTransactionVersion); + } + + batchedModifications.addModification(modification); + + if(batchedModifications.getModifications().size() >= + actorContext.getDatastoreContext().getShardBatchedModificationCount()) { + sendBatchedModifications(); + } + } + + private void sendBatchedModifications() { + if(batchedModifications != null) { + LOG.debug("Tx {} sending {} batched modifications", identifier, + batchedModifications.getModifications().size()); + + recordedOperationFutures.add(executeOperationAsync(batchedModifications)); + batchedModifications = null; + } + } + @Override public void deleteData(YangInstanceIdentifier path) { LOG.debug("Tx {} deleteData called path = {}", identifier, path); - recordedOperationFutures.add(executeOperationAsync(new DeleteData(path))); + batchModification(new DeleteModification(path)); } @Override public void mergeData(YangInstanceIdentifier path, NormalizedNode data) { LOG.debug("Tx {} mergeData called path = {}", identifier, path); - recordedOperationFutures.add(executeOperationAsync(new MergeData(path, data))); + batchModification(new MergeModification(path, data)); } @Override public void writeData(YangInstanceIdentifier path, NormalizedNode data) { LOG.debug("Tx {} writeData called path = {}", identifier, path); - recordedOperationFutures.add(executeOperationAsync(new WriteData(path, data))); + batchModification(new WriteModification(path, data)); } @Override @@ -182,6 +210,10 @@ final class TransactionContextImpl extends AbstractTransactionContext { LOG.debug("Tx {} readData called path = {}", identifier, path); + // Send the remaining batched modifications if any. + + sendBatchedModifications(); + // If there were any previous recorded put/merge/delete operation reply Futures then we // must wait for them to successfully complete. This is necessary to honor the read // uncommitted semantics of the public API contract. If any one fails then fail the read. @@ -263,6 +295,10 @@ final class TransactionContextImpl extends AbstractTransactionContext { LOG.debug("Tx {} dataExists called path = {}", identifier, path); + // Send the remaining batched modifications if any. + + sendBatchedModifications(); + // If there were any previous recorded put/merge/delete operation reply Futures then we // must wait for them to successfully complete. This is necessary to honor the read // uncommitted semantics of the public API contract. If any one fails then fail this diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java index d63ec8010d..58b37be2a2 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java @@ -304,7 +304,8 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { private void throttleOperation(int acquirePermits) { try { - if(!operationLimiter.tryAcquire(acquirePermits, actorContext.getDatastoreContext().getOperationTimeoutInSeconds(), TimeUnit.SECONDS)){ + if(!operationLimiter.tryAcquire(acquirePermits, + actorContext.getDatastoreContext().getOperationTimeoutInSeconds(), TimeUnit.SECONDS)){ LOG.warn("Failed to acquire operation permit for transaction {}", getIdentifier()); } } catch (InterruptedException e) { @@ -689,7 +690,7 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { private TransactionContext createValidTransactionContext(CreateTransactionReply reply) { String transactionPath = reply.getTransactionPath(); - LOG.debug("Tx {} Received transaction actor path {}", identifier, transactionPath); + LOG.debug("Tx {} Received {}", identifier, reply); ActorSelection transactionActor = actorContext.actorSelection(transactionPath); @@ -707,8 +708,13 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { // Check if TxActor is created in the same node boolean isTxActorLocal = actorContext.isPathLocal(transactionPath); - return new TransactionContextImpl(transactionPath, transactionActor, identifier, - actorContext, schemaContext, isTxActorLocal, reply.getVersion(), operationCompleter); + if(reply.getVersion() >= DataStoreVersions.LITHIUM_VERSION) { + return new TransactionContextImpl(transactionPath, transactionActor, identifier, + actorContext, schemaContext, isTxActorLocal, reply.getVersion(), operationCompleter); + } else { + return new LegacyTransactionContextImpl(transactionPath, transactionActor, identifier, + actorContext, schemaContext, isTxActorLocal, reply.getVersion(), operationCompleter); + } } } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/BatchedModifications.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/BatchedModifications.java new file mode 100644 index 0000000000..670641f6ac --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/BatchedModifications.java @@ -0,0 +1,31 @@ +/* + * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.datastore.messages; + +import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification; + +/** + * Message used to batch write, merge, delete modification operations to the ShardTransaction actor. + * + * @author Thomas Pantelis + */ +public class BatchedModifications extends MutableCompositeModification implements SerializableMessage { + private static final long serialVersionUID = 1L; + + public BatchedModifications() { + } + + public BatchedModifications(short version) { + super(version); + } + + @Override + public Object toSerializable() { + return this; + } +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/BatchedModificationsReply.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/BatchedModificationsReply.java new file mode 100644 index 0000000000..33c5733fdb --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/BatchedModificationsReply.java @@ -0,0 +1,52 @@ +/* + * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.datastore.messages; + +import java.io.IOException; +import java.io.ObjectInput; +import java.io.ObjectOutput; + +/** + * The reply for the BatchedModifications message. + * + * @author Thomas Pantelis + */ +public class BatchedModificationsReply extends VersionedExternalizableMessage { + private static final long serialVersionUID = 1L; + + private int numBatched; + + public BatchedModificationsReply() { + } + + public BatchedModificationsReply(int numBatched) { + this.numBatched = numBatched; + } + + + public int getNumBatched() { + return numBatched; + } + + @Override + public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + super.readExternal(in); + numBatched = in.readInt(); + } + + @Override + public void writeExternal(ObjectOutput out) throws IOException { + super.writeExternal(out); + out.writeInt(numBatched); + } + + @Override + public Object toSerializable() { + return this; + } +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/CreateTransactionReply.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/CreateTransactionReply.java index ffd0f1ccf3..c2bf81fa8e 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/CreateTransactionReply.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/CreateTransactionReply.java @@ -57,4 +57,11 @@ public class CreateTransactionReply implements SerializableMessage { (short)o.getMessageVersion()); } + @Override + public String toString() { + StringBuilder builder = new StringBuilder(); + builder.append("CreateTransactionReply [transactionPath=").append(transactionPath).append(", transactionId=") + .append(transactionId).append(", version=").append(version).append("]"); + return builder.toString(); + } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/DeleteData.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/DeleteData.java index 04bc63c5a5..5ba787c983 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/DeleteData.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/DeleteData.java @@ -8,7 +8,6 @@ package org.opendaylight.controller.cluster.datastore.messages; -import java.io.Externalizable; import java.io.IOException; import java.io.ObjectInput; import java.io.ObjectOutput; @@ -18,18 +17,22 @@ import org.opendaylight.controller.cluster.datastore.utils.SerializationUtils; import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; -public class DeleteData implements VersionedSerializableMessage, Externalizable { +/** + * @deprecated Replaced by BatchedModifications. + */ +@Deprecated +public class DeleteData extends VersionedExternalizableMessage { private static final long serialVersionUID = 1L; public static final Class SERIALIZABLE_CLASS = DeleteData.class; private YangInstanceIdentifier path; - private short version; public DeleteData() { } - public DeleteData(final YangInstanceIdentifier path) { + public DeleteData(final YangInstanceIdentifier path, short version) { + super(version); this.path = path; } @@ -37,26 +40,21 @@ public class DeleteData implements VersionedSerializableMessage, Externalizable return path; } - public short getVersion() { - return version; - } - @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { - version = in.readShort(); // Read the version - don't need to do anything with it now + super.readExternal(in); path = SerializationUtils.deserializePath(in); } @Override public void writeExternal(ObjectOutput out) throws IOException { - out.writeShort(version); + super.writeExternal(out); SerializationUtils.serializePath(path, out); } @Override - public Object toSerializable(short toVersion) { - if(toVersion >= DataStoreVersions.LITHIUM_VERSION) { - version = toVersion; + public Object toSerializable() { + if(getVersion() >= DataStoreVersions.LITHIUM_VERSION) { return this; } else { // To base or R1 Helium version @@ -71,7 +69,8 @@ public class DeleteData implements VersionedSerializableMessage, Externalizable } else { // From base or R1 Helium version ShardTransactionMessages.DeleteData o = (ShardTransactionMessages.DeleteData) serializable; - return new DeleteData(InstanceIdentifierUtils.fromSerializable(o.getInstanceIdentifierPathArguments())); + return new DeleteData(InstanceIdentifierUtils.fromSerializable(o.getInstanceIdentifierPathArguments()), + DataStoreVersions.HELIUM_2_VERSION); } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/DeleteDataReply.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/DeleteDataReply.java index 0c6ff0e68d..dd21b0e2e6 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/DeleteDataReply.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/DeleteDataReply.java @@ -10,7 +10,12 @@ package org.opendaylight.controller.cluster.datastore.messages; import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages; +/** + * @deprecated Replaced by BatchedModificationsReply. + */ +@Deprecated public class DeleteDataReply extends EmptyReply { + private static final long serialVersionUID = 1L; private static final Object LEGACY_SERIALIZED_INSTANCE = ShardTransactionMessages.DeleteDataReply.newBuilder().build(); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/EmptyReply.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/EmptyReply.java index 284c6eff8d..38a37f0ccf 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/EmptyReply.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/EmptyReply.java @@ -14,7 +14,7 @@ import org.opendaylight.controller.cluster.datastore.DataStoreVersions; * * @author Thomas Pantelis */ -public abstract class EmptyReply extends EmptyExternalizable implements VersionedSerializableMessage { +public abstract class EmptyReply extends EmptyExternalizable { private final Object legacySerializedInstance; @@ -23,7 +23,6 @@ public abstract class EmptyReply extends EmptyExternalizable implements Versione this.legacySerializedInstance = legacySerializedInstance; } - @Override public Object toSerializable(short toVersion) { return toVersion >= DataStoreVersions.LITHIUM_VERSION ? this : legacySerializedInstance; } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/MergeData.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/MergeData.java index ae0d630cf2..0f44733503 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/MergeData.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/MergeData.java @@ -16,7 +16,11 @@ import org.opendaylight.controller.protobuff.messages.transaction.ShardTransacti import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; -public class MergeData extends ModifyData implements VersionedSerializableMessage { +/** + * @deprecated Replaced by BatchedModifications. + */ +@Deprecated +public class MergeData extends ModifyData { private static final long serialVersionUID = 1L; public static final Class SERIALIZABLE_CLASS = MergeData.class; @@ -24,14 +28,13 @@ public class MergeData extends ModifyData implements VersionedSerializableMessag public MergeData() { } - public MergeData(YangInstanceIdentifier path, NormalizedNode data) { - super(path, data); + public MergeData(YangInstanceIdentifier path, NormalizedNode data, short version) { + super(path, data, version); } @Override - public Object toSerializable(short toVersion) { - if(toVersion >= DataStoreVersions.LITHIUM_VERSION) { - setVersion(toVersion); + public Object toSerializable() { + if(getVersion() >= DataStoreVersions.LITHIUM_VERSION) { return this; } else { // To base or R1 Helium version @@ -50,7 +53,8 @@ public class MergeData extends ModifyData implements VersionedSerializableMessag ShardTransactionMessages.MergeData o = (ShardTransactionMessages.MergeData) serializable; Decoded decoded = new NormalizedNodeToNodeCodec(null).decode( o.getInstanceIdentifierPathArguments(), o.getNormalizedNode()); - return new MergeData(decoded.getDecodedPath(), decoded.getDecodedNode()); + return new MergeData(decoded.getDecodedPath(), decoded.getDecodedNode(), + DataStoreVersions.HELIUM_2_VERSION); } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/MergeDataReply.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/MergeDataReply.java index a4c514bdbf..6936ef14c5 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/MergeDataReply.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/MergeDataReply.java @@ -10,6 +10,10 @@ package org.opendaylight.controller.cluster.datastore.messages; import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages; +/** + * @deprecated Replaced by BatchedModificationsReply. + */ +@Deprecated public class MergeDataReply extends EmptyReply { private static final long serialVersionUID = 1L; diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/ModifyData.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/ModifyData.java index 69c41c2a56..bbd090f929 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/ModifyData.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/ModifyData.java @@ -8,7 +8,6 @@ package org.opendaylight.controller.cluster.datastore.messages; -import java.io.Externalizable; import java.io.IOException; import java.io.ObjectInput; import java.io.ObjectOutput; @@ -17,17 +16,21 @@ import org.opendaylight.controller.cluster.datastore.utils.SerializationUtils.Ap import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; -public abstract class ModifyData implements Externalizable { +/** + * @deprecated Replaced by BatchedModifications. + */ +@Deprecated +public abstract class ModifyData extends VersionedExternalizableMessage { private static final long serialVersionUID = 1L; private YangInstanceIdentifier path; private NormalizedNode data; - private short version; protected ModifyData() { } - protected ModifyData(YangInstanceIdentifier path, NormalizedNode data) { + protected ModifyData(YangInstanceIdentifier path, NormalizedNode data, short version) { + super(version); this.path = path; this.data = data; } @@ -40,23 +43,15 @@ public abstract class ModifyData implements Externalizable { return data; } - public short getVersion() { - return version; - } - - protected void setVersion(short version) { - this.version = version; - } - @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { - version = in.readShort(); + super.readExternal(in); SerializationUtils.deserializePathAndNode(in, this, APPLIER); } @Override public void writeExternal(ObjectOutput out) throws IOException { - out.writeShort(version); + super.writeExternal(out); SerializationUtils.serializePathAndNode(path, data, out); } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/ReadDataReply.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/ReadDataReply.java index 8ac6e1b149..b0c163d87f 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/ReadDataReply.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/ReadDataReply.java @@ -9,7 +9,6 @@ package org.opendaylight.controller.cluster.datastore.messages; import com.google.protobuf.ByteString; -import java.io.Externalizable; import java.io.IOException; import java.io.ObjectInput; import java.io.ObjectOutput; @@ -19,18 +18,18 @@ import org.opendaylight.controller.cluster.datastore.utils.SerializationUtils; import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; -public class ReadDataReply implements VersionedSerializableMessage, Externalizable { +public class ReadDataReply extends VersionedExternalizableMessage { private static final long serialVersionUID = 1L; public static final Class SERIALIZABLE_CLASS = ReadDataReply.class; private NormalizedNode normalizedNode; - private short version; public ReadDataReply() { } - public ReadDataReply(NormalizedNode normalizedNode) { + public ReadDataReply(NormalizedNode normalizedNode, short version) { + super(version); this.normalizedNode = normalizedNode; } @@ -40,20 +39,19 @@ public class ReadDataReply implements VersionedSerializableMessage, Externalizab @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { - version = in.readShort(); + super.readExternal(in); normalizedNode = SerializationUtils.deserializeNormalizedNode(in); } @Override public void writeExternal(ObjectOutput out) throws IOException { - out.writeShort(version); + super.writeExternal(out); SerializationUtils.serializeNormalizedNode(normalizedNode, out); } @Override - public Object toSerializable(short toVersion) { - if(toVersion >= DataStoreVersions.LITHIUM_VERSION) { - version = toVersion; + public Object toSerializable() { + if(getVersion() >= DataStoreVersions.LITHIUM_VERSION) { return this; } else { return toSerializableReadDataReply(normalizedNode); @@ -78,7 +76,8 @@ public class ReadDataReply implements VersionedSerializableMessage, Externalizab } else { ShardTransactionMessages.ReadDataReply o = (ShardTransactionMessages.ReadDataReply) serializable; - return new ReadDataReply(new NormalizedNodeToNodeCodec(null).decode(o.getNormalizedNode())); + return new ReadDataReply(new NormalizedNodeToNodeCodec(null).decode(o.getNormalizedNode()), + DataStoreVersions.HELIUM_2_VERSION); } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/VersionedExternalizableMessage.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/VersionedExternalizableMessage.java new file mode 100644 index 0000000000..2a660fa4b2 --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/VersionedExternalizableMessage.java @@ -0,0 +1,45 @@ +/* + * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.datastore.messages; + +import java.io.Externalizable; +import java.io.IOException; +import java.io.ObjectInput; +import java.io.ObjectOutput; + +/** + * Abstract base class for a versioned Externalizable message. + * + * @author Thomas Pantelis + */ +public abstract class VersionedExternalizableMessage implements Externalizable, SerializableMessage { + private static final long serialVersionUID = 1L; + + private short version; + + public VersionedExternalizableMessage() { + } + + public VersionedExternalizableMessage(short version) { + this.version = version; + } + + public short getVersion() { + return version; + } + + @Override + public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + version = in.readShort(); + } + + @Override + public void writeExternal(ObjectOutput out) throws IOException { + out.writeShort(version); + } +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/VersionedSerializableMessage.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/VersionedSerializableMessage.java deleted file mode 100644 index 5c30b1078e..0000000000 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/VersionedSerializableMessage.java +++ /dev/null @@ -1,17 +0,0 @@ -/* - * Copyright (c) 2014 Brocade Communications Systems, Inc. and others. All rights reserved. - * - * This program and the accompanying materials are made available under the - * terms of the Eclipse Public License v1.0 which accompanies this distribution, - * and is available at http://www.eclipse.org/legal/epl-v10.html - */ -package org.opendaylight.controller.cluster.datastore.messages; - -/** - * Interface for a Serializable message with versioning. - * - * @author Thomas Pantelis - */ -public interface VersionedSerializableMessage { - Object toSerializable(short toVersion); -} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/WriteData.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/WriteData.java index 989949c88f..a4f648b6b3 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/WriteData.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/WriteData.java @@ -16,7 +16,11 @@ import org.opendaylight.controller.protobuff.messages.transaction.ShardTransacti import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; -public class WriteData extends ModifyData implements VersionedSerializableMessage { +/** + * @deprecated Replaced by BatchedModifications. + */ +@Deprecated +public class WriteData extends ModifyData { private static final long serialVersionUID = 1L; public static final Class SERIALIZABLE_CLASS = WriteData.class; @@ -24,14 +28,13 @@ public class WriteData extends ModifyData implements VersionedSerializableMessag public WriteData() { } - public WriteData(YangInstanceIdentifier path, NormalizedNode data) { - super(path, data); + public WriteData(YangInstanceIdentifier path, NormalizedNode data, short version) { + super(path, data, version); } @Override - public Object toSerializable(short toVersion) { - if(toVersion >= DataStoreVersions.LITHIUM_VERSION) { - setVersion(toVersion); + public Object toSerializable() { + if(getVersion() >= DataStoreVersions.LITHIUM_VERSION) { return this; } else { // To base or R1 Helium version @@ -50,7 +53,8 @@ public class WriteData extends ModifyData implements VersionedSerializableMessag ShardTransactionMessages.WriteData o = (ShardTransactionMessages.WriteData) serializable; Decoded decoded = new NormalizedNodeToNodeCodec(null).decode( o.getInstanceIdentifierPathArguments(), o.getNormalizedNode()); - return new WriteData(decoded.getDecodedPath(), decoded.getDecodedNode()); + return new WriteData(decoded.getDecodedPath(), decoded.getDecodedNode(), + DataStoreVersions.HELIUM_2_VERSION); } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/WriteDataReply.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/WriteDataReply.java index 8255828819..3455571a51 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/WriteDataReply.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/WriteDataReply.java @@ -10,6 +10,10 @@ package org.opendaylight.controller.cluster.datastore.messages; import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages; +/** + * @deprecated Replaced by BatchedModificationsReply. + */ +@Deprecated public class WriteDataReply extends EmptyReply { private static final long serialVersionUID = 1L; diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/modification/AbstractModification.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/modification/AbstractModification.java index f04d004404..77f0858d7b 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/modification/AbstractModification.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/modification/AbstractModification.java @@ -17,8 +17,10 @@ import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; public abstract class AbstractModification implements Modification { private YangInstanceIdentifier path; + private short version; - protected AbstractModification() { + protected AbstractModification(short version) { + this.version = version; } protected AbstractModification(YangInstanceIdentifier path) { @@ -32,4 +34,8 @@ public abstract class AbstractModification implements Modification { public YangInstanceIdentifier getPath() { return path; } + + public short getVersion() { + return version; + } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/modification/DeleteModification.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/modification/DeleteModification.java index 833f86fb98..3a63f5b173 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/modification/DeleteModification.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/modification/DeleteModification.java @@ -25,6 +25,11 @@ public class DeleteModification extends AbstractModification { private static final long serialVersionUID = 1L; public DeleteModification() { + this(DataStoreVersions.CURRENT_VERSION); + } + + public DeleteModification(short version) { + super(version); } public DeleteModification(YangInstanceIdentifier path) { @@ -43,13 +48,11 @@ public class DeleteModification extends AbstractModification { @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { - in.readShort(); setPath(SerializationUtils.deserializePath(in)); } @Override public void writeExternal(ObjectOutput out) throws IOException { - out.writeShort(DataStoreVersions.CURRENT_VERSION); SerializationUtils.serializePath(getPath(), out); } @@ -66,8 +69,9 @@ public class DeleteModification extends AbstractModification { return new DeleteModification(InstanceIdentifierUtils.fromSerializable(o.getPath())); } - public static DeleteModification fromStream(ObjectInput in) throws ClassNotFoundException, IOException { - DeleteModification mod = new DeleteModification(); + public static DeleteModification fromStream(ObjectInput in, short version) + throws ClassNotFoundException, IOException { + DeleteModification mod = new DeleteModification(version); mod.readExternal(in); return mod; } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/modification/MergeModification.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/modification/MergeModification.java index 571443eedd..7ba74f4e7f 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/modification/MergeModification.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/modification/MergeModification.java @@ -10,6 +10,7 @@ package org.opendaylight.controller.cluster.datastore.modification; import java.io.IOException; import java.io.ObjectInput; +import org.opendaylight.controller.cluster.datastore.DataStoreVersions; import org.opendaylight.controller.cluster.datastore.node.NormalizedNodeToNodeCodec; import org.opendaylight.controller.cluster.datastore.node.NormalizedNodeToNodeCodec.Decoded; import org.opendaylight.controller.protobuff.messages.persistent.PersistentMessages; @@ -24,6 +25,11 @@ public class MergeModification extends WriteModification { private static final long serialVersionUID = 1L; public MergeModification() { + this(DataStoreVersions.CURRENT_VERSION); + } + + public MergeModification(short version) { + super(version); } public MergeModification(final YangInstanceIdentifier path, final NormalizedNode data) { @@ -47,8 +53,9 @@ public class MergeModification extends WriteModification { return new MergeModification(decoded.getDecodedPath(), decoded.getDecodedNode()); } - public static MergeModification fromStream(ObjectInput in) throws ClassNotFoundException, IOException { - MergeModification mod = new MergeModification(); + public static MergeModification fromStream(ObjectInput in, short version) + throws ClassNotFoundException, IOException { + MergeModification mod = new MergeModification(version); mod.readExternal(in); return mod; } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/modification/MutableCompositeModification.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/modification/MutableCompositeModification.java index 5d7947b19f..b597742319 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/modification/MutableCompositeModification.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/modification/MutableCompositeModification.java @@ -27,10 +27,15 @@ import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction; public class MutableCompositeModification implements CompositeModification { private static final long serialVersionUID = 1L; - private final List modifications; + private final List modifications = new ArrayList<>(); + private short version; public MutableCompositeModification() { - modifications = new ArrayList<>(); + this(DataStoreVersions.CURRENT_VERSION); + } + + public MutableCompositeModification(short version) { + this.version = version; } @Override @@ -45,6 +50,14 @@ public class MutableCompositeModification implements CompositeModification { return COMPOSITE; } + public short getVersion() { + return version; + } + + public void setVersion(short version) { + this.version = version; + } + /** * Add a new Modification to the list of Modifications represented by this * composite @@ -62,7 +75,7 @@ public class MutableCompositeModification implements CompositeModification { @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { - in.readShort(); + version = in.readShort(); int size = in.readInt(); @@ -75,15 +88,15 @@ public class MutableCompositeModification implements CompositeModification { byte type = in.readByte(); switch(type) { case Modification.WRITE: - modifications.add(WriteModification.fromStream(in)); + modifications.add(WriteModification.fromStream(in, version)); break; case Modification.MERGE: - modifications.add(MergeModification.fromStream(in)); + modifications.add(MergeModification.fromStream(in, version)); break; case Modification.DELETE: - modifications.add(DeleteModification.fromStream(in)); + modifications.add(DeleteModification.fromStream(in, version)); break; } } @@ -94,7 +107,7 @@ public class MutableCompositeModification implements CompositeModification { @Override public void writeExternal(ObjectOutput out) throws IOException { - out.writeShort(DataStoreVersions.CURRENT_VERSION); + out.writeShort(version); out.writeInt(modifications.size()); @@ -121,8 +134,7 @@ public class MutableCompositeModification implements CompositeModification { builder.setTimeStamp(System.nanoTime()); for (Modification m : modifications) { - builder.addModification( - (PersistentMessages.Modification) m.toSerializable()); + builder.addModification((PersistentMessages.Modification) m.toSerializable()); } return builder.build(); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/modification/WriteModification.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/modification/WriteModification.java index 9c122c9ade..2fdca5f379 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/modification/WriteModification.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/modification/WriteModification.java @@ -31,6 +31,11 @@ public class WriteModification extends AbstractModification { private NormalizedNode data; public WriteModification() { + this(DataStoreVersions.CURRENT_VERSION); + } + + public WriteModification(short version) { + super(version); } public WriteModification(final YangInstanceIdentifier path, final NormalizedNode data) { @@ -54,14 +59,11 @@ public class WriteModification extends AbstractModification { @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { - in.readShort(); // version - SerializationUtils.deserializePathAndNode(in, this, APPLIER); } @Override public void writeExternal(ObjectOutput out) throws IOException { - out.writeShort(DataStoreVersions.CURRENT_VERSION); SerializationUtils.serializePathAndNode(getPath(), data, out); } @@ -81,8 +83,9 @@ public class WriteModification extends AbstractModification { return new WriteModification(decoded.getDecodedPath(), decoded.getDecodedNode()); } - public static WriteModification fromStream(ObjectInput in) throws ClassNotFoundException, IOException { - WriteModification mod = new WriteModification(); + public static WriteModification fromStream(ObjectInput in, short version) + throws ClassNotFoundException, IOException { + WriteModification mod = new WriteModification(version); mod.readExternal(in); return mod; } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/config/yang/config/distributed_datastore_provider/DistributedConfigDataStoreProviderModule.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/config/yang/config/distributed_datastore_provider/DistributedConfigDataStoreProviderModule.java index 7e8307465b..75e45b1d4f 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/config/yang/config/distributed_datastore_provider/DistributedConfigDataStoreProviderModule.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/config/yang/config/distributed_datastore_provider/DistributedConfigDataStoreProviderModule.java @@ -1,12 +1,10 @@ package org.opendaylight.controller.config.yang.config.distributed_datastore_provider; import java.util.concurrent.TimeUnit; - import org.opendaylight.controller.cluster.datastore.DatastoreContext; import org.opendaylight.controller.cluster.datastore.DistributedDataStoreFactory; import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStoreConfigProperties; import org.osgi.framework.BundleContext; - import scala.concurrent.duration.Duration; public class DistributedConfigDataStoreProviderModule extends @@ -68,6 +66,7 @@ public class DistributedConfigDataStoreProviderModule extends props.getShardIsolatedLeaderCheckIntervalInMillis().getValue()) .shardElectionTimeoutFactor(props.getShardElectionTimeoutFactor().getValue()) .transactionCreationInitialRateLimit(props.getTxCreationInitialRateLimit().getValue()) + .shardBatchedModificationCount(props.getShardBatchedModificationCount().getValue().intValue()) .build(); return DistributedDataStoreFactory.createInstance(getConfigSchemaServiceDependency(), diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/config/yang/config/distributed_datastore_provider/DistributedOperationalDataStoreProviderModule.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/config/yang/config/distributed_datastore_provider/DistributedOperationalDataStoreProviderModule.java index 0655468531..f5d46a9a7e 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/config/yang/config/distributed_datastore_provider/DistributedOperationalDataStoreProviderModule.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/config/yang/config/distributed_datastore_provider/DistributedOperationalDataStoreProviderModule.java @@ -1,12 +1,10 @@ package org.opendaylight.controller.config.yang.config.distributed_datastore_provider; import java.util.concurrent.TimeUnit; - import org.opendaylight.controller.cluster.datastore.DatastoreContext; import org.opendaylight.controller.cluster.datastore.DistributedDataStoreFactory; import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStoreConfigProperties; import org.osgi.framework.BundleContext; - import scala.concurrent.duration.Duration; public class DistributedOperationalDataStoreProviderModule extends @@ -68,6 +66,7 @@ public class DistributedOperationalDataStoreProviderModule extends props.getShardIsolatedLeaderCheckIntervalInMillis().getValue()) .shardElectionTimeoutFactor(props.getShardElectionTimeoutFactor().getValue()) .transactionCreationInitialRateLimit(props.getTxCreationInitialRateLimit().getValue()) + .shardBatchedModificationCount(props.getShardBatchedModificationCount().getValue().intValue()) .build(); return DistributedDataStoreFactory.createInstance(getOperationalSchemaServiceDependency(), diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/yang/distributed-datastore-provider.yang b/opendaylight/md-sal/sal-distributed-datastore/src/main/yang/distributed-datastore-provider.yang index e2ee7373d0..46918890b6 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/yang/distributed-datastore-provider.yang +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/yang/distributed-datastore-provider.yang @@ -156,6 +156,15 @@ module distributed-datastore-provider { an operation (eg transaction create)."; } + leaf shard-batched-modification-count { + default 100; + type non-zero-uint32-type; + description "The number of transaction modification operations (put, merge, delete) to + batch before sending to the shard transaction actor. Batching improves + performance as less modifications messages are sent to the actor and thus + lessens the chance that the transaction actor's mailbox queue could get full."; + } + leaf enable-metric-capture { default false; type boolean; diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DatastoreContextTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DatastoreContextTest.java index d3a3a8fc2d..0099b58dfb 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DatastoreContextTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DatastoreContextTest.java @@ -32,6 +32,7 @@ public class DatastoreContextTest { assertEquals(DatastoreContext.DEFAULT_SHARD_SNAPSHOT_DATA_THRESHOLD_PERCENTAGE, build.getShardRaftConfig().getSnapshotDataThresholdPercentage()); assertEquals(DatastoreContext.DEFAULT_SHARD_ELECTION_TIMEOUT_FACTOR, build.getShardRaftConfig().getElectionTimeoutFactor()); assertEquals(DatastoreContext.DEFAULT_TX_CREATION_INITIAL_RATE_LIMIT, build.getTransactionCreationInitialRateLimit()); + assertEquals(DatastoreContext.DEFAULT_SHARD_BATCHED_MODIFICATION_COUNT, build.getShardBatchedModificationCount()); } -} \ No newline at end of file +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreTest.java index 66fa876277..4ec035ee3b 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreTest.java @@ -25,6 +25,7 @@ public class DistributedDataStoreTest extends AbstractActorTest { schemaContext = TestModel.createTestContext(); doReturn(schemaContext).when(actorContext).getSchemaContext(); + doReturn(DatastoreContext.newBuilder().build()).when(actorContext).getDatastoreContext(); } @Test diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/OperationCompleterTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/OperationCompleterTest.java new file mode 100644 index 0000000000..e7afe262b9 --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/OperationCompleterTest.java @@ -0,0 +1,45 @@ +/* + * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.datastore; + +import static org.junit.Assert.assertEquals; +import java.util.concurrent.Semaphore; +import org.junit.Test; +import org.opendaylight.controller.cluster.datastore.messages.BatchedModificationsReply; +import org.opendaylight.controller.cluster.datastore.messages.DataExistsReply; + +/** + * Unit tests for OperationCompleter. + * + * @author Thomas Pantelis + */ +public class OperationCompleterTest { + + @Test + public void testOnComplete() throws Exception { + int permits = 10; + Semaphore operationLimiter = new Semaphore(permits); + operationLimiter.acquire(permits); + int availablePermits = 0; + + OperationCompleter completer = new OperationCompleter(operationLimiter ); + + completer.onComplete(null, new DataExistsReply(true)); + assertEquals("availablePermits", ++availablePermits, operationLimiter.availablePermits()); + + completer.onComplete(null, new DataExistsReply(true)); + assertEquals("availablePermits", ++availablePermits, operationLimiter.availablePermits()); + + completer.onComplete(null, new IllegalArgumentException()); + assertEquals("availablePermits", ++availablePermits, operationLimiter.availablePermits()); + + completer.onComplete(null, new BatchedModificationsReply(4)); + availablePermits += 4; + assertEquals("availablePermits", availablePermits, operationLimiter.availablePermits()); + } +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionHeliumBackwardsCompatibilityTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionHeliumBackwardsCompatibilityTest.java index 58cec67a2d..1d1b08b5f8 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionHeliumBackwardsCompatibilityTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionHeliumBackwardsCompatibilityTest.java @@ -79,8 +79,8 @@ public class ShardTransactionHeliumBackwardsCompatibilityTest extends AbstractAc // Write data to the Tx txActor.tell(new WriteData(TestModel.TEST_PATH, - ImmutableNodes.containerNode(TestModel.TEST_QNAME)).toSerializable( - DataStoreVersions.BASE_HELIUM_VERSION), getRef()); + ImmutableNodes.containerNode(TestModel.TEST_QNAME), DataStoreVersions.BASE_HELIUM_VERSION). + toSerializable(), getRef()); expectMsgClass(duration, ShardTransactionMessages.WriteDataReply.class); @@ -153,9 +153,11 @@ public class ShardTransactionHeliumBackwardsCompatibilityTest extends AbstractAc // Write data to the Tx txActor.tell(new WriteData(TestModel.TEST_PATH, - ImmutableNodes.containerNode(TestModel.TEST_QNAME)), getRef()); + ImmutableNodes.containerNode(TestModel.TEST_QNAME), + DataStoreVersions.BASE_HELIUM_VERSION).toSerializable(), getRef()); - expectMsgClass(duration, WriteDataReply.class); + expectMsgClass(duration, WriteDataReply.INSTANCE.toSerializable( + DataStoreVersions.BASE_HELIUM_VERSION).getClass()); // Ready the Tx diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionTest.java index 851fb0114b..2973d277f5 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionTest.java @@ -14,10 +14,14 @@ import java.util.Collections; import java.util.concurrent.TimeUnit; import org.junit.Before; import org.junit.Test; +import org.mockito.InOrder; +import org.mockito.Mockito; import org.opendaylight.controller.cluster.datastore.ShardWriteTransaction.GetCompositeModificationReply; import org.opendaylight.controller.cluster.datastore.exceptions.UnknownMessageException; import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier; import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats; +import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications; +import org.opendaylight.controller.cluster.datastore.messages.BatchedModificationsReply; import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction; import org.opendaylight.controller.cluster.datastore.messages.CloseTransactionReply; import org.opendaylight.controller.cluster.datastore.messages.CreateSnapshot; @@ -46,9 +50,11 @@ import org.opendaylight.controller.md.cluster.datastore.model.TestModel; import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore; import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages; import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransaction; +import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes; +import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder; import org.opendaylight.yangtools.yang.model.api.SchemaContext; import scala.concurrent.duration.Duration; @@ -250,8 +256,8 @@ public class ShardTransactionTest extends AbstractActorTest { "testOnReceiveWriteData"); transaction.tell(new WriteData(TestModel.TEST_PATH, - ImmutableNodes.containerNode(TestModel.TEST_QNAME)).toSerializable( - DataStoreVersions.HELIUM_2_VERSION), getRef()); + ImmutableNodes.containerNode(TestModel.TEST_QNAME), DataStoreVersions.HELIUM_2_VERSION). + toSerializable(), getRef()); expectMsgClass(duration("5 seconds"), ShardTransactionMessages.WriteDataReply.class); @@ -259,7 +265,7 @@ public class ShardTransactionTest extends AbstractActorTest { // unserialized write transaction.tell(new WriteData(TestModel.TEST_PATH, - ImmutableNodes.containerNode(TestModel.TEST_QNAME)), + ImmutableNodes.containerNode(TestModel.TEST_QNAME), DataStoreVersions.CURRENT_VERSION), getRef()); expectMsgClass(duration("5 seconds"), WriteDataReply.class); @@ -293,8 +299,8 @@ public class ShardTransactionTest extends AbstractActorTest { "testMergeData"); transaction.tell(new MergeData(TestModel.TEST_PATH, - ImmutableNodes.containerNode(TestModel.TEST_QNAME)).toSerializable( - DataStoreVersions.HELIUM_2_VERSION), getRef()); + ImmutableNodes.containerNode(TestModel.TEST_QNAME), DataStoreVersions.HELIUM_2_VERSION). + toSerializable(), getRef()); expectMsgClass(duration("5 seconds"), ShardTransactionMessages.MergeDataReply.class); @@ -302,7 +308,7 @@ public class ShardTransactionTest extends AbstractActorTest { //unserialized merge transaction.tell(new MergeData(TestModel.TEST_PATH, - ImmutableNodes.containerNode(TestModel.TEST_QNAME)), + ImmutableNodes.containerNode(TestModel.TEST_QNAME), DataStoreVersions.CURRENT_VERSION), getRef()); expectMsgClass(duration("5 seconds"), MergeDataReply.class); @@ -335,20 +341,73 @@ public class ShardTransactionTest extends AbstractActorTest { final ActorRef transaction = newTransactionActor(store.newWriteOnlyTransaction(), "testDeleteData"); - transaction.tell(new DeleteData(TestModel.TEST_PATH).toSerializable( - DataStoreVersions.HELIUM_2_VERSION), getRef()); + transaction.tell(new DeleteData(TestModel.TEST_PATH, DataStoreVersions.HELIUM_2_VERSION). + toSerializable(), getRef()); expectMsgClass(duration("5 seconds"), ShardTransactionMessages.DeleteDataReply.class); assertModification(transaction, DeleteModification.class); //unserialized - transaction.tell(new DeleteData(TestModel.TEST_PATH), getRef()); + transaction.tell(new DeleteData(TestModel.TEST_PATH, DataStoreVersions.CURRENT_VERSION), getRef()); expectMsgClass(duration("5 seconds"), DeleteDataReply.class); }}; } + @Test + public void testOnReceiveBatchedModifications() throws Exception { + new JavaTestKit(getSystem()) {{ + + DOMStoreWriteTransaction mockWriteTx = Mockito.mock(DOMStoreWriteTransaction.class); + final ActorRef transaction = newTransactionActor(mockWriteTx, "testOnReceiveBatchedModifications"); + + YangInstanceIdentifier writePath = TestModel.TEST_PATH; + NormalizedNode writeData = ImmutableContainerNodeBuilder.create().withNodeIdentifier( + new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME)). + withChild(ImmutableNodes.leafNode(TestModel.DESC_QNAME, "foo")).build(); + + YangInstanceIdentifier mergePath = TestModel.OUTER_LIST_PATH; + NormalizedNode mergeData = ImmutableContainerNodeBuilder.create().withNodeIdentifier( + new YangInstanceIdentifier.NodeIdentifier(TestModel.OUTER_LIST_QNAME)).build(); + + YangInstanceIdentifier deletePath = TestModel.TEST_PATH; + + BatchedModifications batched = new BatchedModifications(DataStoreVersions.CURRENT_VERSION); + batched.addModification(new WriteModification(writePath, writeData)); + batched.addModification(new MergeModification(mergePath, mergeData)); + batched.addModification(new DeleteModification(deletePath)); + + transaction.tell(batched, getRef()); + + BatchedModificationsReply reply = expectMsgClass(duration("5 seconds"), BatchedModificationsReply.class); + assertEquals("getNumBatched", 3, reply.getNumBatched()); + + JavaTestKit verification = new JavaTestKit(getSystem()); + transaction.tell(new ShardWriteTransaction.GetCompositedModification(), verification.getRef()); + + CompositeModification compositeModification = verification.expectMsgClass(duration("5 seconds"), + GetCompositeModificationReply.class).getModification(); + + assertEquals("CompositeModification size", 3, compositeModification.getModifications().size()); + + WriteModification write = (WriteModification)compositeModification.getModifications().get(0); + assertEquals("getPath", writePath, write.getPath()); + assertEquals("getData", writeData, write.getData()); + + MergeModification merge = (MergeModification)compositeModification.getModifications().get(1); + assertEquals("getPath", mergePath, merge.getPath()); + assertEquals("getData", mergeData, merge.getData()); + + DeleteModification delete = (DeleteModification)compositeModification.getModifications().get(2); + assertEquals("getPath", deletePath, delete.getPath()); + + InOrder inOrder = Mockito.inOrder(mockWriteTx); + inOrder.verify(mockWriteTx).write(writePath, writeData); + inOrder.verify(mockWriteTx).merge(mergePath, mergeData); + inOrder.verify(mockWriteTx).delete(deletePath); + }}; + } @Test public void testOnReceiveReadyTransaction() throws Exception { @@ -463,8 +522,8 @@ public class ShardTransactionTest extends AbstractActorTest { DataStoreVersions.CURRENT_VERSION); final TestActorRef transaction = TestActorRef.apply(props,getSystem()); - transaction.receive(new DeleteData(TestModel.TEST_PATH).toSerializable( - DataStoreVersions.CURRENT_VERSION), ActorRef.noSender()); + transaction.receive(new DeleteData(TestModel.TEST_PATH, DataStoreVersions.CURRENT_VERSION). + toSerializable(), ActorRef.noSender()); } @Test diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionChainProxyTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionChainProxyTest.java index 23c3a82a38..88ab0dd292 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionChainProxyTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionChainProxyTest.java @@ -43,6 +43,7 @@ public class TransactionChainProxyTest extends AbstractActorTest{ actorContext.setSchemaContext(schemaContext); doReturn(schemaContext).when(mockActorContext).getSchemaContext(); + doReturn(DatastoreContext.newBuilder().build()).when(mockActorContext).getDatastoreContext(); } @SuppressWarnings("resource") diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionProxyTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionProxyTest.java index fa2f9187d6..6573308c12 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionProxyTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionProxyTest.java @@ -30,6 +30,7 @@ import com.google.common.util.concurrent.Uninterruptibles; import com.typesafe.config.Config; import com.typesafe.config.ConfigFactory; import java.io.IOException; +import java.util.ArrayList; import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; @@ -39,13 +40,18 @@ import org.junit.Assert; import org.junit.Before; import org.junit.BeforeClass; import org.junit.Test; +import org.mockito.ArgumentCaptor; import org.mockito.ArgumentMatcher; +import org.mockito.InOrder; import org.mockito.Mock; import org.mockito.Mockito; import org.mockito.MockitoAnnotations; +import org.opendaylight.controller.cluster.datastore.DatastoreContext.Builder; import org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType; import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException; import org.opendaylight.controller.cluster.datastore.exceptions.TimeoutException; +import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications; +import org.opendaylight.controller.cluster.datastore.messages.BatchedModificationsReply; import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction; import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction; import org.opendaylight.controller.cluster.datastore.messages.DataExists; @@ -60,6 +66,11 @@ import org.opendaylight.controller.cluster.datastore.messages.ReadyTransaction; import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply; import org.opendaylight.controller.cluster.datastore.messages.WriteData; import org.opendaylight.controller.cluster.datastore.messages.WriteDataReply; +import org.opendaylight.controller.cluster.datastore.modification.AbstractModification; +import org.opendaylight.controller.cluster.datastore.modification.DeleteModification; +import org.opendaylight.controller.cluster.datastore.modification.MergeModification; +import org.opendaylight.controller.cluster.datastore.modification.Modification; +import org.opendaylight.controller.cluster.datastore.modification.WriteModification; import org.opendaylight.controller.cluster.datastore.shardstrategy.DefaultShardStrategy; import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategyFactory; import org.opendaylight.controller.cluster.datastore.utils.ActorContext; @@ -71,6 +82,7 @@ import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException; import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages; import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages.CreateTransactionReply; import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes; import org.opendaylight.yangtools.yang.model.api.SchemaContext; @@ -102,7 +114,10 @@ public class TransactionProxyTest { @Mock private ClusterWrapper mockClusterWrapper; - String memberName = "mock-member"; + private final String memberName = "mock-member"; + + private final Builder dataStoreContextBuilder = DatastoreContext.newBuilder().operationTimeoutInSeconds(2). + shardBatchedModificationCount(1); @BeforeClass public static void setUpClass() throws IOException { @@ -126,15 +141,13 @@ public class TransactionProxyTest { schemaContext = TestModel.createTestContext(); - DatastoreContext dataStoreContext = DatastoreContext.newBuilder().operationTimeoutInSeconds(2).build(); - doReturn(getSystem()).when(mockActorContext).getActorSystem(); doReturn(getSystem().dispatchers().defaultGlobalDispatcher()).when(mockActorContext).getClientDispatcher(); doReturn(memberName).when(mockActorContext).getCurrentMemberName(); doReturn(schemaContext).when(mockActorContext).getSchemaContext(); doReturn(mockClusterWrapper).when(mockActorContext).getClusterWrapper(); doReturn(mockClusterWrapper).when(mockActorContext).getClusterWrapper(); - doReturn(dataStoreContext).when(mockActorContext).getDatastoreContext(); + doReturn(dataStoreContextBuilder.build()).when(mockActorContext).getDatastoreContext(); doReturn(10).when(mockActorContext).getTransactionOutstandingOperationLimit(); ShardStrategyFactory.setConfiguration(configuration); @@ -187,11 +200,15 @@ public class TransactionProxyTest { } private ReadData eqSerializedReadData() { + return eqSerializedReadData(TestModel.TEST_PATH); + } + + private ReadData eqSerializedReadData(final YangInstanceIdentifier path) { ArgumentMatcher matcher = new ArgumentMatcher() { @Override public boolean matches(Object argument) { return ReadData.SERIALIZABLE_CLASS.equals(argument.getClass()) && - ReadData.fromSerializable(argument).getPath().equals(TestModel.TEST_PATH); + ReadData.fromSerializable(argument).getPath().equals(path); } }; @@ -210,23 +227,13 @@ public class TransactionProxyTest { return argThat(matcher); } - private WriteData eqSerializedWriteData(final NormalizedNode nodeToWrite) { - return eqSerializedWriteData(nodeToWrite, DataStoreVersions.CURRENT_VERSION); - } - - private WriteData eqSerializedWriteData(final NormalizedNode nodeToWrite, - final int transactionVersion) { + private WriteData eqLegacyWriteData(final NormalizedNode nodeToWrite) { ArgumentMatcher matcher = new ArgumentMatcher() { @Override public boolean matches(Object argument) { - if((transactionVersion >= DataStoreVersions.LITHIUM_VERSION && - WriteData.SERIALIZABLE_CLASS.equals(argument.getClass())) || - (transactionVersion < DataStoreVersions.LITHIUM_VERSION && - ShardTransactionMessages.WriteData.class.equals(argument.getClass()))) { - + if(ShardTransactionMessages.WriteData.class.equals(argument.getClass())) { WriteData obj = WriteData.fromSerializable(argument); - return obj.getPath().equals(TestModel.TEST_PATH) && - obj.getData().equals(nodeToWrite); + return obj.getPath().equals(TestModel.TEST_PATH) && obj.getData().equals(nodeToWrite); } return false; @@ -236,39 +243,13 @@ public class TransactionProxyTest { return argThat(matcher); } - private WriteData eqWriteData(final NormalizedNode nodeToWrite) { - ArgumentMatcher matcher = new ArgumentMatcher() { - @Override - public boolean matches(Object argument) { - if(argument instanceof WriteData) { - WriteData obj = (WriteData) argument; - return obj.getPath().equals(TestModel.TEST_PATH) && - obj.getData().equals(nodeToWrite); - } - return false; - } - }; - - return argThat(matcher); - } - - private MergeData eqSerializedMergeData(final NormalizedNode nodeToWrite) { - return eqSerializedMergeData(nodeToWrite, DataStoreVersions.CURRENT_VERSION); - } - - private MergeData eqSerializedMergeData(final NormalizedNode nodeToWrite, - final int transactionVersion) { + private MergeData eqLegacyMergeData(final NormalizedNode nodeToWrite) { ArgumentMatcher matcher = new ArgumentMatcher() { @Override public boolean matches(Object argument) { - if((transactionVersion >= DataStoreVersions.LITHIUM_VERSION && - MergeData.SERIALIZABLE_CLASS.equals(argument.getClass())) || - (transactionVersion < DataStoreVersions.LITHIUM_VERSION && - ShardTransactionMessages.MergeData.class.equals(argument.getClass()))) { - + if(ShardTransactionMessages.MergeData.class.equals(argument.getClass())) { MergeData obj = MergeData.fromSerializable(argument); - return obj.getPath().equals(TestModel.TEST_PATH) && - obj.getData().equals(nodeToWrite); + return obj.getPath().equals(TestModel.TEST_PATH) && obj.getData().equals(nodeToWrite); } return false; @@ -278,41 +259,12 @@ public class TransactionProxyTest { return argThat(matcher); } - private MergeData eqMergeData(final NormalizedNode nodeToWrite) { - ArgumentMatcher matcher = new ArgumentMatcher() { - @Override - public boolean matches(Object argument) { - if(argument instanceof MergeData) { - MergeData obj = ((MergeData) argument); - return obj.getPath().equals(TestModel.TEST_PATH) && - obj.getData().equals(nodeToWrite); - } - - return false; - } - }; - - return argThat(matcher); - } - - private DeleteData eqSerializedDeleteData() { - ArgumentMatcher matcher = new ArgumentMatcher() { - @Override - public boolean matches(Object argument) { - return DeleteData.SERIALIZABLE_CLASS.equals(argument.getClass()) && - DeleteData.fromSerializable(argument).getPath().equals(TestModel.TEST_PATH); - } - }; - - return argThat(matcher); - } - - private DeleteData eqDeleteData() { + private DeleteData eqLegacyDeleteData(final YangInstanceIdentifier expPath) { ArgumentMatcher matcher = new ArgumentMatcher() { @Override public boolean matches(Object argument) { - return argument instanceof DeleteData && - ((DeleteData)argument).getPath().equals(TestModel.TEST_PATH); + return ShardTransactionMessages.DeleteData.class.equals(argument.getClass()) && + DeleteData.fromSerializable(argument).getPath().equals(expPath); } }; @@ -329,7 +281,7 @@ public class TransactionProxyTest { private Future readSerializedDataReply(NormalizedNode data, short transactionVersion) { - return Futures.successful(new ReadDataReply(data).toSerializable(transactionVersion)); + return Futures.successful(new ReadDataReply(data, transactionVersion).toSerializable()); } private Future readSerializedDataReply(NormalizedNode data) { @@ -337,7 +289,7 @@ public class TransactionProxyTest { } private Future readDataReply(NormalizedNode data) { - return Futures.successful(new ReadDataReply(data)); + return Futures.successful(new ReadDataReply(data, DataStoreVersions.CURRENT_VERSION)); } private Future dataExistsSerializedReply(boolean exists) { @@ -348,48 +300,41 @@ public class TransactionProxyTest { return Futures.successful(new DataExistsReply(exists)); } - private Future writeSerializedDataReply(short version) { - return Futures.successful(new WriteDataReply().toSerializable(version)); - } - - private Future writeSerializedDataReply() { - return writeSerializedDataReply(DataStoreVersions.CURRENT_VERSION); - } - - private Future writeDataReply() { - return Futures.successful(new WriteDataReply()); - } - - private Future mergeSerializedDataReply(short version) { - return Futures.successful(new MergeDataReply().toSerializable(version)); - } - - private Future mergeSerializedDataReply() { - return mergeSerializedDataReply(DataStoreVersions.CURRENT_VERSION); + private Future batchedModificationsReply(int count) { + return Futures.successful(new BatchedModificationsReply(count)); } private Future incompleteFuture(){ return mock(Future.class); } - private Future mergeDataReply() { - return Futures.successful(new MergeDataReply()); + private ActorSelection actorSelection(ActorRef actorRef) { + return getSystem().actorSelection(actorRef.path()); + } + + private void expectBatchedModifications(ActorRef actorRef, int count) { + doReturn(batchedModificationsReply(count)).when(mockActorContext).executeOperationAsync( + eq(actorSelection(actorRef)), isA(BatchedModifications.class)); } - private Future deleteSerializedDataReply(short version) { - return Futures.successful(new DeleteDataReply().toSerializable(version)); + private void expectBatchedModifications(int count) { + doReturn(batchedModificationsReply(count)).when(mockActorContext).executeOperationAsync( + any(ActorSelection.class), isA(BatchedModifications.class)); } - private Future deleteSerializedDataReply() { - return deleteSerializedDataReply(DataStoreVersions.CURRENT_VERSION); + private void expectIncompleteBatchedModifications() { + doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync( + any(ActorSelection.class), isA(BatchedModifications.class)); } - private Future deleteDataReply() { - return Futures.successful(new DeleteDataReply()); + private void expectReadyTransaction(ActorRef actorRef) { + doReturn(readySerializedTxReply(actorRef.path().toString())).when(mockActorContext).executeOperationAsync( + eq(actorSelection(actorRef)), isA(ReadyTransaction.SERIALIZABLE_CLASS)); } - private ActorSelection actorSelection(ActorRef actorRef) { - return getSystem().actorSelection(actorRef.path()); + private void expectFailedBatchedModifications(ActorRef actorRef) { + doReturn(Futures.failed(new TestException())).when(mockActorContext).executeOperationAsync( + eq(actorSelection(actorRef)), isA(BatchedModifications.class)); } private CreateTransactionReply createTransactionReply(ActorRef actorRef, int transactionVersion){ @@ -446,8 +391,7 @@ public class TransactionProxyTest { public void testRead() throws Exception { ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY); - TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, - READ_ONLY); + TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_ONLY); doReturn(readSerializedDataReply(null)).when(mockActorContext).executeOperationAsync( eq(actorSelection(actorRef)), eqSerializedReadData()); @@ -476,8 +420,7 @@ public class TransactionProxyTest { doReturn(Futures.successful(new Object())).when(mockActorContext). executeOperationAsync(eq(actorSelection(actorRef)), eqSerializedReadData()); - TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, - READ_ONLY); + TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_ONLY); transactionProxy.read(TestModel.TEST_PATH).checkedGet(5, TimeUnit.SECONDS); } @@ -489,8 +432,7 @@ public class TransactionProxyTest { doReturn(Futures.failed(new TestException())).when(mockActorContext). executeOperationAsync(eq(actorSelection(actorRef)), eqSerializedReadData()); - TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, - READ_ONLY); + TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_ONLY); propagateReadFailedExceptionCause(transactionProxy.read(TestModel.TEST_PATH)); } @@ -541,21 +483,19 @@ public class TransactionProxyTest { @Test(expected = TestException.class) public void testReadWithPriorRecordingOperationFailure() throws Throwable { + doReturn(dataStoreContextBuilder.shardBatchedModificationCount(2).build()). + when(mockActorContext).getDatastoreContext(); + ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE); NormalizedNode nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME); - doReturn(writeSerializedDataReply()).when(mockActorContext).executeOperationAsync( - eq(actorSelection(actorRef)), eqSerializedWriteData(nodeToWrite)); - - doReturn(Futures.failed(new TestException())).when(mockActorContext). - executeOperationAsync(eq(actorSelection(actorRef)), eqSerializedDeleteData()); + expectFailedBatchedModifications(actorRef); doReturn(readSerializedDataReply(null)).when(mockActorContext).executeOperationAsync( eq(actorSelection(actorRef)), eqSerializedReadData()); - TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, - READ_WRITE); + TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_WRITE); transactionProxy.write(TestModel.TEST_PATH, nodeToWrite); @@ -575,14 +515,12 @@ public class TransactionProxyTest { NormalizedNode expectedNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME); - doReturn(writeSerializedDataReply()).when(mockActorContext).executeOperationAsync( - eq(actorSelection(actorRef)), eqSerializedWriteData(expectedNode)); + expectBatchedModifications(actorRef, 1); doReturn(readSerializedDataReply(expectedNode)).when(mockActorContext).executeOperationAsync( eq(actorSelection(actorRef)), eqSerializedReadData()); - TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, - READ_WRITE); + TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_WRITE); transactionProxy.write(TestModel.TEST_PATH, expectedNode); @@ -590,16 +528,19 @@ public class TransactionProxyTest { TestModel.TEST_PATH).get(5, TimeUnit.SECONDS); assertEquals("NormalizedNode isPresent", true, readOptional.isPresent()); - assertEquals("Response NormalizedNode", expectedNode, readOptional.get()); + + InOrder inOrder = Mockito.inOrder(mockActorContext); + inOrder.verify(mockActorContext).executeOperationAsync( + eq(actorSelection(actorRef)), isA(BatchedModifications.class)); + + inOrder.verify(mockActorContext).executeOperationAsync( + eq(actorSelection(actorRef)), eqSerializedReadData()); } @Test(expected=IllegalStateException.class) public void testReadPreConditionCheck() { - - TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, - WRITE_ONLY); - + TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY); transactionProxy.read(TestModel.TEST_PATH); } @@ -625,8 +566,7 @@ public class TransactionProxyTest { public void testExists() throws Exception { ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY); - TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, - READ_ONLY); + TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_ONLY); doReturn(dataExistsSerializedReply(false)).when(mockActorContext).executeOperationAsync( eq(actorSelection(actorRef)), eqSerializedDataExists()); @@ -673,23 +613,21 @@ public class TransactionProxyTest { doReturn(Futures.failed(new TestException())).when(mockActorContext). executeOperationAsync(eq(actorSelection(actorRef)), eqSerializedDataExists()); - TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, - READ_ONLY); + TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_ONLY); propagateReadFailedExceptionCause(transactionProxy.exists(TestModel.TEST_PATH)); } @Test(expected = TestException.class) public void testExistsWithPriorRecordingOperationFailure() throws Throwable { + doReturn(dataStoreContextBuilder.shardBatchedModificationCount(2).build()). + when(mockActorContext).getDatastoreContext(); + ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE); NormalizedNode nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME); - doReturn(writeSerializedDataReply()).when(mockActorContext).executeOperationAsync( - eq(actorSelection(actorRef)), eqSerializedWriteData(nodeToWrite)); - - doReturn(Futures.failed(new TestException())).when(mockActorContext). - executeOperationAsync(eq(actorSelection(actorRef)), eqSerializedDeleteData()); + expectFailedBatchedModifications(actorRef); doReturn(dataExistsSerializedReply(false)).when(mockActorContext).executeOperationAsync( eq(actorSelection(actorRef)), eqSerializedDataExists()); @@ -715,28 +653,30 @@ public class TransactionProxyTest { NormalizedNode nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME); - doReturn(writeSerializedDataReply()).when(mockActorContext).executeOperationAsync( - eq(actorSelection(actorRef)), eqSerializedWriteData(nodeToWrite)); + expectBatchedModifications(actorRef, 1); doReturn(dataExistsSerializedReply(true)).when(mockActorContext).executeOperationAsync( eq(actorSelection(actorRef)), eqSerializedDataExists()); - TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, - READ_WRITE); + TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_WRITE); transactionProxy.write(TestModel.TEST_PATH, nodeToWrite); Boolean exists = transactionProxy.exists(TestModel.TEST_PATH).checkedGet(); assertEquals("Exists response", true, exists); + + InOrder inOrder = Mockito.inOrder(mockActorContext); + inOrder.verify(mockActorContext).executeOperationAsync( + eq(actorSelection(actorRef)), isA(BatchedModifications.class)); + + inOrder.verify(mockActorContext).executeOperationAsync( + eq(actorSelection(actorRef)), eqSerializedDataExists()); } @Test(expected=IllegalStateException.class) public void testExistsPreConditionCheck() { - - TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, - WRITE_ONLY); - + TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY); transactionProxy.exists(TestModel.TEST_PATH); } @@ -757,7 +697,7 @@ public class TransactionProxyTest { // Expected } } else { - assertEquals("Recording operation Future result type", expResultType, + assertEquals(String.format("Recording operation %d Future result type", i +1 ), expResultType, Await.result(future, Duration.create(5, TimeUnit.SECONDS)).getClass()); } } @@ -769,19 +709,20 @@ public class TransactionProxyTest { NormalizedNode nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME); - doReturn(writeSerializedDataReply()).when(mockActorContext).executeOperationAsync( - eq(actorSelection(actorRef)), eqSerializedWriteData(nodeToWrite)); + expectBatchedModifications(actorRef, 1); + expectReadyTransaction(actorRef); - TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, - WRITE_ONLY); + TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY); transactionProxy.write(TestModel.TEST_PATH, nodeToWrite); - verify(mockActorContext).executeOperationAsync( - eq(actorSelection(actorRef)), eqSerializedWriteData(nodeToWrite)); + // This sends the batched modification. + transactionProxy.ready(); + + verifyOneBatchedModification(actorRef, new WriteModification(TestModel.TEST_PATH, nodeToWrite)); verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(), - WriteDataReply.class); + BatchedModificationsReply.class); } @Test @@ -796,10 +737,10 @@ public class TransactionProxyTest { doReturn(readSerializedDataReply(null)).when(mockActorContext).executeOperationAsync( eq(actorSelection(actorRef)), eqSerializedReadData()); - final NormalizedNode nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME); + expectBatchedModifications(actorRef, 1); + expectReadyTransaction(actorRef); - doReturn(writeSerializedDataReply()).when(mockActorContext).executeOperationAsync( - eq(actorSelection(actorRef)), eqSerializedWriteData(nodeToWrite)); + final NormalizedNode nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME); final TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_WRITE); @@ -833,33 +774,28 @@ public class TransactionProxyTest { throw caughtEx.get(); } - verify(mockActorContext).executeOperationAsync( - eq(actorSelection(actorRef)), eqSerializedWriteData(nodeToWrite)); + // This sends the batched modification. + transactionProxy.ready(); + + verifyOneBatchedModification(actorRef, new WriteModification(TestModel.TEST_PATH, nodeToWrite)); verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(), - WriteDataReply.class); + BatchedModificationsReply.class); } @Test(expected=IllegalStateException.class) public void testWritePreConditionCheck() { - - TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, - READ_ONLY); - - transactionProxy.write(TestModel.TEST_PATH, - ImmutableNodes.containerNode(TestModel.TEST_QNAME)); + TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_ONLY); + transactionProxy.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME)); } @Test(expected=IllegalStateException.class) public void testWriteAfterReadyPreConditionCheck() { - - TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, - WRITE_ONLY); + TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY); transactionProxy.ready(); - transactionProxy.write(TestModel.TEST_PATH, - ImmutableNodes.containerNode(TestModel.TEST_QNAME)); + transactionProxy.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME)); } @Test @@ -868,37 +804,40 @@ public class TransactionProxyTest { NormalizedNode nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME); - doReturn(mergeSerializedDataReply()).when(mockActorContext).executeOperationAsync( - eq(actorSelection(actorRef)), eqSerializedMergeData(nodeToWrite)); + expectBatchedModifications(actorRef, 1); + expectReadyTransaction(actorRef); TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY); transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite); - verify(mockActorContext).executeOperationAsync( - eq(actorSelection(actorRef)), eqSerializedMergeData(nodeToWrite)); + // This sends the batched modification. + transactionProxy.ready(); + + verifyOneBatchedModification(actorRef, new MergeModification(TestModel.TEST_PATH, nodeToWrite)); verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(), - MergeDataReply.class); + BatchedModificationsReply.class); } @Test public void testDelete() throws Exception { ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY); - doReturn(deleteSerializedDataReply()).when(mockActorContext).executeOperationAsync( - eq(actorSelection(actorRef)), eqSerializedDeleteData()); + expectBatchedModifications(actorRef, 1); + expectReadyTransaction(actorRef); - TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, - WRITE_ONLY); + TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY); transactionProxy.delete(TestModel.TEST_PATH); - verify(mockActorContext).executeOperationAsync( - eq(actorSelection(actorRef)), eqSerializedDeleteData()); + // This sends the batched modification. + transactionProxy.ready(); + + verifyOneBatchedModification(actorRef, new DeleteModification(TestModel.TEST_PATH)); verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(), - DeleteDataReply.class); + BatchedModificationsReply.class); } private void verifyCohortFutures(ThreePhaseCommitCohortProxy proxy, @@ -935,14 +874,10 @@ public class TransactionProxyTest { doReturn(readSerializedDataReply(null)).when(mockActorContext).executeOperationAsync( eq(actorSelection(actorRef)), eqSerializedReadData()); - doReturn(writeSerializedDataReply()).when(mockActorContext).executeOperationAsync( - eq(actorSelection(actorRef)), eqSerializedWriteData(nodeToWrite)); + expectBatchedModifications(actorRef, 1); + expectReadyTransaction(actorRef); - doReturn(readySerializedTxReply(actorRef.path().toString())).when(mockActorContext).executeOperationAsync( - eq(actorSelection(actorRef)), isA(ReadyTransaction.SERIALIZABLE_CLASS)); - - TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, - READ_WRITE); + TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_WRITE); transactionProxy.read(TestModel.TEST_PATH); @@ -955,9 +890,12 @@ public class TransactionProxyTest { ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready; verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(), - WriteDataReply.class); + BatchedModificationsReply.class); verifyCohortFutures(proxy, getSystem().actorSelection(actorRef.path())); + + verify(mockActorContext).executeOperationAsync(eq(actorSelection(actorRef)), + isA(BatchedModifications.class)); } private ActorRef testCompatibilityWithHeliumVersion(short version) throws Exception { @@ -969,14 +907,16 @@ public class TransactionProxyTest { doReturn(readSerializedDataReply(testNode, version)).when(mockActorContext).executeOperationAsync( eq(actorSelection(actorRef)), eqSerializedReadData()); - doReturn(writeSerializedDataReply(version)).when(mockActorContext).executeOperationAsync( - eq(actorSelection(actorRef)), eqSerializedWriteData(testNode, version)); + doReturn(Futures.successful(new WriteDataReply().toSerializable(version))).when(mockActorContext). + executeOperationAsync(eq(actorSelection(actorRef)), eqLegacyWriteData(testNode)); - doReturn(mergeSerializedDataReply(version)).when(mockActorContext).executeOperationAsync( - eq(actorSelection(actorRef)), eqSerializedMergeData(testNode, version)); + doReturn(Futures.successful(new MergeDataReply().toSerializable(version))).when(mockActorContext). + executeOperationAsync(eq(actorSelection(actorRef)), eqLegacyMergeData(testNode)); - doReturn(readySerializedTxReply(actorRef.path().toString())).when(mockActorContext).executeOperationAsync( - eq(actorSelection(actorRef)), isA(ReadyTransaction.SERIALIZABLE_CLASS)); + doReturn(Futures.successful(new DeleteDataReply().toSerializable(version))).when(mockActorContext). + executeOperationAsync(eq(actorSelection(actorRef)), eqLegacyDeleteData(TestModel.TEST_PATH)); + + expectReadyTransaction(actorRef); doReturn(actorRef.path().toString()).when(mockActorContext).resolvePath(eq(actorRef.path().toString()), eq(actorRef.path().toString())); @@ -993,6 +933,8 @@ public class TransactionProxyTest { transactionProxy.merge(TestModel.TEST_PATH, testNode); + transactionProxy.delete(TestModel.TEST_PATH); + DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready(); assertTrue(ready instanceof ThreePhaseCommitCohortProxy); @@ -1000,7 +942,8 @@ public class TransactionProxyTest { ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready; verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(), - ShardTransactionMessages.WriteDataReply.class, ShardTransactionMessages.MergeDataReply.class); + ShardTransactionMessages.WriteDataReply.class, ShardTransactionMessages.MergeDataReply.class, + ShardTransactionMessages.DeleteDataReply.class); verifyCohortFutures(proxy, getSystem().actorSelection(actorRef.path())); @@ -1029,21 +972,13 @@ public class TransactionProxyTest { NormalizedNode nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME); - doReturn(mergeSerializedDataReply()).when(mockActorContext).executeOperationAsync( - eq(actorSelection(actorRef)), eqSerializedMergeData(nodeToWrite)); + expectFailedBatchedModifications(actorRef); - doReturn(Futures.failed(new TestException())).when(mockActorContext). - executeOperationAsync(eq(actorSelection(actorRef)), eqSerializedWriteData(nodeToWrite)); - - doReturn(readySerializedTxReply(actorRef.path().toString())).when(mockActorContext).executeOperationAsync( - eq(actorSelection(actorRef)), isA(ReadyTransaction.SERIALIZABLE_CLASS)); + expectReadyTransaction(actorRef); doReturn(false).when(mockActorContext).isPathLocal(actorRef.path().toString()); - TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, - WRITE_ONLY); - - transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite); + TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY); transactionProxy.write(TestModel.TEST_PATH, nodeToWrite); @@ -1055,8 +990,7 @@ public class TransactionProxyTest { verifyCohortFutures(proxy, TestException.class); - verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(), - MergeDataReply.class, TestException.class); + verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(), TestException.class); } @Test @@ -1065,15 +999,13 @@ public class TransactionProxyTest { NormalizedNode nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME); - doReturn(mergeSerializedDataReply()).when(mockActorContext).executeOperationAsync( - eq(actorSelection(actorRef)), eqSerializedMergeData(nodeToWrite)); + expectBatchedModifications(actorRef, 1); doReturn(Futures.failed(new TestException())).when(mockActorContext). executeOperationAsync(eq(actorSelection(actorRef)), isA(ReadyTransaction.SERIALIZABLE_CLASS)); - TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, - WRITE_ONLY); + TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY); transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite); @@ -1084,7 +1016,7 @@ public class TransactionProxyTest { ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready; verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(), - MergeDataReply.class); + BatchedModificationsReply.class); verifyCohortFutures(proxy, TestException.class); } @@ -1095,8 +1027,7 @@ public class TransactionProxyTest { doReturn(Futures.failed(new PrimaryNotFoundException("mock"))).when( mockActorContext).findPrimaryShardAsync(anyString()); - TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, - WRITE_ONLY); + TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY); NormalizedNode nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME); @@ -1121,15 +1052,13 @@ public class TransactionProxyTest { NormalizedNode nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME); - doReturn(writeSerializedDataReply()).when(mockActorContext).executeOperationAsync( - eq(actorSelection(actorRef)), eqSerializedWriteData(nodeToWrite)); + expectBatchedModifications(actorRef, 1); doReturn(Futures.successful(new Object())).when(mockActorContext). executeOperationAsync(eq(actorSelection(actorRef)), isA(ReadyTransaction.SERIALIZABLE_CLASS)); - TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, - WRITE_ONLY); + TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY); transactionProxy.write(TestModel.TEST_PATH, nodeToWrite); @@ -1160,8 +1089,7 @@ public class TransactionProxyTest { doReturn(readSerializedDataReply(null)).when(mockActorContext).executeOperationAsync( eq(actorSelection(actorRef)), eqSerializedReadData()); - TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, - READ_WRITE); + TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_WRITE); transactionProxy.read(TestModel.TEST_PATH); @@ -1197,9 +1125,7 @@ public class TransactionProxyTest { String actorPath = "akka.tcp://system@127.0.0.1:2550/user/tx-actor"; CreateTransactionReply createTransactionReply = CreateTransactionReply.newBuilder() - .setTransactionId("txn-1") - .setTransactionActorPath(actorPath) - .build(); + .setTransactionId("txn-1").setTransactionActorPath(actorPath).build(); doReturn(Futures.successful(createTransactionReply)).when(mockActorContext). executeOperationAsync(eq(actorSystem.actorSelection(shardActorRef.path())), @@ -1240,7 +1166,7 @@ public class TransactionProxyTest { } @Test - public void testLocalTxActorWrite() throws Exception { + public void testLocalTxActorReady() throws Exception { ActorSystem actorSystem = getSystem(); ActorRef shardActorRef = actorSystem.actorOf(Props.create(DoNothingActor.class)); @@ -1251,48 +1177,26 @@ public class TransactionProxyTest { when(mockActorContext).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD)); String actorPath = "akka.tcp://system@127.0.0.1:2550/user/tx-actor"; - CreateTransactionReply createTransactionReply = CreateTransactionReply.newBuilder() - .setTransactionId("txn-1") - .setTransactionActorPath(actorPath) - .build(); + CreateTransactionReply createTransactionReply = CreateTransactionReply.newBuilder(). + setTransactionId("txn-1").setTransactionActorPath(actorPath). + setMessageVersion(DataStoreVersions.CURRENT_VERSION).build(); doReturn(Futures.successful(createTransactionReply)).when(mockActorContext). - executeOperationAsync(eq(actorSystem.actorSelection(shardActorRef.path())), + executeOperationAsync(eq(actorSystem.actorSelection(shardActorRef.path())), eqCreateTransaction(memberName, WRITE_ONLY)); doReturn(true).when(mockActorContext).isPathLocal(actorPath); - NormalizedNode nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME); - - doReturn(writeDataReply()).when(mockActorContext).executeOperationAsync( - any(ActorSelection.class), eqWriteData(nodeToWrite)); + doReturn(batchedModificationsReply(1)).when(mockActorContext).executeOperationAsync( + any(ActorSelection.class), isA(BatchedModifications.class)); TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY); - transactionProxy.write(TestModel.TEST_PATH, nodeToWrite); - - verify(mockActorContext).executeOperationAsync( - any(ActorSelection.class), eqWriteData(nodeToWrite)); - - //testing local merge - doReturn(mergeDataReply()).when(mockActorContext).executeOperationAsync( - any(ActorSelection.class), eqMergeData(nodeToWrite)); - - transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite); - - verify(mockActorContext).executeOperationAsync( - any(ActorSelection.class), eqMergeData(nodeToWrite)); - - - //testing local delete - doReturn(deleteDataReply()).when(mockActorContext).executeOperationAsync( - any(ActorSelection.class), eqDeleteData()); - transactionProxy.delete(TestModel.TEST_PATH); - - verify(mockActorContext).executeOperationAsync(any(ActorSelection.class), eqDeleteData()); + NormalizedNode nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME); + transactionProxy.write(TestModel.TEST_PATH, nodeToWrite); verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(), - WriteDataReply.class, MergeDataReply.class, DeleteDataReply.class); + BatchedModificationsReply.class); // testing ready doReturn(readyTxReply(shardActorRef.path().toString())).when(mockActorContext).executeOperationAsync( @@ -1333,10 +1237,9 @@ public class TransactionProxyTest { } String actorPath = "akka.tcp://system@127.0.0.1:2550/user/tx-actor"; - CreateTransactionReply createTransactionReply = CreateTransactionReply.newBuilder() - .setTransactionId("txn-1") - .setTransactionActorPath(actorPath) - .build(); + CreateTransactionReply createTransactionReply = CreateTransactionReply.newBuilder(). + setTransactionId("txn-1").setTransactionActorPath(actorPath). + setMessageVersion(DataStoreVersions.CURRENT_VERSION).build(); doReturn(Futures.successful(createTransactionReply)).when(mockActorContext). executeOperationAsync(eq(actorSystem.actorSelection(shardActorRef.path())), @@ -1352,9 +1255,9 @@ public class TransactionProxyTest { long end = System.nanoTime(); - Assert.assertTrue(String.format("took less time than expected %s was %s", - TimeUnit.SECONDS.toNanos(mockActorContext.getDatastoreContext().getOperationTimeoutInSeconds()), - (end-start)), (end - start) > TimeUnit.SECONDS.toNanos(mockActorContext.getDatastoreContext().getOperationTimeoutInSeconds())); + long expected = TimeUnit.SECONDS.toNanos(mockActorContext.getDatastoreContext().getOperationTimeoutInSeconds()); + Assert.assertTrue(String.format("Expected elapsed time: %s. Actual: %s", + expected, (end-start)), (end - start) > expected); } @@ -1380,10 +1283,9 @@ public class TransactionProxyTest { } String actorPath = "akka.tcp://system@127.0.0.1:2550/user/tx-actor"; - CreateTransactionReply createTransactionReply = CreateTransactionReply.newBuilder() - .setTransactionId("txn-1") - .setTransactionActorPath(actorPath) - .build(); + CreateTransactionReply createTransactionReply = CreateTransactionReply.newBuilder(). + setTransactionId("txn-1").setTransactionActorPath(actorPath). + setMessageVersion(DataStoreVersions.CURRENT_VERSION).build(); doReturn(Futures.successful(createTransactionReply)).when(mockActorContext). executeOperationAsync(eq(actorSystem.actorSelection(shardActorRef.path())), @@ -1399,9 +1301,9 @@ public class TransactionProxyTest { long end = System.nanoTime(); - Assert.assertTrue(String.format("took more time than expected %s was %s", - TimeUnit.SECONDS.toNanos(mockActorContext.getDatastoreContext().getOperationTimeoutInSeconds()), - (end-start)), (end - start) <= TimeUnit.SECONDS.toNanos(mockActorContext.getDatastoreContext().getOperationTimeoutInSeconds())); + long expected = TimeUnit.SECONDS.toNanos(mockActorContext.getDatastoreContext().getOperationTimeoutInSeconds()); + Assert.assertTrue(String.format("Expected elapsed time: %s. Actual: %s", + expected, (end-start)), (end - start) <= expected); } public void testWriteThrottling(boolean shardFound){ @@ -1411,8 +1313,7 @@ public class TransactionProxyTest { public void run(TransactionProxy transactionProxy) { NormalizedNode nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME); - doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync( - any(ActorSelection.class), eqWriteData(nodeToWrite)); + expectBatchedModifications(2); transactionProxy.write(TestModel.TEST_PATH, nodeToWrite); @@ -1428,15 +1329,13 @@ public class TransactionProxyTest { public void run(TransactionProxy transactionProxy) { NormalizedNode nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME); - doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync( - any(ActorSelection.class), eqWriteData(nodeToWrite)); + expectIncompleteBatchedModifications(); transactionProxy.write(TestModel.TEST_PATH, nodeToWrite); transactionProxy.write(TestModel.TEST_PATH, nodeToWrite); } }); - } @Test @@ -1447,8 +1346,7 @@ public class TransactionProxyTest { public void run(TransactionProxy transactionProxy) { NormalizedNode nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME); - doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync( - any(ActorSelection.class), eqWriteData(nodeToWrite)); + expectBatchedModifications(2); transactionProxy.write(TestModel.TEST_PATH, nodeToWrite); @@ -1466,15 +1364,13 @@ public class TransactionProxyTest { public void run(TransactionProxy transactionProxy) { NormalizedNode nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME); - doReturn(writeSerializedDataReply()).when(mockActorContext).executeOperationAsync( - any(ActorSelection.class), eqSerializedWriteData(nodeToWrite)); + expectBatchedModifications(2); transactionProxy.write(TestModel.TEST_PATH, nodeToWrite); transactionProxy.write(TestModel.TEST_PATH, nodeToWrite); } }); - } @Test @@ -1485,8 +1381,7 @@ public class TransactionProxyTest { public void run(TransactionProxy transactionProxy) { NormalizedNode nodeToMerge = ImmutableNodes.containerNode(TestModel.TEST_QNAME); - doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync( - any(ActorSelection.class), eqMergeData(nodeToMerge)); + expectIncompleteBatchedModifications(); transactionProxy.merge(TestModel.TEST_PATH, nodeToMerge); @@ -1503,8 +1398,7 @@ public class TransactionProxyTest { public void run(TransactionProxy transactionProxy) { NormalizedNode nodeToMerge = ImmutableNodes.containerNode(TestModel.TEST_QNAME); - doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync( - any(ActorSelection.class), eqMergeData(nodeToMerge)); + expectBatchedModifications(2); transactionProxy.merge(TestModel.TEST_PATH, nodeToMerge); @@ -1520,8 +1414,7 @@ public class TransactionProxyTest { public void run(TransactionProxy transactionProxy) { NormalizedNode nodeToMerge = ImmutableNodes.containerNode(TestModel.TEST_QNAME); - doReturn(mergeDataReply()).when(mockActorContext).executeOperationAsync( - any(ActorSelection.class), eqMergeData(nodeToMerge)); + expectBatchedModifications(2); transactionProxy.merge(TestModel.TEST_PATH, nodeToMerge); @@ -1537,8 +1430,7 @@ public class TransactionProxyTest { throttleOperation(new TransactionProxyOperation() { @Override public void run(TransactionProxy transactionProxy) { - doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync( - any(ActorSelection.class), eqDeleteData()); + expectIncompleteBatchedModifications(); transactionProxy.delete(TestModel.TEST_PATH); @@ -1554,8 +1446,7 @@ public class TransactionProxyTest { completeOperation(new TransactionProxyOperation() { @Override public void run(TransactionProxy transactionProxy) { - doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync( - any(ActorSelection.class), eqDeleteData()); + expectBatchedModifications(2); transactionProxy.delete(TestModel.TEST_PATH); @@ -1569,8 +1460,7 @@ public class TransactionProxyTest { completeOperation(new TransactionProxyOperation() { @Override public void run(TransactionProxy transactionProxy) { - doReturn(deleteDataReply()).when(mockActorContext).executeOperationAsync( - any(ActorSelection.class), eqDeleteData()); + expectBatchedModifications(2); transactionProxy.delete(TestModel.TEST_PATH); @@ -1688,8 +1578,7 @@ public class TransactionProxyTest { public void run(TransactionProxy transactionProxy) { NormalizedNode nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME); - doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync( - any(ActorSelection.class), eqWriteData(nodeToWrite)); + expectBatchedModifications(1); doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync( any(ActorSelection.class), any(ReadyTransaction.class)); @@ -1710,11 +1599,7 @@ public class TransactionProxyTest { NormalizedNode nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME); NormalizedNode carsNode = ImmutableNodes.containerNode(CarsModel.BASE_QNAME); - doReturn(writeDataReply()).when(mockActorContext).executeOperationAsync( - any(ActorSelection.class), eqWriteData(nodeToWrite)); - - doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync( - any(ActorSelection.class), eqWriteData(carsNode)); + expectBatchedModifications(2); doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync( any(ActorSelection.class), any(ReadyTransaction.class)); @@ -1727,4 +1612,203 @@ public class TransactionProxyTest { } }, 2, true); } + + @Test + public void testModificationOperationBatching() throws Throwable { + int shardBatchedModificationCount = 3; + doReturn(dataStoreContextBuilder.shardBatchedModificationCount(shardBatchedModificationCount).build()). + when(mockActorContext).getDatastoreContext(); + + ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE); + + expectBatchedModifications(actorRef, shardBatchedModificationCount); + + expectReadyTransaction(actorRef); + + YangInstanceIdentifier writePath1 = TestModel.TEST_PATH; + NormalizedNode writeNode1 = ImmutableNodes.containerNode(TestModel.TEST_QNAME); + + YangInstanceIdentifier writePath2 = TestModel.OUTER_LIST_PATH; + NormalizedNode writeNode2 = ImmutableNodes.containerNode(TestModel.OUTER_LIST_QNAME); + + YangInstanceIdentifier writePath3 = TestModel.INNER_LIST_PATH; + NormalizedNode writeNode3 = ImmutableNodes.containerNode(TestModel.INNER_LIST_QNAME); + + YangInstanceIdentifier mergePath1 = TestModel.TEST_PATH; + NormalizedNode mergeNode1 = ImmutableNodes.containerNode(TestModel.TEST_QNAME); + + YangInstanceIdentifier mergePath2 = TestModel.OUTER_LIST_PATH; + NormalizedNode mergeNode2 = ImmutableNodes.containerNode(TestModel.OUTER_LIST_QNAME); + + YangInstanceIdentifier mergePath3 = TestModel.INNER_LIST_PATH; + NormalizedNode mergeNode3 = ImmutableNodes.containerNode(TestModel.INNER_LIST_QNAME); + + YangInstanceIdentifier deletePath1 = TestModel.TEST_PATH; + YangInstanceIdentifier deletePath2 = TestModel.OUTER_LIST_PATH; + + TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_WRITE); + + transactionProxy.write(writePath1, writeNode1); + transactionProxy.write(writePath2, writeNode2); + transactionProxy.delete(deletePath1); + transactionProxy.merge(mergePath1, mergeNode1); + transactionProxy.merge(mergePath2, mergeNode2); + transactionProxy.write(writePath3, writeNode3); + transactionProxy.merge(mergePath3, mergeNode3); + transactionProxy.delete(deletePath2); + + // This sends the last batch. + transactionProxy.ready(); + + List batchedModifications = captureBatchedModifications(actorRef); + assertEquals("Captured BatchedModifications count", 3, batchedModifications.size()); + + verifyBatchedModifications(batchedModifications.get(0), new WriteModification(writePath1, writeNode1), + new WriteModification(writePath2, writeNode2), new DeleteModification(deletePath1)); + + verifyBatchedModifications(batchedModifications.get(1), new MergeModification(mergePath1, mergeNode1), + new MergeModification(mergePath2, mergeNode2), new WriteModification(writePath3, writeNode3)); + + verifyBatchedModifications(batchedModifications.get(2), new MergeModification(mergePath3, mergeNode3), + new DeleteModification(deletePath2)); + + verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(), + BatchedModificationsReply.class, BatchedModificationsReply.class, BatchedModificationsReply.class); + } + + @Test + public void testModificationOperationBatchingWithInterleavedReads() throws Throwable { + int shardBatchedModificationCount = 10; + doReturn(dataStoreContextBuilder.shardBatchedModificationCount(shardBatchedModificationCount).build()). + when(mockActorContext).getDatastoreContext(); + + ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE); + + expectBatchedModifications(actorRef, shardBatchedModificationCount); + + YangInstanceIdentifier writePath1 = TestModel.TEST_PATH; + NormalizedNode writeNode1 = ImmutableNodes.containerNode(TestModel.TEST_QNAME); + + YangInstanceIdentifier writePath2 = TestModel.OUTER_LIST_PATH; + NormalizedNode writeNode2 = ImmutableNodes.containerNode(TestModel.OUTER_LIST_QNAME); + + YangInstanceIdentifier mergePath1 = TestModel.TEST_PATH; + NormalizedNode mergeNode1 = ImmutableNodes.containerNode(TestModel.TEST_QNAME); + + YangInstanceIdentifier mergePath2 = TestModel.INNER_LIST_PATH; + NormalizedNode mergeNode2 = ImmutableNodes.containerNode(TestModel.INNER_LIST_QNAME); + + YangInstanceIdentifier deletePath = TestModel.OUTER_LIST_PATH; + + doReturn(readSerializedDataReply(writeNode2)).when(mockActorContext).executeOperationAsync( + eq(actorSelection(actorRef)), eqSerializedReadData(writePath2)); + + doReturn(readSerializedDataReply(mergeNode2)).when(mockActorContext).executeOperationAsync( + eq(actorSelection(actorRef)), eqSerializedReadData(mergePath2)); + + doReturn(dataExistsSerializedReply(true)).when(mockActorContext).executeOperationAsync( + eq(actorSelection(actorRef)), eqSerializedDataExists()); + + TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_WRITE); + + transactionProxy.write(writePath1, writeNode1); + transactionProxy.write(writePath2, writeNode2); + + Optional> readOptional = transactionProxy.read(writePath2). + get(5, TimeUnit.SECONDS); + + assertEquals("NormalizedNode isPresent", true, readOptional.isPresent()); + assertEquals("Response NormalizedNode", writeNode2, readOptional.get()); + + transactionProxy.merge(mergePath1, mergeNode1); + transactionProxy.merge(mergePath2, mergeNode2); + + readOptional = transactionProxy.read(mergePath2).get(5, TimeUnit.SECONDS); + + transactionProxy.delete(deletePath); + + Boolean exists = transactionProxy.exists(TestModel.TEST_PATH).checkedGet(); + assertEquals("Exists response", true, exists); + + assertEquals("NormalizedNode isPresent", true, readOptional.isPresent()); + assertEquals("Response NormalizedNode", mergeNode2, readOptional.get()); + + List batchedModifications = captureBatchedModifications(actorRef); + assertEquals("Captured BatchedModifications count", 3, batchedModifications.size()); + + verifyBatchedModifications(batchedModifications.get(0), new WriteModification(writePath1, writeNode1), + new WriteModification(writePath2, writeNode2)); + + verifyBatchedModifications(batchedModifications.get(1), new MergeModification(mergePath1, mergeNode1), + new MergeModification(mergePath2, mergeNode2)); + + verifyBatchedModifications(batchedModifications.get(2), new DeleteModification(deletePath)); + + InOrder inOrder = Mockito.inOrder(mockActorContext); + inOrder.verify(mockActorContext).executeOperationAsync( + eq(actorSelection(actorRef)), isA(BatchedModifications.class)); + + inOrder.verify(mockActorContext).executeOperationAsync( + eq(actorSelection(actorRef)), eqSerializedReadData(writePath2)); + + inOrder.verify(mockActorContext).executeOperationAsync( + eq(actorSelection(actorRef)), isA(BatchedModifications.class)); + + inOrder.verify(mockActorContext).executeOperationAsync( + eq(actorSelection(actorRef)), eqSerializedReadData(mergePath2)); + + inOrder.verify(mockActorContext).executeOperationAsync( + eq(actorSelection(actorRef)), isA(BatchedModifications.class)); + + inOrder.verify(mockActorContext).executeOperationAsync( + eq(actorSelection(actorRef)), eqSerializedDataExists()); + + verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(), + BatchedModificationsReply.class, BatchedModificationsReply.class, BatchedModificationsReply.class); + } + + private List captureBatchedModifications(ActorRef actorRef) { + ArgumentCaptor batchedModificationsCaptor = + ArgumentCaptor.forClass(BatchedModifications.class); + verify(mockActorContext, Mockito.atLeastOnce()).executeOperationAsync( + eq(actorSelection(actorRef)), batchedModificationsCaptor.capture()); + + List batchedModifications = filterCaptured( + batchedModificationsCaptor, BatchedModifications.class); + return batchedModifications; + } + + private List filterCaptured(ArgumentCaptor captor, Class type) { + List captured = new ArrayList<>(); + for(T c: captor.getAllValues()) { + if(type.isInstance(c)) { + captured.add(c); + } + } + + return captured; + } + + private void verifyOneBatchedModification(ActorRef actorRef, Modification expected) { + List batchedModifications = captureBatchedModifications(actorRef); + assertEquals("Captured BatchedModifications count", 1, batchedModifications.size()); + + verifyBatchedModifications(batchedModifications.get(0), expected); + } + + private void verifyBatchedModifications(Object message, Modification... expected) { + assertEquals("Message type", BatchedModifications.class, message.getClass()); + BatchedModifications batchedModifications = (BatchedModifications)message; + assertEquals("BatchedModifications size", expected.length, batchedModifications.getModifications().size()); + for(int i = 0; i < batchedModifications.getModifications().size(); i++) { + Modification actual = batchedModifications.getModifications().get(i); + assertEquals("Modification type", expected[i].getClass(), actual.getClass()); + assertEquals("getPath", ((AbstractModification)expected[i]).getPath(), + ((AbstractModification)actual).getPath()); + if(actual instanceof WriteModification) { + assertEquals("getData", ((WriteModification)expected[i]).getData(), + ((WriteModification)actual).getData()); + } + } + } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/messages/BatchedModificationsTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/messages/BatchedModificationsTest.java new file mode 100644 index 0000000000..15d2eea598 --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/messages/BatchedModificationsTest.java @@ -0,0 +1,77 @@ +/* + * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.datastore.messages; + +import static org.junit.Assert.assertEquals; +import java.io.Serializable; +import org.apache.commons.lang.SerializationUtils; +import org.junit.Test; +import org.opendaylight.controller.cluster.datastore.DataStoreVersions; +import org.opendaylight.controller.cluster.datastore.modification.DeleteModification; +import org.opendaylight.controller.cluster.datastore.modification.MergeModification; +import org.opendaylight.controller.cluster.datastore.modification.WriteModification; +import org.opendaylight.controller.md.cluster.datastore.model.TestModel; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; +import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; +import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes; +import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder; + +/** + * Unit tests for BatchedModifications. + * + * @author Thomas Pantelis + */ +public class BatchedModificationsTest { + + @Test + public void testSerialization() { + YangInstanceIdentifier writePath = TestModel.TEST_PATH; + NormalizedNode writeData = ImmutableContainerNodeBuilder.create().withNodeIdentifier( + new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME)). + withChild(ImmutableNodes.leafNode(TestModel.DESC_QNAME, "foo")).build(); + + YangInstanceIdentifier mergePath = TestModel.OUTER_LIST_PATH; + NormalizedNode mergeData = ImmutableContainerNodeBuilder.create().withNodeIdentifier( + new YangInstanceIdentifier.NodeIdentifier(TestModel.OUTER_LIST_QNAME)).build(); + + YangInstanceIdentifier deletePath = TestModel.TEST_PATH; + + BatchedModifications batched = new BatchedModifications(DataStoreVersions.CURRENT_VERSION); + batched.addModification(new WriteModification(writePath, writeData)); + batched.addModification(new MergeModification(mergePath, mergeData)); + batched.addModification(new DeleteModification(deletePath)); + + BatchedModifications clone = (BatchedModifications) SerializationUtils.clone( + (Serializable) batched.toSerializable()); + + assertEquals("getVersion", DataStoreVersions.CURRENT_VERSION, clone.getVersion()); + + assertEquals("getModifications size", 3, clone.getModifications().size()); + + WriteModification write = (WriteModification)clone.getModifications().get(0); + assertEquals("getVersion", DataStoreVersions.CURRENT_VERSION, write.getVersion()); + assertEquals("getPath", writePath, write.getPath()); + assertEquals("getData", writeData, write.getData()); + + MergeModification merge = (MergeModification)clone.getModifications().get(1); + assertEquals("getVersion", DataStoreVersions.CURRENT_VERSION, merge.getVersion()); + assertEquals("getPath", mergePath, merge.getPath()); + assertEquals("getData", mergeData, merge.getData()); + + DeleteModification delete = (DeleteModification)clone.getModifications().get(2); + assertEquals("getVersion", DataStoreVersions.CURRENT_VERSION, delete.getVersion()); + assertEquals("getPath", deletePath, delete.getPath()); + } + + @Test + public void testBatchedModificationsReplySerialization() { + BatchedModificationsReply clone = (BatchedModificationsReply) SerializationUtils.clone( + (Serializable) new BatchedModificationsReply(100).toSerializable()); + assertEquals("getNumBatched", 100, clone.getNumBatched()); + } +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/messages/DeleteDataTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/messages/DeleteDataTest.java index e950b78ab7..97bade152e 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/messages/DeleteDataTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/messages/DeleteDataTest.java @@ -22,21 +22,22 @@ import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; * * @author Thomas Pantelis */ +@Deprecated public class DeleteDataTest { @Test public void testSerialization() { YangInstanceIdentifier path = TestModel.TEST_PATH; - DeleteData expected = new DeleteData(path); + DeleteData expected = new DeleteData(path, DataStoreVersions.CURRENT_VERSION); - Object serialized = expected.toSerializable(DataStoreVersions.CURRENT_VERSION); + Object serialized = expected.toSerializable(); assertEquals("Serialized type", DeleteData.class, serialized.getClass()); assertEquals("Version", DataStoreVersions.CURRENT_VERSION, ((DeleteData)serialized).getVersion()); Object clone = SerializationUtils.clone((Serializable) serialized); - assertEquals("Version", DataStoreVersions.CURRENT_VERSION, ((DeleteData)clone).getVersion()); DeleteData actual = DeleteData.fromSerializable(clone); + assertEquals("getVersion", DataStoreVersions.CURRENT_VERSION, actual.getVersion()); assertEquals("getPath", expected.getPath(), actual.getPath()); } @@ -58,9 +59,9 @@ public class DeleteDataTest { public void testSerializationWithHeliumR1Version() throws Exception { YangInstanceIdentifier path = TestModel.TEST_PATH; - DeleteData expected = new DeleteData(path); + DeleteData expected = new DeleteData(path, DataStoreVersions.HELIUM_1_VERSION); - Object serialized = expected.toSerializable(DataStoreVersions.HELIUM_1_VERSION); + Object serialized = expected.toSerializable(); assertEquals("Serialized type", ShardTransactionMessages.DeleteData.class, serialized.getClass()); DeleteData actual = DeleteData.fromSerializable(SerializationUtils.clone((Serializable) serialized)); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/messages/MergeDataTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/messages/MergeDataTest.java index 5b40afdff8..011d22798e 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/messages/MergeDataTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/messages/MergeDataTest.java @@ -14,6 +14,7 @@ import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes; import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder; +@Deprecated public class MergeDataTest { @Test @@ -23,15 +24,15 @@ public class MergeDataTest { new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME)). withChild(ImmutableNodes.leafNode(TestModel.DESC_QNAME, "foo")).build(); - MergeData expected = new MergeData(path, data); + MergeData expected = new MergeData(path, data, DataStoreVersions.CURRENT_VERSION); - Object serialized = expected.toSerializable(DataStoreVersions.CURRENT_VERSION); + Object serialized = expected.toSerializable(); assertEquals("Serialized type", MergeData.class, serialized.getClass()); assertEquals("Version", DataStoreVersions.CURRENT_VERSION, ((MergeData)serialized).getVersion()); Object clone = SerializationUtils.clone((Serializable) serialized); - assertEquals("Version", DataStoreVersions.CURRENT_VERSION, ((MergeData)clone).getVersion()); MergeData actual = MergeData.fromSerializable(clone); + assertEquals("Version", DataStoreVersions.CURRENT_VERSION, actual.getVersion()); assertEquals("getPath", expected.getPath(), actual.getPath()); assertEquals("getData", expected.getData(), actual.getData()); } @@ -58,9 +59,9 @@ public class MergeDataTest { new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME)). withChild(ImmutableNodes.leafNode(TestModel.DESC_QNAME, "foo")).build(); - MergeData expected = new MergeData(path, data); + MergeData expected = new MergeData(path, data, DataStoreVersions.HELIUM_1_VERSION); - Object serialized = expected.toSerializable(DataStoreVersions.HELIUM_1_VERSION); + Object serialized = expected.toSerializable(); assertEquals("Serialized type", ShardTransactionMessages.MergeData.class, serialized.getClass()); MergeData actual = MergeData.fromSerializable(SerializationUtils.clone((Serializable) serialized)); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/messages/ReadDataReplyTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/messages/ReadDataReplyTest.java index 8ce73296c1..7ad45a6170 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/messages/ReadDataReplyTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/messages/ReadDataReplyTest.java @@ -32,13 +32,14 @@ public class ReadDataReplyTest { new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME)). withChild(ImmutableNodes.leafNode(TestModel.DESC_QNAME, "foo")).build(); - ReadDataReply expected = new ReadDataReply(data); + ReadDataReply expected = new ReadDataReply(data, DataStoreVersions.CURRENT_VERSION); - Object serialized = expected.toSerializable(DataStoreVersions.CURRENT_VERSION); + Object serialized = expected.toSerializable(); assertEquals("Serialized type", ReadDataReply.class, serialized.getClass()); ReadDataReply actual = ReadDataReply.fromSerializable(SerializationUtils.clone( (Serializable) serialized)); + assertEquals("getVersion", DataStoreVersions.CURRENT_VERSION, actual.getVersion()); assertEquals("getNormalizedNode", expected.getNormalizedNode(), actual.getNormalizedNode()); } @@ -60,9 +61,9 @@ public class ReadDataReplyTest { new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME)). withChild(ImmutableNodes.leafNode(TestModel.DESC_QNAME, "foo")).build(); - ReadDataReply expected = new ReadDataReply(data); + ReadDataReply expected = new ReadDataReply(data, DataStoreVersions.HELIUM_1_VERSION); - Object serialized = expected.toSerializable(DataStoreVersions.HELIUM_1_VERSION); + Object serialized = expected.toSerializable(); assertEquals("Serialized type", ShardTransactionMessages.ReadDataReply.class, serialized.getClass()); ReadDataReply actual = ReadDataReply.fromSerializable(SerializationUtils.clone( diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/messages/WriteDataTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/messages/WriteDataTest.java index 90a76f229e..8148c9ca95 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/messages/WriteDataTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/messages/WriteDataTest.java @@ -26,6 +26,7 @@ import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableCo * * @author Thomas Pantelis */ +@Deprecated public class WriteDataTest { @Test @@ -35,15 +36,15 @@ public class WriteDataTest { new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME)). withChild(ImmutableNodes.leafNode(TestModel.DESC_QNAME, "foo")).build(); - WriteData expected = new WriteData(path, data); + WriteData expected = new WriteData(path, data, DataStoreVersions.CURRENT_VERSION); - Object serialized = expected.toSerializable(DataStoreVersions.CURRENT_VERSION); + Object serialized = expected.toSerializable(); assertEquals("Serialized type", WriteData.class, serialized.getClass()); assertEquals("Version", DataStoreVersions.CURRENT_VERSION, ((WriteData)serialized).getVersion()); Object clone = SerializationUtils.clone((Serializable) serialized); - assertEquals("Version", DataStoreVersions.CURRENT_VERSION, ((WriteData)clone).getVersion()); WriteData actual = WriteData.fromSerializable(clone); + assertEquals("Version", DataStoreVersions.CURRENT_VERSION, actual.getVersion()); assertEquals("getPath", expected.getPath(), actual.getPath()); assertEquals("getData", expected.getData(), actual.getData()); } @@ -69,9 +70,9 @@ public class WriteDataTest { new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME)). withChild(ImmutableNodes.leafNode(TestModel.DESC_QNAME, "foo")).build(); - WriteData expected = new WriteData(path, data); + WriteData expected = new WriteData(path, data, DataStoreVersions.HELIUM_1_VERSION); - Object serialized = expected.toSerializable(DataStoreVersions.HELIUM_1_VERSION); + Object serialized = expected.toSerializable(); assertEquals("Serialized type", ShardTransactionMessages.WriteData.class, serialized.getClass()); WriteData actual = WriteData.fromSerializable(SerializationUtils.clone((Serializable) serialized)); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/modification/MutableCompositeModificationTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/modification/MutableCompositeModificationTest.java index b9d44b2586..e2acaa8d10 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/modification/MutableCompositeModificationTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/modification/MutableCompositeModificationTest.java @@ -7,6 +7,7 @@ import com.google.common.base.Stopwatch; import org.apache.commons.lang.SerializationUtils; import org.junit.Ignore; import org.junit.Test; +import org.opendaylight.controller.cluster.datastore.DataStoreVersions; import org.opendaylight.controller.md.cluster.datastore.model.TestModel; import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; @@ -53,17 +54,22 @@ public class MutableCompositeModificationTest extends AbstractModificationTest { MutableCompositeModification clone = (MutableCompositeModification) SerializationUtils.clone(compositeModification); + assertEquals("getVersion", DataStoreVersions.CURRENT_VERSION, clone.getVersion()); + assertEquals("getModifications size", 3, clone.getModifications().size()); WriteModification write = (WriteModification)clone.getModifications().get(0); + assertEquals("getVersion", DataStoreVersions.CURRENT_VERSION, write.getVersion()); assertEquals("getPath", writePath, write.getPath()); assertEquals("getData", writeData, write.getData()); MergeModification merge = (MergeModification)clone.getModifications().get(1); + assertEquals("getVersion", DataStoreVersions.CURRENT_VERSION, merge.getVersion()); assertEquals("getPath", mergePath, merge.getPath()); assertEquals("getData", mergeData, merge.getData()); DeleteModification delete = (DeleteModification)clone.getModifications().get(2); + assertEquals("getVersion", DataStoreVersions.CURRENT_VERSION, delete.getVersion()); assertEquals("getPath", deletePath, delete.getPath()); } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/md/cluster/datastore/model/TestModel.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/md/cluster/datastore/model/TestModel.java index 67fa0960cb..9761ed8615 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/md/cluster/datastore/model/TestModel.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/md/cluster/datastore/model/TestModel.java @@ -31,7 +31,10 @@ public class TestModel { private static final String DATASTORE_TEST_YANG = "/odl-datastore-test.yang"; public static final YangInstanceIdentifier TEST_PATH = YangInstanceIdentifier.of(TEST_QNAME); - public static final YangInstanceIdentifier OUTER_LIST_PATH = YangInstanceIdentifier.builder(TEST_PATH).node(OUTER_LIST_QNAME).build(); + public static final YangInstanceIdentifier OUTER_LIST_PATH = YangInstanceIdentifier.builder(TEST_PATH). + node(OUTER_LIST_QNAME).build(); + public static final YangInstanceIdentifier INNER_LIST_PATH = YangInstanceIdentifier.builder(TEST_PATH). + node(OUTER_LIST_QNAME).node(INNER_LIST_QNAME).build(); public static final QName TWO_QNAME = QName.create(TEST_QNAME,"two"); public static final QName THREE_QNAME = QName.create(TEST_QNAME,"three");