From: Tom Pantelis Date: Sat, 7 Mar 2015 04:40:15 +0000 (-0500) Subject: Optimize TransactionProxy for write-only transactions X-Git-Tag: release/lithium~357 X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=commitdiff_plain;h=00cc355c0c58e999ffebd531bca3a507e150e441 Optimize TransactionProxy for write-only transactions For write-only transactions, modified TransactionProxy to prepare the Tx modifications directly on the Shard rather than having the Shard create a separate ShardTransaction actor. The BatchedModifications messages are sent directly to the shard. On ready, the last BatchedModifications is flagged as ready, thus also eliminating the ReadyTransaction message. In this manner, we eliminate the CreateTransaction and the ReadyTransaction messages and the the ShardTransaction actor. Several new fields were added to the BatchedModifications message: transactionID, transactionChainID, and ready. On the last, readied BatchedModifications message, the Shard returns the BatchedModificationsReply message with the cohort actor (the Shard itself). The BatchedModifications messages are handled by the ShardCommitCoordinator. I started creating a separate ShardTransactionCoordinator but realized I'd have to duplicate some storage plus the 2 coordinators should have to interface. The CohortEntry is used to store the prepared DOMStoreWriteTransaction. Perhaps ShardCommitCoordinator should be renamed to ShardTransactionCoordinator but that can be refactored later. I create a DOMTransactionFactory that is used by both the ShardCommitCoordinator and the Shard to create DOM transactions and update the mbean stats. Change-Id: I0d8126149f80854feb504ed9d8e49bcc7db253ce Signed-off-by: Tom Pantelis --- diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DOMTransactionFactory.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DOMTransactionFactory.java new file mode 100644 index 0000000000..f2436201d8 --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DOMTransactionFactory.java @@ -0,0 +1,96 @@ +/* + * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.datastore; + +import java.util.HashMap; +import java.util.Map; +import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats; +import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore; +import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransaction; +import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain; +import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionFactory; +import org.slf4j.Logger; + +/** + * A factory for creating DOM transactions, either normal or chained. + * + * @author Thomas Pantelis + */ +public class DOMTransactionFactory { + + private final Map transactionChains = new HashMap<>(); + private final InMemoryDOMDataStore store; + private final ShardStats shardMBean; + private final Logger log; + private final String name; + + public DOMTransactionFactory(InMemoryDOMDataStore store, ShardStats shardMBean, Logger log, String name) { + this.store = store; + this.shardMBean = shardMBean; + this.log = log; + this.name = name; + } + + @SuppressWarnings("unchecked") + public T newTransaction(TransactionProxy.TransactionType type, + String transactionID, String transactionChainID) { + + DOMStoreTransactionFactory factory = store; + + if(!transactionChainID.isEmpty()) { + factory = transactionChains.get(transactionChainID); + if(factory == null) { + if(log.isDebugEnabled()) { + log.debug("{}: Creating transaction with ID {} from chain {}", name, transactionID, + transactionChainID); + } + + DOMStoreTransactionChain transactionChain = store.createTransactionChain(); + transactionChains.put(transactionChainID, transactionChain); + factory = transactionChain; + } + } else { + log.debug("{}: Creating transaction with ID {}", name, transactionID); + } + + T transaction = null; + switch(type) { + case READ_ONLY: + transaction = (T) factory.newReadOnlyTransaction(); + shardMBean.incrementReadOnlyTransactionCount(); + break; + case READ_WRITE: + transaction = (T) factory.newReadWriteTransaction(); + shardMBean.incrementReadWriteTransactionCount(); + break; + case WRITE_ONLY: + transaction = (T) factory.newWriteOnlyTransaction(); + shardMBean.incrementWriteOnlyTransactionCount(); + break; + } + + return transaction; + } + + public void closeTransactionChain(String transactionChainID) { + DOMStoreTransactionChain chain = + transactionChains.remove(transactionChainID); + + if(chain != null) { + chain.close(); + } + } + + public void closeAllTransactionChains() { + for(Map.Entry entry : transactionChains.entrySet()){ + entry.getValue().close(); + } + + transactionChains.clear(); + } +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DatastoreContext.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DatastoreContext.java index 7f8a4e779d..d5142c94a6 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DatastoreContext.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DatastoreContext.java @@ -58,6 +58,7 @@ public class DatastoreContext { private final DefaultConfigParamsImpl raftConfig = new DefaultConfigParamsImpl(); private String dataStoreType = UNKNOWN_DATA_STORE_TYPE; private int shardBatchedModificationCount = DEFAULT_SHARD_BATCHED_MODIFICATION_COUNT; + private boolean writeOnlyTransactionOptimizationsEnabled = false; private DatastoreContext() { setShardJournalRecoveryLogBatchSize(DEFAULT_JOURNAL_RECOVERY_BATCH_SIZE); @@ -82,6 +83,7 @@ public class DatastoreContext { this.transactionCreationInitialRateLimit = other.transactionCreationInitialRateLimit; this.dataStoreType = other.dataStoreType; this.shardBatchedModificationCount = other.shardBatchedModificationCount; + this.writeOnlyTransactionOptimizationsEnabled = other.writeOnlyTransactionOptimizationsEnabled; setShardJournalRecoveryLogBatchSize(other.raftConfig.getJournalRecoveryLogBatchSize()); setSnapshotBatchCount(other.raftConfig.getSnapshotBatchCount()); @@ -186,6 +188,10 @@ public class DatastoreContext { return shardBatchedModificationCount; } + public boolean isWriteOnlyTransactionOptimizationsEnabled() { + return writeOnlyTransactionOptimizationsEnabled; + } + public static class Builder { private final DatastoreContext datastoreContext; private int maxShardDataChangeExecutorPoolSize = @@ -326,6 +332,11 @@ public class DatastoreContext { return this; } + public Builder writeOnlyTransactionOptimizationsEnabled(boolean value) { + datastoreContext.writeOnlyTransactionOptimizationsEnabled = value; + return this; + } + public Builder maxShardDataChangeExecutorPoolSize(int maxShardDataChangeExecutorPoolSize) { this.maxShardDataChangeExecutorPoolSize = maxShardDataChangeExecutorPoolSize; return this; diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java index 9ec4f9cfdf..99bc9de6a2 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java @@ -43,6 +43,8 @@ import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction; import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply; import org.opendaylight.controller.cluster.datastore.messages.ActorInitialized; +import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications; +import org.opendaylight.controller.cluster.datastore.messages.BatchedModificationsReply; import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction; import org.opendaylight.controller.cluster.datastore.messages.CloseTransactionChain; import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction; @@ -76,8 +78,6 @@ import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore; import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStoreFactory; import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort; import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransaction; -import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain; -import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionFactory; import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction; import org.opendaylight.yangtools.concepts.ListenerRegistration; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; @@ -105,7 +105,7 @@ public class Shard extends RaftActor { private final InMemoryDOMDataStore store; /// The name of this shard - private final ShardIdentifier name; + private final String name; private final ShardStats shardMBean; @@ -142,7 +142,7 @@ public class Shard extends RaftActor { private ShardRecoveryCoordinator recoveryCoordinator; private List currentLogRecoveryBatch; - private final Map transactionChains = new HashMap<>(); + private final DOMTransactionFactory transactionFactory; private final String txnDispatcherPath; @@ -151,7 +151,7 @@ public class Shard extends RaftActor { super(name.toString(), mapPeerAddresses(peerAddresses), Optional.of(datastoreContext.getShardRaftConfig())); - this.name = name; + this.name = name.toString(); this.datastoreContext = datastoreContext; this.schemaContext = schemaContext; this.dataPersistenceProvider = (datastoreContext.isPersistent()) @@ -178,8 +178,11 @@ public class Shard extends RaftActor { getContext().become(new MeteringBehavior(this)); } - commitCoordinator = new ShardCommitCoordinator(TimeUnit.SECONDS.convert(1, TimeUnit.MINUTES), - datastoreContext.getShardTransactionCommitQueueCapacity(), LOG, name.toString()); + transactionFactory = new DOMTransactionFactory(store, shardMBean, LOG, this.name); + + commitCoordinator = new ShardCommitCoordinator(transactionFactory, + TimeUnit.SECONDS.convert(5, TimeUnit.MINUTES), + datastoreContext.getShardTransactionCommitQueueCapacity(), self(), LOG, this.name); setTransactionCommitTimeout(); @@ -272,6 +275,8 @@ public class Shard extends RaftActor { try { if (CreateTransaction.SERIALIZABLE_CLASS.isInstance(message)) { handleCreateTransaction(message); + } else if (BatchedModifications.class.isInstance(message)) { + handleBatchedModifications((BatchedModifications)message); } else if (message instanceof ForwardedReadyTransaction) { handleForwardedReadyTransaction((ForwardedReadyTransaction) message); } else if (CanCommitTransaction.SERIALIZABLE_CLASS.isInstance(message)) { @@ -451,6 +456,47 @@ public class Shard extends RaftActor { commitCoordinator.handleCanCommit(canCommit, getSender(), self()); } + private void handleBatchedModifications(BatchedModifications batched) { + // This message is sent to prepare the modificationsa transaction directly on the Shard as an + // optimization to avoid the extra overhead of a separate ShardTransaction actor. On the last + // BatchedModifications message, the caller sets the ready flag in the message indicating + // modifications are complete. The reply contains the cohort actor path (this actor) for the caller + // to initiate the 3-phase commit. This also avoids the overhead of sending an additional + // ReadyTransaction message. + + // If we're not the leader then forward to the leader. This is a safety measure - we shouldn't + // normally get here if we're not the leader as the front-end (TransactionProxy) should determine + // the primary/leader shard. However with timing and caching on the front-end, there's a small + // window where it could have a stale leader during leadership transitions. + // + if(isLeader()) { + try { + BatchedModificationsReply reply = commitCoordinator.handleTransactionModifications(batched); + sender().tell(reply, self()); + } catch (Exception e) { + LOG.error("{}: Error handling BatchedModifications for Tx {}", persistenceId(), + batched.getTransactionID(), e); + getSender().tell(new akka.actor.Status.Failure(e), getSelf()); + } + } else { + ActorSelection leader = getLeader(); + if(leader != null) { + // TODO: what if this is not the first batch and leadership changed in between batched messages? + // We could check if the commitCoordinator already has a cached entry and forward all the previous + // batched modifications. + LOG.debug("{}: Forwarding BatchedModifications to leader {}", persistenceId(), leader); + leader.forward(batched, getContext()); + } else { + // TODO: rather than throwing an immediate exception, we could schedule a timer to try again to make + // it more resilient in case we're in the process of electing a new leader. + getSender().tell(new akka.actor.Status.Failure(new NoShardLeaderException(String.format( + "Could not find the leader for shard %s. This typically happens" + + " when the system is coming up or recovering and a leader is being elected. Try again" + + " later.", persistenceId()))), getSelf()); + } + } + } + private void handleForwardedReadyTransaction(ForwardedReadyTransaction ready) { LOG.debug("{}: Readying transaction {}, client version {}", persistenceId(), ready.getTransactionID(), ready.getTxnClientVersion()); @@ -459,7 +505,7 @@ public class Shard extends RaftActor { // commitCoordinator in preparation for the subsequent three phase commit initiated by // the front-end. commitCoordinator.transactionReady(ready.getTransactionID(), ready.getCohort(), - ready.getModification()); + (MutableCompositeModification) ready.getModification()); // Return our actor path as we'll handle the three phase commit, except if the Tx client // version < 1 (Helium-1 version). This means the Tx was initiated by a base Helium version @@ -536,56 +582,18 @@ public class Shard extends RaftActor { } private void closeTransactionChain(final CloseTransactionChain closeTransactionChain) { - DOMStoreTransactionChain chain = - transactionChains.remove(closeTransactionChain.getTransactionChainId()); - - if(chain != null) { - chain.close(); - } + transactionFactory.closeTransactionChain(closeTransactionChain.getTransactionChainId()); } private ActorRef createTypedTransactionActor(int transactionType, ShardTransactionIdentifier transactionId, String transactionChainId, short clientVersion ) { - DOMStoreTransactionFactory factory = store; - - if(!transactionChainId.isEmpty()) { - factory = transactionChains.get(transactionChainId); - if(factory == null){ - DOMStoreTransactionChain transactionChain = store.createTransactionChain(); - transactionChains.put(transactionChainId, transactionChain); - factory = transactionChain; - } - } - - if(this.schemaContext == null) { - throw new IllegalStateException("SchemaContext is not set"); - } + DOMStoreTransaction transaction = transactionFactory.newTransaction( + TransactionProxy.TransactionType.fromInt(transactionType), transactionId.toString(), + transactionChainId); - if (transactionType == TransactionProxy.TransactionType.WRITE_ONLY.ordinal()) { - - shardMBean.incrementWriteOnlyTransactionCount(); - - return createShardTransaction(factory.newWriteOnlyTransaction(), transactionId, clientVersion); - - } else if (transactionType == TransactionProxy.TransactionType.READ_WRITE.ordinal()) { - - shardMBean.incrementReadWriteTransactionCount(); - - return createShardTransaction(factory.newReadWriteTransaction(), transactionId, clientVersion); - - } else if (transactionType == TransactionProxy.TransactionType.READ_ONLY.ordinal()) { - - shardMBean.incrementReadOnlyTransactionCount(); - - return createShardTransaction(factory.newReadOnlyTransaction(), transactionId, clientVersion); - - } else { - throw new IllegalArgumentException( - "Shard="+name + ":CreateTransaction message has unidentified transaction type=" - + transactionType); - } + return createShardTransaction(transaction, transactionId, clientVersion); } private ActorRef createShardTransaction(DOMStoreTransaction transaction, ShardTransactionIdentifier transactionId, @@ -906,17 +914,14 @@ public class Shard extends RaftActor { } // If this actor is no longer the leader close all the transaction chains - if(!isLeader){ - for(Map.Entry entry : transactionChains.entrySet()){ - if(LOG.isDebugEnabled()) { - LOG.debug( - "{}: onStateChanged: Closing transaction chain {} because shard {} is no longer the leader", - persistenceId(), entry.getKey(), getId()); - } - entry.getValue().close(); + if(!isLeader) { + if(LOG.isDebugEnabled()) { + LOG.debug( + "{}: onStateChanged: Closing all transaction chains because shard {} is no longer the leader", + persistenceId(), getId()); } - transactionChains.clear(); + transactionFactory.closeAllTransactionChains(); } } @@ -926,7 +931,7 @@ public class Shard extends RaftActor { } @Override public String persistenceId() { - return this.name.toString(); + return this.name; } @VisibleForTesting @@ -934,6 +939,12 @@ public class Shard extends RaftActor { return dataPersistenceProvider; } + @VisibleForTesting + ShardCommitCoordinator getCommitCoordinator() { + return commitCoordinator; + } + + private static class ShardCreator implements Creator { private static final long serialVersionUID = 1L; diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardCommitCoordinator.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardCommitCoordinator.java index 5d0ca38d6a..54f15fcb4b 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardCommitCoordinator.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardCommitCoordinator.java @@ -9,17 +9,26 @@ package org.opendaylight.controller.cluster.datastore; import akka.actor.ActorRef; import akka.actor.Status; +import akka.serialization.Serialization; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; import com.google.common.cache.Cache; import com.google.common.cache.CacheBuilder; +import com.google.common.cache.RemovalCause; +import com.google.common.cache.RemovalListener; +import com.google.common.cache.RemovalNotification; import java.util.LinkedList; import java.util.Queue; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; +import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications; +import org.opendaylight.controller.cluster.datastore.messages.BatchedModificationsReply; import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction; import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply; -import org.opendaylight.controller.cluster.datastore.modification.CompositeModification; import org.opendaylight.controller.cluster.datastore.modification.Modification; +import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification; import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort; +import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction; import org.slf4j.Logger; /** @@ -29,10 +38,17 @@ import org.slf4j.Logger; */ public class ShardCommitCoordinator { + // Interface hook for unit tests to replace or decorate the DOMStoreThreePhaseCommitCohorts. + public interface CohortDecorator { + DOMStoreThreePhaseCommitCohort decorate(String transactionID, DOMStoreThreePhaseCommitCohort actual); + } + private final Cache cohortCache; private CohortEntry currentCohortEntry; + private final DOMTransactionFactory transactionFactory; + private final Queue queuedCohortEntries; private int queueCapacity; @@ -41,14 +57,33 @@ public class ShardCommitCoordinator { private final String name; - public ShardCommitCoordinator(long cacheExpiryTimeoutInSec, int queueCapacity, Logger log, - String name) { - cohortCache = CacheBuilder.newBuilder().expireAfterAccess( - cacheExpiryTimeoutInSec, TimeUnit.SECONDS).build(); + private final String shardActorPath; + + private final RemovalListener cacheRemovalListener = + new RemovalListener() { + @Override + public void onRemoval(RemovalNotification notification) { + if(notification.getCause() == RemovalCause.EXPIRED) { + log.warn("{}: Transaction {} was timed out of the cache", name, notification.getKey()); + } + } + }; + + // This is a hook for unit tests to replace or decorate the DOMStoreThreePhaseCommitCohorts. + private CohortDecorator cohortDecorator; + + public ShardCommitCoordinator(DOMTransactionFactory transactionFactory, + long cacheExpiryTimeoutInSec, int queueCapacity, ActorRef shardActor, Logger log, String name) { this.queueCapacity = queueCapacity; this.log = log; this.name = name; + this.transactionFactory = transactionFactory; + + shardActorPath = Serialization.serializedActorPath(shardActor); + + cohortCache = CacheBuilder.newBuilder().expireAfterAccess(cacheExpiryTimeoutInSec, TimeUnit.SECONDS). + removalListener(cacheRemovalListener).build(); // We use a LinkedList here to avoid synchronization overhead with concurrent queue impls // since this should only be accessed on the shard's dispatcher. @@ -60,19 +95,62 @@ public class ShardCommitCoordinator { } /** - * This method caches a cohort entry for the given transactions ID in preparation for the - * subsequent 3-phase commit. + * This method is called to ready a transaction that was prepared by ShardTransaction actor. It caches + * the prepared cohort entry for the given transactions ID in preparation for the subsequent 3-phase commit. * * @param transactionID the ID of the transaction * @param cohort the cohort to participate in the transaction commit - * @param modification the modification made by the transaction + * @param modification the modifications made by the transaction */ public void transactionReady(String transactionID, DOMStoreThreePhaseCommitCohort cohort, - Modification modification) { + MutableCompositeModification modification) { cohortCache.put(transactionID, new CohortEntry(transactionID, cohort, modification)); } + /** + * This method handles a BatchedModifications message for a transaction being prepared directly on the + * Shard actor instead of via a ShardTransaction actor. If there's no currently cached + * DOMStoreWriteTransaction, one is created. The batched modifications are applied to the write Tx. If + * the BatchedModifications is ready to commit then a DOMStoreThreePhaseCommitCohort is created. + * + * @param batched the BatchedModifications + * @param shardActor the transaction's shard actor + * + * @throws ExecutionException if an error occurs loading the cache + */ + public BatchedModificationsReply handleTransactionModifications(BatchedModifications batched) + throws ExecutionException { + CohortEntry cohortEntry = cohortCache.getIfPresent(batched.getTransactionID()); + if(cohortEntry == null) { + cohortEntry = new CohortEntry(batched.getTransactionID(), + transactionFactory.newTransaction( + TransactionProxy.TransactionType.WRITE_ONLY, batched.getTransactionID(), + batched.getTransactionChainID())); + cohortCache.put(batched.getTransactionID(), cohortEntry); + } + + if(log.isDebugEnabled()) { + log.debug("{}: Applying {} batched modifications for Tx {}", name, + batched.getModifications().size(), batched.getTransactionID()); + } + + cohortEntry.applyModifications(batched.getModifications()); + + String cohortPath = null; + if(batched.isReady()) { + if(log.isDebugEnabled()) { + log.debug("{}: Readying Tx {}, client version {}", name, + batched.getTransactionID(), batched.getVersion()); + } + + cohortEntry.ready(cohortDecorator); + cohortPath = shardActorPath; + } + + return new BatchedModificationsReply(batched.getModifications().size(), cohortPath); + } + /** * This method handles the canCommit phase for a transaction. * @@ -216,19 +294,33 @@ public class ShardCommitCoordinator { } } + @VisibleForTesting + void setCohortDecorator(CohortDecorator cohortDecorator) { + this.cohortDecorator = cohortDecorator; + } + + static class CohortEntry { private final String transactionID; - private final DOMStoreThreePhaseCommitCohort cohort; - private final Modification modification; + private DOMStoreThreePhaseCommitCohort cohort; + private final MutableCompositeModification compositeModification; + private final DOMStoreWriteTransaction transaction; private ActorRef canCommitSender; private ActorRef shard; private long lastAccessTime; + CohortEntry(String transactionID, DOMStoreWriteTransaction transaction) { + this.compositeModification = new MutableCompositeModification(); + this.transaction = transaction; + this.transactionID = transactionID; + } + CohortEntry(String transactionID, DOMStoreThreePhaseCommitCohort cohort, - Modification modification) { + MutableCompositeModification compositeModification) { this.transactionID = transactionID; this.cohort = cohort; - this.modification = modification; + this.compositeModification = compositeModification; + this.transaction = null; } void updateLastAccessTime() { @@ -247,8 +339,26 @@ public class ShardCommitCoordinator { return cohort; } - Modification getModification() { - return modification; + MutableCompositeModification getModification() { + return compositeModification; + } + + void applyModifications(Iterable modifications) { + for(Modification modification: modifications) { + compositeModification.addModification(modification); + modification.apply(transaction); + } + } + + void ready(CohortDecorator cohortDecorator) { + Preconditions.checkState(cohort == null, "cohort was already set"); + + cohort = transaction.ready(); + + if(cohortDecorator != null) { + // Call the hook for unit tests. + cohort = cohortDecorator.decorate(transactionID, cohort); + } } ActorRef getCanCommitSender() { @@ -268,10 +378,7 @@ public class ShardCommitCoordinator { } boolean hasModifications(){ - if(modification instanceof CompositeModification){ - return ((CompositeModification) modification).getModifications().size() > 0; - } - return true; + return compositeModification.getModifications().size() > 0; } } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionContextImpl.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionContextImpl.java index c1f9c78e69..3a209630c3 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionContextImpl.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionContextImpl.java @@ -16,6 +16,7 @@ import com.google.common.util.concurrent.SettableFuture; import java.util.List; import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier; import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications; +import org.opendaylight.controller.cluster.datastore.messages.BatchedModificationsReply; import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction; import org.opendaylight.controller.cluster.datastore.messages.DataExists; import org.opendaylight.controller.cluster.datastore.messages.DataExistsReply; @@ -40,8 +41,8 @@ import scala.concurrent.Future; public class TransactionContextImpl extends AbstractTransactionContext { private static final Logger LOG = LoggerFactory.getLogger(TransactionContextImpl.class); + private final String transactionChainId; private final ActorContext actorContext; - private final String transactionPath; private final ActorSelection actor; private final boolean isTxActorLocal; private final short remoteTransactionVersion; @@ -49,12 +50,12 @@ public class TransactionContextImpl extends AbstractTransactionContext { private final OperationCompleter operationCompleter; private BatchedModifications batchedModifications; - protected TransactionContextImpl(String transactionPath, ActorSelection actor, TransactionIdentifier identifier, - ActorContext actorContext, SchemaContext schemaContext, - boolean isTxActorLocal, short remoteTransactionVersion, OperationCompleter operationCompleter) { + protected TransactionContextImpl(ActorSelection actor, TransactionIdentifier identifier, + String transactionChainId, ActorContext actorContext, SchemaContext schemaContext, boolean isTxActorLocal, + short remoteTransactionVersion, OperationCompleter operationCompleter) { super(identifier); - this.transactionPath = transactionPath; this.actor = actor; + this.transactionChainId = transactionChainId; this.actorContext = actorContext; this.isTxActorLocal = isTxActorLocal; this.remoteTransactionVersion = remoteTransactionVersion; @@ -71,6 +72,10 @@ public class TransactionContextImpl extends AbstractTransactionContext { return actor; } + protected ActorContext getActorContext() { + return actorContext; + } + protected short getRemoteTransactionVersion() { return remoteTransactionVersion; } @@ -93,21 +98,24 @@ public class TransactionContextImpl extends AbstractTransactionContext { // Send the remaining batched modifications if any. - sendBatchedModifications(); + sendAndRecordBatchedModifications(); // Send the ReadyTransaction message to the Tx actor. - final Future replyFuture = executeOperationAsync(ReadyTransaction.INSTANCE); + Future readyReplyFuture = executeOperationAsync(ReadyTransaction.INSTANCE); + return combineRecordedOperationsFutures(readyReplyFuture); + } + + protected Future combineRecordedOperationsFutures(final Future withLastReplyFuture) { // Combine all the previously recorded put/merge/delete operation reply Futures and the // ReadyTransactionReply Future into one Future. If any one fails then the combined // Future will fail. We need all prior operations and the ready operation to succeed // in order to attempt commit. - List> futureList = - Lists.newArrayListWithCapacity(recordedOperationFutures.size() + 1); + List> futureList = Lists.newArrayListWithCapacity(recordedOperationFutures.size() + 1); futureList.addAll(recordedOperationFutures); - futureList.add(replyFuture); + futureList.add(withLastReplyFuture); Future> combinedFutures = akka.dispatch.Futures.sequence(futureList, actorContext.getClientDispatcher()); @@ -129,28 +137,15 @@ public class TransactionContextImpl extends AbstractTransactionContext { // de-serializing each reply. // Note the Future get call here won't block as it's complete. - Object serializedReadyReply = replyFuture.value().get().get(); + Object serializedReadyReply = withLastReplyFuture.value().get().get(); if (serializedReadyReply instanceof ReadyTransactionReply) { return actorContext.actorSelection(((ReadyTransactionReply)serializedReadyReply).getCohortPath()); - + } else if(serializedReadyReply instanceof BatchedModificationsReply) { + return actorContext.actorSelection(((BatchedModificationsReply)serializedReadyReply).getCohortPath()); } else if(serializedReadyReply.getClass().equals(ReadyTransactionReply.SERIALIZABLE_CLASS)) { ReadyTransactionReply reply = ReadyTransactionReply.fromSerializable(serializedReadyReply); - String cohortPath = reply.getCohortPath(); - - // In Helium we used to return the local path of the actor which represented - // a remote ThreePhaseCommitCohort. The local path would then be converted to - // a remote path using this resolvePath method. To maintain compatibility with - // a Helium node we need to continue to do this conversion. - // At some point in the future when upgrades from Helium are not supported - // we could remove this code to resolvePath and just use the cohortPath as the - // resolved cohortPath - if(TransactionContextImpl.this.remoteTransactionVersion < - DataStoreVersions.HELIUM_1_VERSION) { - cohortPath = actorContext.resolvePath(transactionPath, cohortPath); - } - + String cohortPath = deserializeCohortPath(reply.getCohortPath()); return actorContext.actorSelection(cohortPath); - } else { // Throwing an exception here will fail the Future. throw new IllegalArgumentException(String.format("%s: Invalid reply type %s", @@ -160,27 +155,51 @@ public class TransactionContextImpl extends AbstractTransactionContext { }, TransactionProxy.SAME_FAILURE_TRANSFORMER, actorContext.getClientDispatcher()); } + protected String deserializeCohortPath(String cohortPath) { + return cohortPath; + } + private void batchModification(Modification modification) { if(batchedModifications == null) { - batchedModifications = new BatchedModifications(remoteTransactionVersion); + batchedModifications = new BatchedModifications(identifier.toString(), remoteTransactionVersion, + transactionChainId); } batchedModifications.addModification(modification); if(batchedModifications.getModifications().size() >= actorContext.getDatastoreContext().getShardBatchedModificationCount()) { - sendBatchedModifications(); + sendAndRecordBatchedModifications(); } } - private void sendBatchedModifications() { + private void sendAndRecordBatchedModifications() { + Future sentFuture = sendBatchedModifications(); + if(sentFuture != null) { + recordedOperationFutures.add(sentFuture); + } + } + + protected Future sendBatchedModifications() { + return sendBatchedModifications(false); + } + + protected Future sendBatchedModifications(boolean ready) { + Future sent = null; if(batchedModifications != null) { - LOG.debug("Tx {} sending {} batched modifications", identifier, - batchedModifications.getModifications().size()); + if(LOG.isDebugEnabled()) { + LOG.debug("Tx {} sending {} batched modifications, ready: {}", identifier, + batchedModifications.getModifications().size(), ready); + } - recordedOperationFutures.add(executeOperationAsync(batchedModifications)); - batchedModifications = null; + batchedModifications.setReady(ready); + sent = executeOperationAsync(batchedModifications); + + batchedModifications = new BatchedModifications(identifier.toString(), remoteTransactionVersion, + transactionChainId); } + + return sent; } @Override @@ -212,7 +231,7 @@ public class TransactionContextImpl extends AbstractTransactionContext { // Send the remaining batched modifications if any. - sendBatchedModifications(); + sendAndRecordBatchedModifications(); // If there were any previous recorded put/merge/delete operation reply Futures then we // must wait for them to successfully complete. This is necessary to honor the read @@ -297,7 +316,7 @@ public class TransactionContextImpl extends AbstractTransactionContext { // Send the remaining batched modifications if any. - sendBatchedModifications(); + sendAndRecordBatchedModifications(); // If there were any previous recorded put/merge/delete operation reply Futures then we // must wait for them to successfully complete. This is necessary to honor the read diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java index 64b9086c25..7f2f328135 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java @@ -70,7 +70,19 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { public static enum TransactionType { READ_ONLY, WRITE_ONLY, - READ_WRITE + READ_WRITE; + + public static TransactionType fromInt(int type) { + if(type == WRITE_ONLY.ordinal()) { + return WRITE_ONLY; + } else if(type == READ_WRITE.ordinal()) { + return READ_WRITE; + } else if(type == READ_ONLY.ordinal()) { + return READ_ONLY; + } else { + throw new IllegalArgumentException("In TransactionType enum value" + type); + } + } } static final Mapper SAME_FAILURE_TRANSFORMER = @@ -550,10 +562,20 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { * Sets the target primary shard and initiates a CreateTransaction try. */ void setPrimaryShard(ActorSelection primaryShard) { - LOG.debug("Tx {} Primary shard found - trying create transaction", identifier); - this.primaryShard = primaryShard; - tryCreateTransaction(); + + if(transactionType == TransactionType.WRITE_ONLY && + actorContext.getDatastoreContext().isWriteOnlyTransactionOptimizationsEnabled()) { + LOG.debug("Tx {} Primary shard found - creating WRITE_ONLY transaction context", identifier); + + // For write-only Tx's we prepare the transaction modifications directly on the shard actor + // to avoid the overhead of creating a separate transaction actor. + // FIXME: can't assume the shard version is LITHIUM_VERSION - need to obtain it somehow. + executeTxOperatonsOnComplete(createValidTransactionContext(this.primaryShard, + this.primaryShard.path().toString(), DataStoreVersions.LITHIUM_VERSION)); + } else { + tryCreateTransaction(); + } } /** @@ -563,7 +585,7 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { boolean invokeOperation = true; synchronized(txOperationsOnComplete) { if(transactionContext == null) { - LOG.debug("Tx {} Adding operation on complete {}", identifier); + LOG.debug("Tx {} Adding operation on complete", identifier); invokeOperation = false; txOperationsOnComplete.add(operation); @@ -590,6 +612,8 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { * Performs a CreateTransaction try async. */ private void tryCreateTransaction() { + LOG.debug("Tx {} Primary shard found - trying create transaction", identifier); + Object serializedCreateMessage = new CreateTransaction(identifier.toString(), TransactionProxy.this.transactionType.ordinal(), getTransactionChainId()).toSerializable(); @@ -636,8 +660,7 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { // TransactionContext until after we've executed all cached TransactionOperations. TransactionContext localTransactionContext; if(failure != null) { - LOG.debug("Tx {} Creating NoOpTransaction because of error: {}", identifier, - failure.getMessage()); + LOG.debug("Tx {} Creating NoOpTransaction because of error", identifier, failure); localTransactionContext = new NoOpTransactionContext(failure, identifier, operationLimiter); } else if (response.getClass().equals(CreateTransactionReply.SERIALIZABLE_CLASS)) { @@ -687,11 +710,14 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { } private TransactionContext createValidTransactionContext(CreateTransactionReply reply) { - String transactionPath = reply.getTransactionPath(); - LOG.debug("Tx {} Received {}", identifier, reply); - ActorSelection transactionActor = actorContext.actorSelection(transactionPath); + return createValidTransactionContext(actorContext.actorSelection(reply.getTransactionPath()), + reply.getTransactionPath(), reply.getVersion()); + } + + private TransactionContext createValidTransactionContext(ActorSelection transactionActor, + String transactionPath, short remoteTransactionVersion) { if (transactionType == TransactionType.READ_ONLY) { // Read-only Tx's aren't explicitly closed by the client so we create a PhantomReference @@ -720,12 +746,17 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { // Check if TxActor is created in the same node boolean isTxActorLocal = actorContext.isPathLocal(transactionPath); - if(reply.getVersion() >= DataStoreVersions.LITHIUM_VERSION) { - return new TransactionContextImpl(transactionPath, transactionActor, identifier, - actorContext, schemaContext, isTxActorLocal, reply.getVersion(), operationCompleter); - } else { + if(remoteTransactionVersion < DataStoreVersions.LITHIUM_VERSION) { return new PreLithiumTransactionContextImpl(transactionPath, transactionActor, identifier, - actorContext, schemaContext, isTxActorLocal, reply.getVersion(), operationCompleter); + transactionChainId, actorContext, schemaContext, isTxActorLocal, remoteTransactionVersion, + operationCompleter); + } else if (transactionType == TransactionType.WRITE_ONLY && + actorContext.getDatastoreContext().isWriteOnlyTransactionOptimizationsEnabled()) { + return new WriteOnlyTransactionContextImpl(transactionActor, identifier, transactionChainId, + actorContext, schemaContext, isTxActorLocal, remoteTransactionVersion, operationCompleter); + } else { + return new TransactionContextImpl(transactionActor, identifier, transactionChainId, + actorContext, schemaContext, isTxActorLocal, remoteTransactionVersion, operationCompleter); } } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/WriteOnlyTransactionContextImpl.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/WriteOnlyTransactionContextImpl.java new file mode 100644 index 0000000000..3b4a190a9e --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/WriteOnlyTransactionContextImpl.java @@ -0,0 +1,44 @@ +/* + * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.datastore; + +import akka.actor.ActorSelection; +import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier; +import org.opendaylight.controller.cluster.datastore.utils.ActorContext; +import org.opendaylight.yangtools.yang.model.api.SchemaContext; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import scala.concurrent.Future; + +/** + * Context for a write-only transaction. + * + * @author Thomas Pantelis + */ +public class WriteOnlyTransactionContextImpl extends TransactionContextImpl { + private static final Logger LOG = LoggerFactory.getLogger(WriteOnlyTransactionContextImpl.class); + + public WriteOnlyTransactionContextImpl(ActorSelection actor, TransactionIdentifier identifier, + String transactionChainId, ActorContext actorContext, SchemaContext schemaContext, boolean isTxActorLocal, + short remoteTransactionVersion, OperationCompleter operationCompleter) { + super(actor, identifier, transactionChainId, actorContext, schemaContext, isTxActorLocal, + remoteTransactionVersion, operationCompleter); + } + + @Override + public Future readyTransaction() { + LOG.debug("Tx {} readyTransaction called with {} previous recorded operations pending", + identifier, recordedOperationFutures.size()); + + // Send the remaining batched modifications if any. + + Future lastModificationsFuture = sendBatchedModifications(true); + + return combineRecordedOperationsFutures(lastModificationsFuture); + } +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/compat/PreLithiumTransactionContextImpl.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/compat/PreLithiumTransactionContextImpl.java index e407c7cc47..ccfb329692 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/compat/PreLithiumTransactionContextImpl.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/compat/PreLithiumTransactionContextImpl.java @@ -8,16 +8,21 @@ package org.opendaylight.controller.cluster.datastore.compat; import akka.actor.ActorSelection; +import org.opendaylight.controller.cluster.datastore.DataStoreVersions; import org.opendaylight.controller.cluster.datastore.OperationCompleter; import org.opendaylight.controller.cluster.datastore.TransactionContextImpl; import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier; import org.opendaylight.controller.cluster.datastore.messages.DeleteData; import org.opendaylight.controller.cluster.datastore.messages.MergeData; +import org.opendaylight.controller.cluster.datastore.messages.ReadyTransaction; import org.opendaylight.controller.cluster.datastore.messages.WriteData; import org.opendaylight.controller.cluster.datastore.utils.ActorContext; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; import org.opendaylight.yangtools.yang.model.api.SchemaContext; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import scala.concurrent.Future; /** * Implementation of TransactionContextImpl used when talking to a pre-Lithium controller that doesn't @@ -26,12 +31,16 @@ import org.opendaylight.yangtools.yang.model.api.SchemaContext; * @author Thomas Pantelis */ public class PreLithiumTransactionContextImpl extends TransactionContextImpl { + private static final Logger LOG = LoggerFactory.getLogger(PreLithiumTransactionContextImpl.class); + + private final String transactionPath; public PreLithiumTransactionContextImpl(String transactionPath, ActorSelection actor, TransactionIdentifier identifier, - ActorContext actorContext, SchemaContext schemaContext, boolean isTxActorLocal, + String transactionChainId, ActorContext actorContext, SchemaContext schemaContext, boolean isTxActorLocal, short remoteTransactionVersion, OperationCompleter operationCompleter) { - super(transactionPath, actor, identifier, actorContext, schemaContext, isTxActorLocal, - remoteTransactionVersion, operationCompleter); + super(actor, identifier, transactionChainId, actorContext, schemaContext, isTxActorLocal, + remoteTransactionVersion, operationCompleter); + this.transactionPath = transactionPath; } @Override @@ -51,4 +60,32 @@ public class PreLithiumTransactionContextImpl extends TransactionContextImpl { recordedOperationFutures.add(executeOperationAsync( new WriteData(path, data, getRemoteTransactionVersion()))); } + + @Override + public Future readyTransaction() { + LOG.debug("Tx {} readyTransaction called with {} previous recorded operations pending", + identifier, recordedOperationFutures.size()); + + // Send the ReadyTransaction message to the Tx actor. + + Future lastReplyFuture = executeOperationAsync(ReadyTransaction.INSTANCE); + + return combineRecordedOperationsFutures(lastReplyFuture); + } + + @Override + protected String deserializeCohortPath(String cohortPath) { + // In base Helium we used to return the local path of the actor which represented + // a remote ThreePhaseCommitCohort. The local path would then be converted to + // a remote path using this resolvePath method. To maintain compatibility with + // a Helium node we need to continue to do this conversion. + // At some point in the future when upgrades from Helium are not supported + // we could remove this code to resolvePath and just use the cohortPath as the + // resolved cohortPath + if(getRemoteTransactionVersion() < DataStoreVersions.HELIUM_1_VERSION) { + return getActorContext().resolvePath(transactionPath, cohortPath); + } + + return cohortPath; + } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/identifiers/ShardTransactionIdentifier.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/identifiers/ShardTransactionIdentifier.java index d1f9495d86..fa1525c574 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/identifiers/ShardTransactionIdentifier.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/identifiers/ShardTransactionIdentifier.java @@ -12,10 +12,14 @@ import com.google.common.base.Preconditions; public class ShardTransactionIdentifier { private final String remoteTransactionId; + private final String stringRepresentation; public ShardTransactionIdentifier(String remoteTransactionId) { this.remoteTransactionId = Preconditions.checkNotNull(remoteTransactionId, "remoteTransactionId should not be null"); + + stringRepresentation = new StringBuilder(remoteTransactionId.length() + 6).append("shard-"). + append(remoteTransactionId).toString(); } public String getRemoteTransactionId() { @@ -46,9 +50,7 @@ public class ShardTransactionIdentifier { } @Override public String toString() { - final StringBuilder sb = new StringBuilder(); - sb.append("shard-").append(remoteTransactionId); - return sb.toString(); + return stringRepresentation; } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/BatchedModifications.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/BatchedModifications.java index 670641f6ac..a9ce94b033 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/BatchedModifications.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/BatchedModifications.java @@ -7,6 +7,10 @@ */ package org.opendaylight.controller.cluster.datastore.messages; +import com.google.common.base.Preconditions; +import java.io.IOException; +import java.io.ObjectInput; +import java.io.ObjectOutput; import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification; /** @@ -17,15 +21,61 @@ import org.opendaylight.controller.cluster.datastore.modification.MutableComposi public class BatchedModifications extends MutableCompositeModification implements SerializableMessage { private static final long serialVersionUID = 1L; + private boolean ready; + private String transactionID; + private String transactionChainID; + public BatchedModifications() { } - public BatchedModifications(short version) { + public BatchedModifications(String transactionID, short version, String transactionChainID) { super(version); + this.transactionID = Preconditions.checkNotNull(transactionID, "transactionID can't be null"); + this.transactionChainID = transactionChainID != null ? transactionChainID : ""; + } + + public boolean isReady() { + return ready; + } + + public void setReady(boolean ready) { + this.ready = ready; + } + + public String getTransactionID() { + return transactionID; + } + + public String getTransactionChainID() { + return transactionChainID; + } + + @Override + public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + super.readExternal(in); + transactionID = in.readUTF(); + transactionChainID = in.readUTF(); + ready = in.readBoolean(); + } + + @Override + public void writeExternal(ObjectOutput out) throws IOException { + super.writeExternal(out); + out.writeUTF(transactionID); + out.writeUTF(transactionChainID); + out.writeBoolean(ready); } @Override public Object toSerializable() { return this; } + + @Override + public String toString() { + StringBuilder builder = new StringBuilder(); + builder.append("BatchedModifications [transactionID=").append(transactionID).append(", ready=").append(ready) + .append(", modifications size=").append(getModifications().size()).append("]"); + return builder.toString(); + } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/BatchedModificationsReply.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/BatchedModificationsReply.java index 33c5733fdb..a10c6ac3fb 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/BatchedModificationsReply.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/BatchedModificationsReply.java @@ -19,7 +19,11 @@ import java.io.ObjectOutput; public class BatchedModificationsReply extends VersionedExternalizableMessage { private static final long serialVersionUID = 1L; + private static final byte COHORT_PATH_NOT_PRESENT = 0; + private static final byte COHORT_PATH_PRESENT = 1; + private int numBatched; + private String cohortPath; public BatchedModificationsReply() { } @@ -28,25 +32,52 @@ public class BatchedModificationsReply extends VersionedExternalizableMessage { this.numBatched = numBatched; } + public BatchedModificationsReply(int numBatched, String cohortPath) { + this.numBatched = numBatched; + this.cohortPath = cohortPath; + } public int getNumBatched() { return numBatched; } + public String getCohortPath() { + return cohortPath; + } + @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { super.readExternal(in); numBatched = in.readInt(); + + if(in.readByte() == COHORT_PATH_PRESENT) { + cohortPath = in.readUTF(); + } } @Override public void writeExternal(ObjectOutput out) throws IOException { super.writeExternal(out); out.writeInt(numBatched); + + if(cohortPath != null) { + out.writeByte(COHORT_PATH_PRESENT); + out.writeUTF(cohortPath); + } else { + out.writeByte(COHORT_PATH_NOT_PRESENT); + } } @Override public Object toSerializable() { return this; } + + @Override + public String toString() { + StringBuilder builder = new StringBuilder(); + builder.append("BatchedModificationsReply [numBatched=").append(numBatched).append(", cohortPath=") + .append(cohortPath).append("]"); + return builder.toString(); + } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/VersionedExternalizableMessage.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/VersionedExternalizableMessage.java index 2a660fa4b2..b34737be54 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/VersionedExternalizableMessage.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/VersionedExternalizableMessage.java @@ -11,6 +11,7 @@ import java.io.Externalizable; import java.io.IOException; import java.io.ObjectInput; import java.io.ObjectOutput; +import org.opendaylight.controller.cluster.datastore.DataStoreVersions; /** * Abstract base class for a versioned Externalizable message. @@ -20,7 +21,7 @@ import java.io.ObjectOutput; public abstract class VersionedExternalizableMessage implements Externalizable, SerializableMessage { private static final long serialVersionUID = 1L; - private short version; + private short version = DataStoreVersions.CURRENT_VERSION; public VersionedExternalizableMessage() { } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractTransactionProxyTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractTransactionProxyTest.java index 4896b059c7..c6c5486ee3 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractTransactionProxyTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractTransactionProxyTest.java @@ -94,8 +94,7 @@ public abstract class AbstractTransactionProxyTest { protected final String memberName = "mock-member"; - protected final Builder dataStoreContextBuilder = DatastoreContext.newBuilder().operationTimeoutInSeconds(2). - shardBatchedModificationCount(1); + protected final Builder dataStoreContextBuilder = DatastoreContext.newBuilder().operationTimeoutInSeconds(2); @BeforeClass public static void setUpClass() throws IOException { @@ -251,6 +250,13 @@ public abstract class AbstractTransactionProxyTest { eq(actorSelection(actorRef)), isA(BatchedModifications.class)); } + protected void expectBatchedModificationsReady(ActorRef actorRef, int count) { + Future replyFuture = Futures.successful( + new BatchedModificationsReply(count, actorRef.path().toString())); + doReturn(replyFuture).when(mockActorContext).executeOperationAsync( + eq(actorSelection(actorRef)), isA(BatchedModifications.class)); + } + protected void expectBatchedModifications(int count) { doReturn(batchedModificationsReply(count)).when(mockActorContext).executeOperationAsync( any(ActorSelection.class), isA(BatchedModifications.class)); @@ -307,15 +313,21 @@ public abstract class AbstractTransactionProxyTest { protected ActorRef setupActorContextWithInitialCreateTransaction(ActorSystem actorSystem, TransactionType type, int transactionVersion, String prefix, ActorRef shardActorRef) { - ActorRef txActorRef = actorSystem.actorOf(Props.create(DoNothingActor.class)); - log.info("Created mock shard Tx actor {}", txActorRef); + ActorRef txActorRef; + if(type == TransactionType.WRITE_ONLY && transactionVersion >= DataStoreVersions.LITHIUM_VERSION && + dataStoreContextBuilder.build().isWriteOnlyTransactionOptimizationsEnabled()) { + txActorRef = shardActorRef; + } else { + txActorRef = actorSystem.actorOf(Props.create(DoNothingActor.class)); + log.info("Created mock shard Tx actor {}", txActorRef); - doReturn(actorSystem.actorSelection(txActorRef.path())).when(mockActorContext).actorSelection( - txActorRef.path().toString()); + doReturn(actorSystem.actorSelection(txActorRef.path())). + when(mockActorContext).actorSelection(txActorRef.path().toString()); - doReturn(Futures.successful(createTransactionReply(txActorRef, transactionVersion))).when(mockActorContext). - executeOperationAsync(eq(actorSystem.actorSelection(shardActorRef.path())), - eqCreateTransaction(prefix, type)); + doReturn(Futures.successful(createTransactionReply(txActorRef, transactionVersion))).when(mockActorContext). + executeOperationAsync(eq(actorSystem.actorSelection(shardActorRef.path())), + eqCreateTransaction(prefix, type)); + } return txActorRef; } @@ -358,17 +370,18 @@ public abstract class AbstractTransactionProxyTest { return captured; } - protected void verifyOneBatchedModification(ActorRef actorRef, Modification expected) { + protected void verifyOneBatchedModification(ActorRef actorRef, Modification expected, boolean expIsReady) { List batchedModifications = captureBatchedModifications(actorRef); assertEquals("Captured BatchedModifications count", 1, batchedModifications.size()); - verifyBatchedModifications(batchedModifications.get(0), expected); + verifyBatchedModifications(batchedModifications.get(0), expIsReady, expected); } - protected void verifyBatchedModifications(Object message, Modification... expected) { + protected void verifyBatchedModifications(Object message, boolean expIsReady, Modification... expected) { assertEquals("Message type", BatchedModifications.class, message.getClass()); BatchedModifications batchedModifications = (BatchedModifications)message; assertEquals("BatchedModifications size", expected.length, batchedModifications.getModifications().size()); + assertEquals("isReady", expIsReady, batchedModifications.isReady()); for(int i = 0; i < batchedModifications.getModifications().size(); i++) { Modification actual = batchedModifications.getModifications().get(i); assertEquals("Modification type", expected[i].getClass(), actual.getClass()); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreIntegrationTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreIntegrationTest.java index 54a9e2dd94..a3c5eb4b00 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreIntegrationTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreIntegrationTest.java @@ -163,7 +163,8 @@ public class DistributedDataStoreIntegrationTest extends AbstractActorTest { // Create the write Tx - final DOMStoreWriteTransaction writeTx = dataStore.newWriteOnlyTransaction(); + // TODO - we'll want to test this with write-only as well when FindPrimary returns the leader shard. + final DOMStoreWriteTransaction writeTx = dataStore.newReadWriteTransaction(); assertNotNull("newReadWriteTransaction returned null", writeTx); // Do some modification operations and ready the Tx on a separate thread. @@ -473,7 +474,7 @@ public class DistributedDataStoreIntegrationTest extends AbstractActorTest { // Create the write Tx. - final DOMStoreWriteTransaction writeTx = dataStore.newWriteOnlyTransaction(); + final DOMStoreWriteTransaction writeTx = dataStore.newReadWriteTransaction(); assertNotNull("newReadWriteTransaction returned null", writeTx); // Do some modifications and ready the Tx on a separate thread. diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java index d888d62cff..0fbe68665e 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java @@ -10,6 +10,7 @@ import static org.mockito.Mockito.inOrder; import static org.mockito.Mockito.mock; import static org.opendaylight.controller.cluster.datastore.DataStoreVersions.CURRENT_VERSION; import akka.actor.ActorRef; +import akka.actor.ActorSelection; import akka.actor.PoisonPill; import akka.actor.Props; import akka.dispatch.Dispatchers; @@ -35,6 +36,7 @@ import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; import org.junit.Test; import org.mockito.InOrder; @@ -42,6 +44,8 @@ import org.opendaylight.controller.cluster.DataPersistenceProvider; import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier; import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction; import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply; +import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications; +import org.opendaylight.controller.cluster.datastore.messages.BatchedModificationsReply; import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction; import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply; import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction; @@ -49,6 +53,8 @@ import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionR import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction; import org.opendaylight.controller.cluster.datastore.messages.ForwardedReadyTransaction; import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved; +import org.opendaylight.controller.cluster.datastore.messages.ReadData; +import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply; import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply; import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener; import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListenerReply; @@ -89,6 +95,7 @@ import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCoh import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument; +import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode; import org.opendaylight.yangtools.yang.data.api.schema.DataContainerChild; import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; @@ -98,6 +105,7 @@ import scala.concurrent.Future; import scala.concurrent.duration.FiniteDuration; public class ShardTest extends AbstractShardTest { + @Test public void testRegisterChangeListener() throws Exception { new ShardTestKit(getSystem()) {{ @@ -424,42 +432,42 @@ public class ShardTest extends AbstractShardTest { waitUntilLeader(shard); - // Setup 3 simulated transactions with mock cohorts backed by real cohorts. - - InMemoryDOMDataStore dataStore = shard.underlyingActor().getDataStore(); - - String transactionID1 = "tx1"; - MutableCompositeModification modification1 = new MutableCompositeModification(); - DOMStoreThreePhaseCommitCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore, - TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification1); + final String transactionID1 = "tx1"; + final String transactionID2 = "tx2"; + final String transactionID3 = "tx3"; - String transactionID2 = "tx2"; - MutableCompositeModification modification2 = new MutableCompositeModification(); - DOMStoreThreePhaseCommitCohort cohort2 = setupMockWriteTransaction("cohort2", dataStore, - TestModel.OUTER_LIST_PATH, - ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(), - modification2); + final AtomicReference mockCohort1 = new AtomicReference<>(); + final AtomicReference mockCohort2 = new AtomicReference<>(); + final AtomicReference mockCohort3 = new AtomicReference<>(); + ShardCommitCoordinator.CohortDecorator cohortDecorator = new ShardCommitCoordinator.CohortDecorator() { + @Override + public DOMStoreThreePhaseCommitCohort decorate(String transactionID, DOMStoreThreePhaseCommitCohort actual) { + if(transactionID.equals(transactionID1)) { + mockCohort1.set(createDelegatingMockCohort("cohort1", actual)); + return mockCohort1.get(); + } else if(transactionID.equals(transactionID2)) { + mockCohort2.set(createDelegatingMockCohort("cohort2", actual)); + return mockCohort2.get(); + } else { + mockCohort3.set(createDelegatingMockCohort("cohort3", actual)); + return mockCohort3.get(); + } + } + }; - String transactionID3 = "tx3"; - MutableCompositeModification modification3 = new MutableCompositeModification(); - DOMStoreThreePhaseCommitCohort cohort3 = setupMockWriteTransaction("cohort3", dataStore, - YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH) - .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(), - ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1), - modification3); + shard.underlyingActor().getCommitCoordinator().setCohortDecorator(cohortDecorator); long timeoutSec = 5; final FiniteDuration duration = FiniteDuration.create(timeoutSec, TimeUnit.SECONDS); final Timeout timeout = new Timeout(duration); - // Simulate the ForwardedReadyTransaction message for the first Tx that would be sent - // by the ShardTransaction. + // Send a BatchedModifications message for the first transaction. - shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION, - cohort1, modification1, true), getRef()); - ReadyTransactionReply readyReply = ReadyTransactionReply.fromSerializable( - expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS)); - assertEquals("Cohort path", shard.path().toString(), readyReply.getCohortPath()); + shard.tell(newBatchedModifications(transactionID1, TestModel.TEST_PATH, + ImmutableNodes.containerNode(TestModel.TEST_QNAME), true), getRef()); + BatchedModificationsReply batchedReply = expectMsgClass(duration, BatchedModificationsReply.class); + assertEquals("getCohortPath", shard.path().toString(), batchedReply.getCohortPath()); + assertEquals("getNumBatched", 1, batchedReply.getNumBatched()); // Send the CanCommitTransaction message for the first Tx. @@ -468,15 +476,16 @@ public class ShardTest extends AbstractShardTest { expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS)); assertEquals("Can commit", true, canCommitReply.getCanCommit()); - // Send the ForwardedReadyTransaction for the next 2 Tx's. + // Send BatchedModifications for the next 2 Tx's. - shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION, - cohort2, modification2, true), getRef()); - expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS); + shard.tell(newBatchedModifications(transactionID2, TestModel.OUTER_LIST_PATH, + ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(), true), getRef()); + expectMsgClass(duration, BatchedModificationsReply.class); - shard.tell(new ForwardedReadyTransaction(transactionID3, CURRENT_VERSION, - cohort3, modification3, true), getRef()); - expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS); + shard.tell(newBatchedModifications(transactionID3, YangInstanceIdentifier.builder( + TestModel.OUTER_LIST_PATH).nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(), + ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1), true), getRef()); + expectMsgClass(duration, BatchedModificationsReply.class); // Send the CanCommitTransaction message for the next 2 Tx's. These should get queued and // processed after the first Tx completes. @@ -569,16 +578,16 @@ public class ShardTest extends AbstractShardTest { assertEquals("Commits complete", true, done); - InOrder inOrder = inOrder(cohort1, cohort2, cohort3); - inOrder.verify(cohort1).canCommit(); - inOrder.verify(cohort1).preCommit(); - inOrder.verify(cohort1).commit(); - inOrder.verify(cohort2).canCommit(); - inOrder.verify(cohort2).preCommit(); - inOrder.verify(cohort2).commit(); - inOrder.verify(cohort3).canCommit(); - inOrder.verify(cohort3).preCommit(); - inOrder.verify(cohort3).commit(); + InOrder inOrder = inOrder(mockCohort1.get(), mockCohort2.get(), mockCohort3.get()); + inOrder.verify(mockCohort1.get()).canCommit(); + inOrder.verify(mockCohort1.get()).preCommit(); + inOrder.verify(mockCohort1.get()).commit(); + inOrder.verify(mockCohort2.get()).canCommit(); + inOrder.verify(mockCohort2.get()).preCommit(); + inOrder.verify(mockCohort2.get()).commit(); + inOrder.verify(mockCohort3.get()).canCommit(); + inOrder.verify(mockCohort3.get()).preCommit(); + inOrder.verify(mockCohort3.get()).commit(); // Verify data in the data store. @@ -601,34 +610,62 @@ public class ShardTest extends AbstractShardTest { }}; } + private BatchedModifications newBatchedModifications(String transactionID, YangInstanceIdentifier path, + NormalizedNode data, boolean ready) { + return newBatchedModifications(transactionID, null, path, data, ready); + } + + private BatchedModifications newBatchedModifications(String transactionID, String transactionChainID, + YangInstanceIdentifier path, NormalizedNode data, boolean ready) { + BatchedModifications batched = new BatchedModifications(transactionID, CURRENT_VERSION, transactionChainID); + batched.addModification(new WriteModification(path, data)); + batched.setReady(ready); + return batched; + } + + @SuppressWarnings("unchecked") @Test - public void testCommitWithPersistenceDisabled() throws Throwable { - dataStoreContextBuilder.persistent(false); + public void testMultipleBatchedModifications() throws Throwable { new ShardTestKit(getSystem()) {{ final TestActorRef shard = TestActorRef.create(getSystem(), newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()), - "testCommitPhaseFailure"); + "testMultipleBatchedModifications"); waitUntilLeader(shard); - InMemoryDOMDataStore dataStore = shard.underlyingActor().getDataStore(); + final String transactionID = "tx"; + FiniteDuration duration = duration("5 seconds"); - // Setup a simulated transactions with a mock cohort. + final AtomicReference mockCohort = new AtomicReference<>(); + ShardCommitCoordinator.CohortDecorator cohortDecorator = new ShardCommitCoordinator.CohortDecorator() { + @Override + public DOMStoreThreePhaseCommitCohort decorate(String txID, DOMStoreThreePhaseCommitCohort actual) { + if(mockCohort.get() == null) { + mockCohort.set(createDelegatingMockCohort("cohort", actual)); + } - String transactionID = "tx"; - MutableCompositeModification modification = new MutableCompositeModification(); - NormalizedNode containerNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME); - DOMStoreThreePhaseCommitCohort cohort = setupMockWriteTransaction("cohort", dataStore, - TestModel.TEST_PATH, containerNode, modification); + return mockCohort.get(); + } + }; - FiniteDuration duration = duration("5 seconds"); + shard.underlyingActor().getCommitCoordinator().setCohortDecorator(cohortDecorator); + + // Send a BatchedModifications to start a transaction. + + shard.tell(newBatchedModifications(transactionID, TestModel.TEST_PATH, + ImmutableNodes.containerNode(TestModel.TEST_QNAME), false), getRef()); + expectMsgClass(duration, BatchedModificationsReply.class); + + // Send a couple more BatchedModifications. - // Simulate the ForwardedReadyTransaction messages that would be sent - // by the ShardTransaction. + shard.tell(newBatchedModifications(transactionID, TestModel.OUTER_LIST_PATH, + ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(), false), getRef()); + expectMsgClass(duration, BatchedModificationsReply.class); - shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION, - cohort, modification, true), getRef()); - expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS); + shard.tell(newBatchedModifications(transactionID, YangInstanceIdentifier.builder( + TestModel.OUTER_LIST_PATH).nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(), + ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1), true), getRef()); + expectMsgClass(duration, BatchedModificationsReply.class); // Send the CanCommitTransaction message. @@ -642,10 +679,153 @@ public class ShardTest extends AbstractShardTest { shard.tell(new CommitTransaction(transactionID).toSerializable(), getRef()); expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS); - InOrder inOrder = inOrder(cohort); - inOrder.verify(cohort).canCommit(); - inOrder.verify(cohort).preCommit(); - inOrder.verify(cohort).commit(); + InOrder inOrder = inOrder(mockCohort.get()); + inOrder.verify(mockCohort.get()).canCommit(); + inOrder.verify(mockCohort.get()).preCommit(); + inOrder.verify(mockCohort.get()).commit(); + + // Verify data in the data store. + + NormalizedNode outerList = readStore(shard, TestModel.OUTER_LIST_PATH); + assertNotNull(TestModel.OUTER_LIST_QNAME.getLocalName() + " not found", outerList); + assertTrue(TestModel.OUTER_LIST_QNAME.getLocalName() + " value is not Iterable", + outerList.getValue() instanceof Iterable); + Object entry = ((Iterable)outerList.getValue()).iterator().next(); + assertTrue(TestModel.OUTER_LIST_QNAME.getLocalName() + " entry is not MapEntryNode", + entry instanceof MapEntryNode); + MapEntryNode mapEntry = (MapEntryNode)entry; + Optional> idLeaf = + mapEntry.getChild(new YangInstanceIdentifier.NodeIdentifier(TestModel.ID_QNAME)); + assertTrue("Missing leaf " + TestModel.ID_QNAME.getLocalName(), idLeaf.isPresent()); + assertEquals(TestModel.ID_QNAME.getLocalName() + " value", 1, idLeaf.get().getValue()); + + shard.tell(PoisonPill.getInstance(), ActorRef.noSender()); + }}; + } + + @Test + public void testBatchedModificationsOnTransactionChain() throws Throwable { + new ShardTestKit(getSystem()) {{ + final TestActorRef shard = TestActorRef.create(getSystem(), + newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()), + "testBatchedModificationsOnTransactionChain"); + + waitUntilLeader(shard); + + String transactionChainID = "txChain"; + String transactionID1 = "tx1"; + String transactionID2 = "tx2"; + + FiniteDuration duration = duration("5 seconds"); + + // Send a BatchedModifications to start a chained write transaction and ready it. + + ContainerNode containerNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME); + YangInstanceIdentifier path = TestModel.TEST_PATH; + shard.tell(newBatchedModifications(transactionID1, transactionChainID, path, + containerNode, true), getRef()); + expectMsgClass(duration, BatchedModificationsReply.class); + + // Create a read Tx on the same chain. + + shard.tell(new CreateTransaction(transactionID2, TransactionProxy.TransactionType.READ_ONLY.ordinal() , + transactionChainID).toSerializable(), getRef()); + + CreateTransactionReply createReply = expectMsgClass(duration("3 seconds"), CreateTransactionReply.class); + + getSystem().actorSelection(createReply.getTransactionActorPath()).tell(new ReadData(path), getRef()); + ReadDataReply readReply = expectMsgClass(duration("3 seconds"), ReadDataReply.class); + assertEquals("Read node", containerNode, readReply.getNormalizedNode()); + + // Commit the write transaction. + + shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef()); + CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable( + expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS)); + assertEquals("Can commit", true, canCommitReply.getCanCommit()); + + shard.tell(new CommitTransaction(transactionID1).toSerializable(), getRef()); + expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS); + + // Verify data in the data store. + + NormalizedNode actualNode = readStore(shard, path); + assertEquals("Stored node", containerNode, actualNode); + + shard.tell(PoisonPill.getInstance(), ActorRef.noSender()); + }}; + } + + @Test + public void testOnBatchedModificationsWhenNotLeader() { + final AtomicBoolean overrideLeaderCalls = new AtomicBoolean(); + new ShardTestKit(getSystem()) {{ + Creator creator = new Creator() { + @Override + public Shard create() throws Exception { + return new Shard(shardID, Collections.emptyMap(), + newDatastoreContext(), SCHEMA_CONTEXT) { + @Override + protected boolean isLeader() { + return overrideLeaderCalls.get() ? false : super.isLeader(); + } + + @Override + protected ActorSelection getLeader() { + return overrideLeaderCalls.get() ? getSystem().actorSelection(getRef().path()) : + super.getLeader(); + } + }; + } + }; + + TestActorRef shard = TestActorRef.create(getSystem(), + Props.create(new DelegatingShardCreator(creator)), "testOnBatchedModificationsWhenNotLeader"); + + waitUntilLeader(shard); + + overrideLeaderCalls.set(true); + + BatchedModifications batched = new BatchedModifications("tx", DataStoreVersions.CURRENT_VERSION, ""); + + shard.tell(batched, ActorRef.noSender()); + + expectMsgEquals(batched); + + shard.tell(PoisonPill.getInstance(), ActorRef.noSender()); + }}; + } + + @Test + public void testCommitWithPersistenceDisabled() throws Throwable { + dataStoreContextBuilder.persistent(false); + new ShardTestKit(getSystem()) {{ + final TestActorRef shard = TestActorRef.create(getSystem(), + newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()), + "testCommitWithPersistenceDisabled"); + + waitUntilLeader(shard); + + String transactionID = "tx"; + FiniteDuration duration = duration("5 seconds"); + + // Send a BatchedModifications to start a transaction. + + NormalizedNode containerNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME); + shard.tell(newBatchedModifications(transactionID, TestModel.TEST_PATH, containerNode, true), getRef()); + expectMsgClass(duration, BatchedModificationsReply.class); + + // Send the CanCommitTransaction message. + + shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef()); + CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable( + expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS)); + assertEquals("Can commit", true, canCommitReply.getCanCommit()); + + // Send the CanCommitTransaction message. + + shard.tell(new CommitTransaction(transactionID).toSerializable(), getRef()); + expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS); NormalizedNode actualNode = readStore(shard, TestModel.TEST_PATH); assertEquals(TestModel.TEST_QNAME.getLocalName(), containerNode, actualNode); @@ -774,34 +954,40 @@ public class ShardTest extends AbstractShardTest { waitUntilLeader(shard); - // Setup 2 simulated transactions with mock cohorts. The first one fails in the - // commit phase. + // Setup 2 mock cohorts. The first one fails in the commit phase. - String transactionID1 = "tx1"; - MutableCompositeModification modification1 = new MutableCompositeModification(); - DOMStoreThreePhaseCommitCohort cohort1 = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1"); + final String transactionID1 = "tx1"; + final DOMStoreThreePhaseCommitCohort cohort1 = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1"); doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort1).canCommit(); doReturn(Futures.immediateFuture(null)).when(cohort1).preCommit(); doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort1).commit(); - String transactionID2 = "tx2"; - MutableCompositeModification modification2 = new MutableCompositeModification(); - DOMStoreThreePhaseCommitCohort cohort2 = mock(DOMStoreThreePhaseCommitCohort.class, "cohort2"); + final String transactionID2 = "tx2"; + final DOMStoreThreePhaseCommitCohort cohort2 = mock(DOMStoreThreePhaseCommitCohort.class, "cohort2"); doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort2).canCommit(); + ShardCommitCoordinator.CohortDecorator cohortDecorator = new ShardCommitCoordinator.CohortDecorator() { + @Override + public DOMStoreThreePhaseCommitCohort decorate(String transactionID, + DOMStoreThreePhaseCommitCohort actual) { + return transactionID1.equals(transactionID) ? cohort1 : cohort2; + } + }; + + shard.underlyingActor().getCommitCoordinator().setCohortDecorator(cohortDecorator); + FiniteDuration duration = duration("5 seconds"); final Timeout timeout = new Timeout(duration); - // Simulate the ForwardedReadyTransaction messages that would be sent - // by the ShardTransaction. + // Send BatchedModifications to start and ready each transaction. - shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION, - cohort1, modification1, true), getRef()); - expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS); + shard.tell(newBatchedModifications(transactionID1, TestModel.TEST_PATH, + ImmutableNodes.containerNode(TestModel.TEST_QNAME), true), getRef()); + expectMsgClass(duration, BatchedModificationsReply.class); - shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION, - cohort2, modification2, true), getRef()); - expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS); + shard.tell(newBatchedModifications(transactionID2, TestModel.TEST_PATH, + ImmutableNodes.containerNode(TestModel.TEST_QNAME), true), getRef()); + expectMsgClass(duration, BatchedModificationsReply.class); // Send the CanCommitTransaction message for the first Tx. @@ -854,19 +1040,27 @@ public class ShardTest extends AbstractShardTest { waitUntilLeader(shard); String transactionID = "tx1"; - MutableCompositeModification modification = new MutableCompositeModification(); - DOMStoreThreePhaseCommitCohort cohort = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1"); + final DOMStoreThreePhaseCommitCohort cohort = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1"); doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit(); doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort).preCommit(); + ShardCommitCoordinator.CohortDecorator cohortDecorator = new ShardCommitCoordinator.CohortDecorator() { + @Override + public DOMStoreThreePhaseCommitCohort decorate(String transactionID, + DOMStoreThreePhaseCommitCohort actual) { + return cohort; + } + }; + + shard.underlyingActor().getCommitCoordinator().setCohortDecorator(cohortDecorator); + FiniteDuration duration = duration("5 seconds"); - // Simulate the ForwardedReadyTransaction messages that would be sent - // by the ShardTransaction. + // Send BatchedModifications to start and ready a transaction. - shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION, - cohort, modification, true), getRef()); - expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS); + shard.tell(newBatchedModifications(transactionID, TestModel.TEST_PATH, + ImmutableNodes.containerNode(TestModel.TEST_QNAME), true), getRef()); + expectMsgClass(duration, BatchedModificationsReply.class); // Send the CanCommitTransaction message. @@ -901,16 +1095,24 @@ public class ShardTest extends AbstractShardTest { final FiniteDuration duration = duration("5 seconds"); String transactionID = "tx1"; - MutableCompositeModification modification = new MutableCompositeModification(); - DOMStoreThreePhaseCommitCohort cohort = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1"); + final DOMStoreThreePhaseCommitCohort cohort = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1"); doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort).canCommit(); - // Simulate the ForwardedReadyTransaction messages that would be sent - // by the ShardTransaction. + ShardCommitCoordinator.CohortDecorator cohortDecorator = new ShardCommitCoordinator.CohortDecorator() { + @Override + public DOMStoreThreePhaseCommitCohort decorate(String transactionID, + DOMStoreThreePhaseCommitCohort actual) { + return cohort; + } + }; + + shard.underlyingActor().getCommitCoordinator().setCohortDecorator(cohortDecorator); + + // Send BatchedModifications to start and ready a transaction. - shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION, - cohort, modification, true), getRef()); - expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS); + shard.tell(newBatchedModifications(transactionID, TestModel.TEST_PATH, + ImmutableNodes.containerNode(TestModel.TEST_QNAME), true), getRef()); + expectMsgClass(duration, BatchedModificationsReply.class); // Send the CanCommitTransaction message. @@ -954,14 +1156,9 @@ public class ShardTest extends AbstractShardTest { } }; - MutableCompositeModification modification = new MutableCompositeModification(); - DOMStoreThreePhaseCommitCohort cohort = setupMockWriteTransaction("cohort1", dataStore, - TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), - modification, preCommit); - - shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION, - cohort, modification, true), getRef()); - expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS); + shard.tell(newBatchedModifications(transactionID, TestModel.TEST_PATH, + ImmutableNodes.containerNode(TestModel.TEST_QNAME), true), getRef()); + expectMsgClass(duration, BatchedModificationsReply.class); shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef()); CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable( @@ -995,42 +1192,26 @@ public class ShardTest extends AbstractShardTest { final FiniteDuration duration = duration("5 seconds"); - InMemoryDOMDataStore dataStore = shard.underlyingActor().getDataStore(); - writeToStore(shard, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME)); writeToStore(shard, TestModel.OUTER_LIST_PATH, ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build()); - // Create 1st Tx - will timeout + // Create and ready the 1st Tx - will timeout String transactionID1 = "tx1"; - MutableCompositeModification modification1 = new MutableCompositeModification(); - DOMStoreThreePhaseCommitCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore, - YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH) - .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(), - ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1), - modification1); + shard.tell(newBatchedModifications(transactionID1, YangInstanceIdentifier.builder( + TestModel.OUTER_LIST_PATH).nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(), + ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1), true), getRef()); + expectMsgClass(duration, BatchedModificationsReply.class); - // Create 2nd Tx + // Create and ready the 2nd Tx - String transactionID2 = "tx3"; - MutableCompositeModification modification2 = new MutableCompositeModification(); + String transactionID2 = "tx2"; YangInstanceIdentifier listNodePath = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH) - .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2).build(); - DOMStoreThreePhaseCommitCohort cohort2 = setupMockWriteTransaction("cohort3", dataStore, - listNodePath, - ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2), - modification2); - - // Ready the Tx's - - shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION, - cohort1, modification1, true), getRef()); - expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS); - - shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION, - cohort2, modification2, true), getRef()); - expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS); + .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2).build(); + shard.tell(newBatchedModifications(transactionID2, listNodePath, + ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2), true), getRef()); + expectMsgClass(duration, BatchedModificationsReply.class); // canCommit 1st Tx. We don't send the commit so it should timeout. @@ -1067,38 +1248,23 @@ public class ShardTest extends AbstractShardTest { final FiniteDuration duration = duration("5 seconds"); - InMemoryDOMDataStore dataStore = shard.underlyingActor().getDataStore(); - String transactionID1 = "tx1"; - MutableCompositeModification modification1 = new MutableCompositeModification(); - DOMStoreThreePhaseCommitCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore, - TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification1); - String transactionID2 = "tx2"; - MutableCompositeModification modification2 = new MutableCompositeModification(); - DOMStoreThreePhaseCommitCohort cohort2 = setupMockWriteTransaction("cohort2", dataStore, - TestModel.OUTER_LIST_PATH, - ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(), - modification2); - String transactionID3 = "tx3"; - MutableCompositeModification modification3 = new MutableCompositeModification(); - DOMStoreThreePhaseCommitCohort cohort3 = setupMockWriteTransaction("cohort3", dataStore, - TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification3); - // Ready the Tx's + // Send a BatchedModifications to start transactions and ready them. - shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION, - cohort1, modification1, true), getRef()); - expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS); + shard.tell(newBatchedModifications(transactionID1, TestModel.TEST_PATH, + ImmutableNodes.containerNode(TestModel.TEST_QNAME), true), getRef()); + expectMsgClass(duration, BatchedModificationsReply.class); - shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION, - cohort2, modification2, true), getRef()); - expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS); + shard.tell(newBatchedModifications(transactionID2,TestModel.OUTER_LIST_PATH, + ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(), true), getRef()); + expectMsgClass(duration, BatchedModificationsReply.class); - shard.tell(new ForwardedReadyTransaction(transactionID3, CURRENT_VERSION, - cohort3, modification3, true), getRef()); - expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS); + shard.tell(newBatchedModifications(transactionID3, TestModel.TEST_PATH, + ImmutableNodes.containerNode(TestModel.TEST_QNAME), true), getRef()); + expectMsgClass(duration, BatchedModificationsReply.class); // canCommit 1st Tx. @@ -1143,30 +1309,37 @@ public class ShardTest extends AbstractShardTest { // Setup 2 simulated transactions with mock cohorts. The first one will be aborted. - String transactionID1 = "tx1"; - MutableCompositeModification modification1 = new MutableCompositeModification(); - DOMStoreThreePhaseCommitCohort cohort1 = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1"); + final String transactionID1 = "tx1"; + final DOMStoreThreePhaseCommitCohort cohort1 = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1"); doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort1).canCommit(); doReturn(Futures.immediateFuture(null)).when(cohort1).abort(); - String transactionID2 = "tx2"; - MutableCompositeModification modification2 = new MutableCompositeModification(); - DOMStoreThreePhaseCommitCohort cohort2 = mock(DOMStoreThreePhaseCommitCohort.class, "cohort2"); + final String transactionID2 = "tx2"; + final DOMStoreThreePhaseCommitCohort cohort2 = mock(DOMStoreThreePhaseCommitCohort.class, "cohort2"); doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort2).canCommit(); FiniteDuration duration = duration("5 seconds"); final Timeout timeout = new Timeout(duration); - // Simulate the ForwardedReadyTransaction messages that would be sent - // by the ShardTransaction. + ShardCommitCoordinator.CohortDecorator cohortDecorator = new ShardCommitCoordinator.CohortDecorator() { + @Override + public DOMStoreThreePhaseCommitCohort decorate(String transactionID, + DOMStoreThreePhaseCommitCohort actual) { + return transactionID1.equals(transactionID) ? cohort1 : cohort2; + } + }; + + shard.underlyingActor().getCommitCoordinator().setCohortDecorator(cohortDecorator); - shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION, - cohort1, modification1, true), getRef()); - expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS); + // Send BatchedModifications to start and ready each transaction. - shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION, - cohort2, modification2, true), getRef()); - expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS); + shard.tell(newBatchedModifications(transactionID1, TestModel.TEST_PATH, + ImmutableNodes.containerNode(TestModel.TEST_QNAME), true), getRef()); + expectMsgClass(duration, BatchedModificationsReply.class); + + shard.tell(newBatchedModifications(transactionID2, TestModel.TEST_PATH, + ImmutableNodes.containerNode(TestModel.TEST_QNAME), true), getRef()); + expectMsgClass(duration, BatchedModificationsReply.class); // Send the CanCommitTransaction message for the first Tx. @@ -1380,6 +1553,7 @@ public class ShardTest extends AbstractShardTest { shard2.tell(PoisonPill.getInstance(), ActorRef.noSender()); }}; + } @Test diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionTest.java index c6b5cb4402..8ebb145728 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionTest.java @@ -372,7 +372,7 @@ public class ShardTransactionTest extends AbstractActorTest { YangInstanceIdentifier deletePath = TestModel.TEST_PATH; - BatchedModifications batched = new BatchedModifications(DataStoreVersions.CURRENT_VERSION); + BatchedModifications batched = new BatchedModifications("tx1", DataStoreVersions.CURRENT_VERSION, null); batched.addModification(new WriteModification(writePath, writeData)); batched.addModification(new MergeModification(mergePath, mergeData)); batched.addModification(new DeleteModification(deletePath)); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionChainProxyTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionChainProxyTest.java index 4f00ed5f4b..acba775445 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionChainProxyTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionChainProxyTest.java @@ -29,8 +29,11 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; import org.junit.Assert; import org.junit.Test; +import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications; +import org.opendaylight.controller.cluster.datastore.messages.BatchedModificationsReply; import org.opendaylight.controller.cluster.datastore.messages.ReadyTransaction; import org.opendaylight.controller.cluster.datastore.modification.WriteModification; +import org.opendaylight.controller.cluster.datastore.shardstrategy.DefaultShardStrategy; import org.opendaylight.controller.cluster.datastore.utils.ActorContext; import org.opendaylight.controller.md.cluster.datastore.model.TestModel; import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction; @@ -111,6 +114,74 @@ public class TransactionChainProxyTest extends AbstractTransactionProxyTest { verify(mockActorContext, times(0)).acquireTxCreationPermit(); } + /** + * Tests 2 successive chained write-only transactions and verifies the second transaction isn't + * initiated until the first one completes its read future. + */ + @Test + public void testChainedWriteOnlyTransactions() throws Exception { + dataStoreContextBuilder.writeOnlyTransactionOptimizationsEnabled(true); + + TransactionChainProxy txChainProxy = new TransactionChainProxy(mockActorContext); + + ActorRef txActorRef1 = setupActorContextWithoutInitialCreateTransaction(getSystem()); + + Promise batchedReplyPromise1 = akka.dispatch.Futures.promise(); + doReturn(batchedReplyPromise1.future()).when(mockActorContext).executeOperationAsync( + eq(actorSelection(txActorRef1)), isA(BatchedModifications.class)); + + DOMStoreWriteTransaction writeTx1 = txChainProxy.newWriteOnlyTransaction(); + + NormalizedNode writeNode1 = ImmutableNodes.containerNode(TestModel.TEST_QNAME); + writeTx1.write(TestModel.TEST_PATH, writeNode1); + + writeTx1.ready(); + + verify(mockActorContext, times(1)).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD)); + + verifyOneBatchedModification(txActorRef1, new WriteModification(TestModel.TEST_PATH, writeNode1), true); + + ActorRef txActorRef2 = setupActorContextWithoutInitialCreateTransaction(getSystem()); + + expectBatchedModifications(txActorRef2, 1); + + final NormalizedNode writeNode2 = ImmutableNodes.containerNode(TestModel.OUTER_LIST_QNAME); + + final DOMStoreWriteTransaction writeTx2 = txChainProxy.newWriteOnlyTransaction(); + + final AtomicReference caughtEx = new AtomicReference<>(); + final CountDownLatch write2Complete = new CountDownLatch(1); + new Thread() { + @Override + public void run() { + try { + writeTx2.write(TestModel.OUTER_LIST_PATH, writeNode2); + } catch (Exception e) { + caughtEx.set(e); + } finally { + write2Complete.countDown(); + } + } + }.start(); + + assertEquals("Tx 2 write should've completed", true, write2Complete.await(5, TimeUnit.SECONDS)); + + if(caughtEx.get() != null) { + throw caughtEx.get(); + } + + try { + verify(mockActorContext, times(1)).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD)); + } catch (AssertionError e) { + fail("Tx 2 should not have initiated until the Tx 1's ready future completed"); + } + + batchedReplyPromise1.success(new BatchedModificationsReply(1, txActorRef1.path().toString())); + + // Tx 2 should've proceeded to find the primary shard. + verify(mockActorContext, timeout(5000).times(2)).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD)); + } + /** * Tests 2 successive chained read-write transactions and verifies the second transaction isn't * initiated until the first one completes its read future. @@ -134,7 +205,7 @@ public class TransactionChainProxyTest extends AbstractTransactionProxyTest { writeTx1.ready(); - verifyOneBatchedModification(txActorRef1, new WriteModification(TestModel.TEST_PATH, writeNode1)); + verifyOneBatchedModification(txActorRef1, new WriteModification(TestModel.TEST_PATH, writeNode1), false); String tx2MemberName = "tx2MemberName"; doReturn(tx2MemberName).when(mockActorContext).getCurrentMemberName(); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionProxyTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionProxyTest.java index 8278d3cffc..ac2c079641 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionProxyTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionProxyTest.java @@ -9,6 +9,7 @@ import static org.mockito.Matchers.anyString; import static org.mockito.Matchers.eq; import static org.mockito.Matchers.isA; import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.never; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType.READ_ONLY; @@ -31,6 +32,7 @@ import org.junit.Assert; import org.junit.Test; import org.mockito.InOrder; import org.mockito.Mockito; +import org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType; import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException; import org.opendaylight.controller.cluster.datastore.exceptions.TimeoutException; import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications; @@ -384,24 +386,18 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest { @Test public void testWrite() throws Exception { + dataStoreContextBuilder.shardBatchedModificationCount(1); ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY); NormalizedNode nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME); expectBatchedModifications(actorRef, 1); - expectReadyTransaction(actorRef); TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY); transactionProxy.write(TestModel.TEST_PATH, nodeToWrite); - // This sends the batched modification. - transactionProxy.ready(); - - verifyOneBatchedModification(actorRef, new WriteModification(TestModel.TEST_PATH, nodeToWrite)); - - verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(), - BatchedModificationsReply.class); + verifyOneBatchedModification(actorRef, new WriteModification(TestModel.TEST_PATH, nodeToWrite), false); } @Test @@ -456,7 +452,7 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest { // This sends the batched modification. transactionProxy.ready(); - verifyOneBatchedModification(actorRef, new WriteModification(TestModel.TEST_PATH, nodeToWrite)); + verifyOneBatchedModification(actorRef, new WriteModification(TestModel.TEST_PATH, nodeToWrite), false); verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(), BatchedModificationsReply.class); @@ -479,48 +475,36 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest { @Test public void testMerge() throws Exception { + dataStoreContextBuilder.shardBatchedModificationCount(1); ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY); NormalizedNode nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME); expectBatchedModifications(actorRef, 1); - expectReadyTransaction(actorRef); TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY); transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite); - // This sends the batched modification. - transactionProxy.ready(); - - verifyOneBatchedModification(actorRef, new MergeModification(TestModel.TEST_PATH, nodeToWrite)); - - verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(), - BatchedModificationsReply.class); + verifyOneBatchedModification(actorRef, new MergeModification(TestModel.TEST_PATH, nodeToWrite), false); } @Test public void testDelete() throws Exception { + dataStoreContextBuilder.shardBatchedModificationCount(1); ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY); expectBatchedModifications(actorRef, 1); - expectReadyTransaction(actorRef); TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY); transactionProxy.delete(TestModel.TEST_PATH); - // This sends the batched modification. - transactionProxy.ready(); - - verifyOneBatchedModification(actorRef, new DeleteModification(TestModel.TEST_PATH)); - - verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(), - BatchedModificationsReply.class); + verifyOneBatchedModification(actorRef, new DeleteModification(TestModel.TEST_PATH), false); } @Test - public void testReady() throws Exception { + public void testReadyWithReadWrite() throws Exception { ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE); NormalizedNode nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME); @@ -550,18 +534,91 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest { verify(mockActorContext).executeOperationAsync(eq(actorSelection(actorRef)), isA(BatchedModifications.class)); + + verify(mockActorContext).executeOperationAsync(eq(actorSelection(actorRef)), + isA(ReadyTransaction.SERIALIZABLE_CLASS)); + } + + @Test + public void testReadyWithWriteOnlyAndLastBatchPending() throws Exception { + dataStoreContextBuilder.writeOnlyTransactionOptimizationsEnabled(true); + + ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY); + + NormalizedNode nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME); + + expectBatchedModificationsReady(actorRef, 1); + + TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY); + + transactionProxy.write(TestModel.TEST_PATH, nodeToWrite); + + DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready(); + + assertTrue(ready instanceof ThreePhaseCommitCohortProxy); + + ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready; + + verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures()); + + verifyCohortFutures(proxy, getSystem().actorSelection(actorRef.path())); + + List batchedModifications = captureBatchedModifications(actorRef); + assertEquals("Captured BatchedModifications count", 1, batchedModifications.size()); + + verifyBatchedModifications(batchedModifications.get(0), true, + new WriteModification(TestModel.TEST_PATH, nodeToWrite)); + + verify(mockActorContext, never()).executeOperationAsync(eq(actorSelection(actorRef)), + isA(ReadyTransaction.SERIALIZABLE_CLASS)); + } + + @Test + public void testReadyWithWriteOnlyAndLastBatchEmpty() throws Exception { + dataStoreContextBuilder.shardBatchedModificationCount(1).writeOnlyTransactionOptimizationsEnabled(true); + ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY); + + NormalizedNode nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME); + + expectBatchedModificationsReady(actorRef, 1); + + TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY); + + transactionProxy.write(TestModel.TEST_PATH, nodeToWrite); + + DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready(); + + assertTrue(ready instanceof ThreePhaseCommitCohortProxy); + + ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready; + + verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(), + BatchedModificationsReply.class); + + verifyCohortFutures(proxy, getSystem().actorSelection(actorRef.path())); + + List batchedModifications = captureBatchedModifications(actorRef); + assertEquals("Captured BatchedModifications count", 2, batchedModifications.size()); + + verifyBatchedModifications(batchedModifications.get(0), false, + new WriteModification(TestModel.TEST_PATH, nodeToWrite)); + + verifyBatchedModifications(batchedModifications.get(1), true); + + verify(mockActorContext, never()).executeOperationAsync(eq(actorSelection(actorRef)), + isA(ReadyTransaction.SERIALIZABLE_CLASS)); } @Test public void testReadyWithRecordingOperationFailure() throws Exception { + dataStoreContextBuilder.shardBatchedModificationCount(1).writeOnlyTransactionOptimizationsEnabled(true); + ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY); NormalizedNode nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME); expectFailedBatchedModifications(actorRef); - expectReadyTransaction(actorRef); - doReturn(false).when(mockActorContext).isPathLocal(actorRef.path().toString()); TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY); @@ -581,15 +638,13 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest { @Test public void testReadyWithReplyFailure() throws Exception { + dataStoreContextBuilder.writeOnlyTransactionOptimizationsEnabled(true); + ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY); NormalizedNode nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME); - expectBatchedModifications(actorRef, 1); - - doReturn(Futures.failed(new TestException())).when(mockActorContext). - executeOperationAsync(eq(actorSelection(actorRef)), - isA(ReadyTransaction.SERIALIZABLE_CLASS)); + expectFailedBatchedModifications(actorRef); TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY); @@ -601,9 +656,6 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest { ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready; - verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(), - BatchedModificationsReply.class); - verifyCohortFutures(proxy, TestException.class); } @@ -634,15 +686,16 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest { @Test public void testReadyWithInvalidReplyMessageType() throws Exception { + dataStoreContextBuilder.writeOnlyTransactionOptimizationsEnabled(true); ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY); NormalizedNode nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME); - expectBatchedModifications(actorRef, 1); + //expectBatchedModifications(actorRef, 1); doReturn(Futures.successful(new Object())).when(mockActorContext). executeOperationAsync(eq(actorSelection(actorRef)), - isA(ReadyTransaction.SERIALIZABLE_CLASS)); + isA(BatchedModifications.class)); TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY); @@ -657,17 +710,6 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest { verifyCohortFutures(proxy, IllegalArgumentException.class); } - @Test - public void testUnusedTransaction() throws Exception { - TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_WRITE); - - DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready(); - - assertEquals("canCommit", true, ready.canCommit().get()); - ready.preCommit().get(); - ready.commit().get(); - } - @Test public void testGetIdentifier() { setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY); @@ -711,24 +753,8 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest { */ @Test public void testLocalTxActorRead() throws Exception { - ActorSystem actorSystem = getSystem(); - ActorRef shardActorRef = actorSystem.actorOf(Props.create(DoNothingActor.class)); - - doReturn(actorSystem.actorSelection(shardActorRef.path())). - when(mockActorContext).actorSelection(shardActorRef.path().toString()); - - doReturn(Futures.successful(actorSystem.actorSelection(shardActorRef.path()))). - when(mockActorContext).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD)); - - String actorPath = "akka.tcp://system@127.0.0.1:2550/user/tx-actor"; - CreateTransactionReply createTransactionReply = CreateTransactionReply.newBuilder() - .setTransactionId("txn-1").setTransactionActorPath(actorPath).build(); - - doReturn(Futures.successful(createTransactionReply)).when(mockActorContext). - executeOperationAsync(eq(actorSystem.actorSelection(shardActorRef.path())), - eqCreateTransaction(memberName, READ_ONLY)); - - doReturn(true).when(mockActorContext).isPathLocal(actorPath); + setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY); + doReturn(true).when(mockActorContext).isPathLocal(anyString()); TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,READ_ONLY); @@ -764,40 +790,20 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest { @Test public void testLocalTxActorReady() throws Exception { - ActorSystem actorSystem = getSystem(); - ActorRef shardActorRef = actorSystem.actorOf(Props.create(DoNothingActor.class)); - - doReturn(actorSystem.actorSelection(shardActorRef.path())). - when(mockActorContext).actorSelection(shardActorRef.path().toString()); - - doReturn(Futures.successful(actorSystem.actorSelection(shardActorRef.path()))). - when(mockActorContext).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD)); - - String actorPath = "akka.tcp://system@127.0.0.1:2550/user/tx-actor"; - CreateTransactionReply createTransactionReply = CreateTransactionReply.newBuilder(). - setTransactionId("txn-1").setTransactionActorPath(actorPath). - setMessageVersion(DataStoreVersions.CURRENT_VERSION).build(); - - doReturn(Futures.successful(createTransactionReply)).when(mockActorContext). - executeOperationAsync(eq(actorSystem.actorSelection(shardActorRef.path())), - eqCreateTransaction(memberName, WRITE_ONLY)); - - doReturn(true).when(mockActorContext).isPathLocal(actorPath); + ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE); + doReturn(true).when(mockActorContext).isPathLocal(anyString()); doReturn(batchedModificationsReply(1)).when(mockActorContext).executeOperationAsync( any(ActorSelection.class), isA(BatchedModifications.class)); - TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY); + TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_WRITE); NormalizedNode nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME); transactionProxy.write(TestModel.TEST_PATH, nodeToWrite); - verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(), - BatchedModificationsReply.class); - // testing ready - doReturn(readyTxReply(shardActorRef.path().toString())).when(mockActorContext).executeOperationAsync( - any(ActorSelection.class), isA(ReadyTransaction.class)); + doReturn(readyTxReply(actorRef.path().toString())).when(mockActorContext).executeOperationAsync( + eq(actorSelection(actorRef)), isA(ReadyTransaction.class)); DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready(); @@ -805,7 +811,7 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest { ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready; - verifyCohortFutures(proxy, getSystem().actorSelection(shardActorRef.path())); + verifyCohortFutures(proxy, getSystem().actorSelection(actorRef.path())); } private static interface TransactionProxyOperation { @@ -875,20 +881,23 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest { doReturn(Futures.successful(actorSystem.actorSelection(shardActorRef.path()))). when(mockActorContext).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD)); } else { - doReturn(Futures.failed(new Exception("not found"))) + doReturn(Futures.failed(new PrimaryNotFoundException("test"))) .when(mockActorContext).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD)); } - String actorPath = "akka.tcp://system@127.0.0.1:2550/user/tx-actor"; + ActorRef txActorRef = actorSystem.actorOf(Props.create(DoNothingActor.class)); + String actorPath = txActorRef.path().toString(); CreateTransactionReply createTransactionReply = CreateTransactionReply.newBuilder(). setTransactionId("txn-1").setTransactionActorPath(actorPath). setMessageVersion(DataStoreVersions.CURRENT_VERSION).build(); + doReturn(actorSystem.actorSelection(actorPath)).when(mockActorContext).actorSelection(actorPath); + doReturn(Futures.successful(createTransactionReply)).when(mockActorContext). executeOperationAsync(eq(actorSystem.actorSelection(shardActorRef.path())), eqCreateTransaction(memberName, READ_WRITE)); - doReturn(true).when(mockActorContext).isPathLocal(actorPath); + doReturn(true).when(mockActorContext).isPathLocal(anyString()); TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_WRITE); @@ -921,6 +930,7 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest { @Test public void testWriteThrottlingWhenShardFound(){ + dataStoreContextBuilder.shardBatchedModificationCount(1); throttleOperation(new TransactionProxyOperation() { @Override public void run(TransactionProxy transactionProxy) { @@ -938,6 +948,7 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest { @Test public void testWriteThrottlingWhenShardNotFound(){ // Confirm that there is no throttling when the Shard is not found + dataStoreContextBuilder.shardBatchedModificationCount(1); completeOperation(new TransactionProxyOperation() { @Override public void run(TransactionProxy transactionProxy) { @@ -956,6 +967,7 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest { @Test public void testWriteCompletion(){ + dataStoreContextBuilder.shardBatchedModificationCount(1); completeOperation(new TransactionProxyOperation() { @Override public void run(TransactionProxy transactionProxy) { @@ -972,7 +984,7 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest { @Test public void testMergeThrottlingWhenShardFound(){ - + dataStoreContextBuilder.shardBatchedModificationCount(1); throttleOperation(new TransactionProxyOperation() { @Override public void run(TransactionProxy transactionProxy) { @@ -989,7 +1001,7 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest { @Test public void testMergeThrottlingWhenShardNotFound(){ - + dataStoreContextBuilder.shardBatchedModificationCount(1); completeOperation(new TransactionProxyOperation() { @Override public void run(TransactionProxy transactionProxy) { @@ -1006,6 +1018,7 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest { @Test public void testMergeCompletion(){ + dataStoreContextBuilder.shardBatchedModificationCount(1); completeOperation(new TransactionProxyOperation() { @Override public void run(TransactionProxy transactionProxy) { @@ -1054,6 +1067,7 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest { @Test public void testDeleteCompletion(){ + dataStoreContextBuilder.shardBatchedModificationCount(1); completeOperation(new TransactionProxyOperation() { @Override public void run(TransactionProxy transactionProxy) { @@ -1210,13 +1224,11 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest { }, 2, true); } - @Test - public void testModificationOperationBatching() throws Throwable { + private void testModificationOperationBatching(TransactionType type) throws Exception { int shardBatchedModificationCount = 3; - doReturn(dataStoreContextBuilder.shardBatchedModificationCount(shardBatchedModificationCount).build()). - when(mockActorContext).getDatastoreContext(); + dataStoreContextBuilder.shardBatchedModificationCount(shardBatchedModificationCount); - ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE); + ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), type); expectBatchedModifications(actorRef, shardBatchedModificationCount); @@ -1243,7 +1255,7 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest { YangInstanceIdentifier deletePath1 = TestModel.TEST_PATH; YangInstanceIdentifier deletePath2 = TestModel.OUTER_LIST_PATH; - TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_WRITE); + TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, type); transactionProxy.write(writePath1, writeNode1); transactionProxy.write(writePath2, writeNode2); @@ -1260,24 +1272,46 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest { List batchedModifications = captureBatchedModifications(actorRef); assertEquals("Captured BatchedModifications count", 3, batchedModifications.size()); - verifyBatchedModifications(batchedModifications.get(0), new WriteModification(writePath1, writeNode1), + verifyBatchedModifications(batchedModifications.get(0), false, new WriteModification(writePath1, writeNode1), new WriteModification(writePath2, writeNode2), new DeleteModification(deletePath1)); - verifyBatchedModifications(batchedModifications.get(1), new MergeModification(mergePath1, mergeNode1), + verifyBatchedModifications(batchedModifications.get(1), false, new MergeModification(mergePath1, mergeNode1), new MergeModification(mergePath2, mergeNode2), new WriteModification(writePath3, writeNode3)); - verifyBatchedModifications(batchedModifications.get(2), new MergeModification(mergePath3, mergeNode3), + boolean optimizedWriteOnly = type == WRITE_ONLY && dataStoreContextBuilder.build().isWriteOnlyTransactionOptimizationsEnabled(); + verifyBatchedModifications(batchedModifications.get(2), optimizedWriteOnly, new MergeModification(mergePath3, mergeNode3), new DeleteModification(deletePath2)); - verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(), - BatchedModificationsReply.class, BatchedModificationsReply.class, BatchedModificationsReply.class); + if(optimizedWriteOnly) { + verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(), + BatchedModificationsReply.class, BatchedModificationsReply.class); + } else { + verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(), + BatchedModificationsReply.class, BatchedModificationsReply.class, BatchedModificationsReply.class); + } + } + + @Test + public void testReadWriteModificationOperationBatching() throws Throwable { + testModificationOperationBatching(READ_WRITE); + } + + @Test + public void testWriteOnlyModificationOperationBatching() throws Throwable { + testModificationOperationBatching(WRITE_ONLY); + } + + @Test + public void testOptimizedWriteOnlyModificationOperationBatching() throws Throwable { + dataStoreContextBuilder.writeOnlyTransactionOptimizationsEnabled(true); + testModificationOperationBatching(WRITE_ONLY); } @Test public void testModificationOperationBatchingWithInterleavedReads() throws Throwable { + int shardBatchedModificationCount = 10; - doReturn(dataStoreContextBuilder.shardBatchedModificationCount(shardBatchedModificationCount).build()). - when(mockActorContext).getDatastoreContext(); + dataStoreContextBuilder.shardBatchedModificationCount(shardBatchedModificationCount); ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE); @@ -1333,13 +1367,13 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest { List batchedModifications = captureBatchedModifications(actorRef); assertEquals("Captured BatchedModifications count", 3, batchedModifications.size()); - verifyBatchedModifications(batchedModifications.get(0), new WriteModification(writePath1, writeNode1), + verifyBatchedModifications(batchedModifications.get(0), false, new WriteModification(writePath1, writeNode1), new WriteModification(writePath2, writeNode2)); - verifyBatchedModifications(batchedModifications.get(1), new MergeModification(mergePath1, mergeNode1), + verifyBatchedModifications(batchedModifications.get(1), false, new MergeModification(mergePath1, mergeNode1), new MergeModification(mergePath2, mergeNode2)); - verifyBatchedModifications(batchedModifications.get(2), new DeleteModification(deletePath)); + verifyBatchedModifications(batchedModifications.get(2), false, new DeleteModification(deletePath)); InOrder inOrder = Mockito.inOrder(mockActorContext); inOrder.verify(mockActorContext).executeOperationAsync( diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/compat/PreLithiumTransactionProxyTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/compat/PreLithiumTransactionProxyTest.java index 08c32c9a54..2980f83564 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/compat/PreLithiumTransactionProxyTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/compat/PreLithiumTransactionProxyTest.java @@ -15,10 +15,12 @@ import static org.mockito.Matchers.isA; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.verify; import static org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType.READ_WRITE; +import static org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType.WRITE_ONLY; import akka.actor.ActorRef; import akka.dispatch.Futures; import com.google.common.base.Optional; import java.util.concurrent.TimeUnit; +import org.junit.Ignore; import org.junit.Test; import org.mockito.ArgumentMatcher; import org.mockito.Mockito; @@ -154,4 +156,36 @@ public class PreLithiumTransactionProxyTest extends AbstractTransactionProxyTest verify(mockActorContext, Mockito.never()).resolvePath(eq(actorRef.path().toString()), eq(actorRef.path().toString())); } + + @Test + @Ignore + // FIXME: disabled until we can get the primary shard version from the ShardManager as we now skip + // creating transaction actors for write-only Tx's. + public void testWriteOnlyCompatibilityWithHeliumR2Version() throws Exception { + short version = DataStoreVersions.HELIUM_2_VERSION; + ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY, version); + + NormalizedNode testNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME); + + doReturn(Futures.successful(new WriteDataReply().toSerializable(version))).when(mockActorContext). + executeOperationAsync(eq(actorSelection(actorRef)), eqLegacyWriteData(testNode)); + + doReturn(readySerializedTxReply(actorRef.path().toString())).when(mockActorContext).executeOperationAsync( + eq(actorSelection(actorRef)), isA(ReadyTransaction.SERIALIZABLE_CLASS)); + + doReturn(actorRef.path().toString()).when(mockActorContext).resolvePath(eq(actorRef.path().toString()), + eq(actorRef.path().toString())); + + TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY); + + transactionProxy.write(TestModel.TEST_PATH, testNode); + + DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready(); + + assertTrue(ready instanceof ThreePhaseCommitCohortProxy); + + ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready; + + verifyCohortFutures(proxy, getSystem().actorSelection(actorRef.path())); + } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/messages/BatchedModificationsTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/messages/BatchedModificationsTest.java index 15d2eea598..c4027ad2a5 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/messages/BatchedModificationsTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/messages/BatchedModificationsTest.java @@ -41,15 +41,19 @@ public class BatchedModificationsTest { YangInstanceIdentifier deletePath = TestModel.TEST_PATH; - BatchedModifications batched = new BatchedModifications(DataStoreVersions.CURRENT_VERSION); + BatchedModifications batched = new BatchedModifications("tx1", DataStoreVersions.CURRENT_VERSION, "txChain"); batched.addModification(new WriteModification(writePath, writeData)); batched.addModification(new MergeModification(mergePath, mergeData)); batched.addModification(new DeleteModification(deletePath)); + batched.setReady(true); BatchedModifications clone = (BatchedModifications) SerializationUtils.clone( (Serializable) batched.toSerializable()); assertEquals("getVersion", DataStoreVersions.CURRENT_VERSION, clone.getVersion()); + assertEquals("getTransactionID", "tx1", clone.getTransactionID()); + assertEquals("getTransactionChainID", "txChain", clone.getTransactionChainID()); + assertEquals("isReady", true, clone.isReady()); assertEquals("getModifications size", 3, clone.getModifications().size()); @@ -66,6 +70,20 @@ public class BatchedModificationsTest { DeleteModification delete = (DeleteModification)clone.getModifications().get(2); assertEquals("getVersion", DataStoreVersions.CURRENT_VERSION, delete.getVersion()); assertEquals("getPath", deletePath, delete.getPath()); + + // Test with different params. + + batched = new BatchedModifications("tx2", (short)10, null); + + clone = (BatchedModifications) SerializationUtils.clone((Serializable) batched.toSerializable()); + + assertEquals("getVersion", 10, clone.getVersion()); + assertEquals("getTransactionID", "tx2", clone.getTransactionID()); + assertEquals("getTransactionChainID", "", clone.getTransactionChainID()); + assertEquals("isReady", false, clone.isReady()); + + assertEquals("getModifications size", 0, clone.getModifications().size()); + } @Test @@ -73,5 +91,11 @@ public class BatchedModificationsTest { BatchedModificationsReply clone = (BatchedModificationsReply) SerializationUtils.clone( (Serializable) new BatchedModificationsReply(100).toSerializable()); assertEquals("getNumBatched", 100, clone.getNumBatched()); + assertEquals("getCohortPath", null, clone.getCohortPath()); + + clone = (BatchedModificationsReply) SerializationUtils.clone( + (Serializable) new BatchedModificationsReply(50, "cohort path").toSerializable()); + assertEquals("getNumBatched", 50, clone.getNumBatched()); + assertEquals("getCohortPath", "cohort path", clone.getCohortPath()); } }