--- /dev/null
+/*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore;
+
+import java.util.HashMap;
+import java.util.Map;
+import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
+import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransaction;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionFactory;
+import org.slf4j.Logger;
+
+/**
+ * A factory for creating DOM transactions, either normal or chained.
+ *
+ * @author Thomas Pantelis
+ */
+public class DOMTransactionFactory {
+
+ private final Map<String, DOMStoreTransactionChain> transactionChains = new HashMap<>();
+ private final InMemoryDOMDataStore store;
+ private final ShardStats shardMBean;
+ private final Logger log;
+ private final String name;
+
+ public DOMTransactionFactory(InMemoryDOMDataStore store, ShardStats shardMBean, Logger log, String name) {
+ this.store = store;
+ this.shardMBean = shardMBean;
+ this.log = log;
+ this.name = name;
+ }
+
+ @SuppressWarnings("unchecked")
+ public <T extends DOMStoreTransaction> T newTransaction(TransactionProxy.TransactionType type,
+ String transactionID, String transactionChainID) {
+
+ DOMStoreTransactionFactory factory = store;
+
+ if(!transactionChainID.isEmpty()) {
+ factory = transactionChains.get(transactionChainID);
+ if(factory == null) {
+ if(log.isDebugEnabled()) {
+ log.debug("{}: Creating transaction with ID {} from chain {}", name, transactionID,
+ transactionChainID);
+ }
+
+ DOMStoreTransactionChain transactionChain = store.createTransactionChain();
+ transactionChains.put(transactionChainID, transactionChain);
+ factory = transactionChain;
+ }
+ } else {
+ log.debug("{}: Creating transaction with ID {}", name, transactionID);
+ }
+
+ T transaction = null;
+ switch(type) {
+ case READ_ONLY:
+ transaction = (T) factory.newReadOnlyTransaction();
+ shardMBean.incrementReadOnlyTransactionCount();
+ break;
+ case READ_WRITE:
+ transaction = (T) factory.newReadWriteTransaction();
+ shardMBean.incrementReadWriteTransactionCount();
+ break;
+ case WRITE_ONLY:
+ transaction = (T) factory.newWriteOnlyTransaction();
+ shardMBean.incrementWriteOnlyTransactionCount();
+ break;
+ }
+
+ return transaction;
+ }
+
+ public void closeTransactionChain(String transactionChainID) {
+ DOMStoreTransactionChain chain =
+ transactionChains.remove(transactionChainID);
+
+ if(chain != null) {
+ chain.close();
+ }
+ }
+
+ public void closeAllTransactionChains() {
+ for(Map.Entry<String, DOMStoreTransactionChain> entry : transactionChains.entrySet()){
+ entry.getValue().close();
+ }
+
+ transactionChains.clear();
+ }
+}
private final DefaultConfigParamsImpl raftConfig = new DefaultConfigParamsImpl();
private String dataStoreType = UNKNOWN_DATA_STORE_TYPE;
private int shardBatchedModificationCount = DEFAULT_SHARD_BATCHED_MODIFICATION_COUNT;
+ private boolean writeOnlyTransactionOptimizationsEnabled = false;
private DatastoreContext() {
setShardJournalRecoveryLogBatchSize(DEFAULT_JOURNAL_RECOVERY_BATCH_SIZE);
this.transactionCreationInitialRateLimit = other.transactionCreationInitialRateLimit;
this.dataStoreType = other.dataStoreType;
this.shardBatchedModificationCount = other.shardBatchedModificationCount;
+ this.writeOnlyTransactionOptimizationsEnabled = other.writeOnlyTransactionOptimizationsEnabled;
setShardJournalRecoveryLogBatchSize(other.raftConfig.getJournalRecoveryLogBatchSize());
setSnapshotBatchCount(other.raftConfig.getSnapshotBatchCount());
return shardBatchedModificationCount;
}
+ public boolean isWriteOnlyTransactionOptimizationsEnabled() {
+ return writeOnlyTransactionOptimizationsEnabled;
+ }
+
public static class Builder {
private final DatastoreContext datastoreContext;
private int maxShardDataChangeExecutorPoolSize =
return this;
}
+ public Builder writeOnlyTransactionOptimizationsEnabled(boolean value) {
+ datastoreContext.writeOnlyTransactionOptimizationsEnabled = value;
+ return this;
+ }
+
public Builder maxShardDataChangeExecutorPoolSize(int maxShardDataChangeExecutorPoolSize) {
this.maxShardDataChangeExecutorPoolSize = maxShardDataChangeExecutorPoolSize;
return this;
import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction;
import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply;
import org.opendaylight.controller.cluster.datastore.messages.ActorInitialized;
+import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
+import org.opendaylight.controller.cluster.datastore.messages.BatchedModificationsReply;
import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction;
import org.opendaylight.controller.cluster.datastore.messages.CloseTransactionChain;
import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction;
import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStoreFactory;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransaction;
-import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain;
-import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionFactory;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
import org.opendaylight.yangtools.concepts.ListenerRegistration;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
private final InMemoryDOMDataStore store;
/// The name of this shard
- private final ShardIdentifier name;
+ private final String name;
private final ShardStats shardMBean;
private ShardRecoveryCoordinator recoveryCoordinator;
private List<Object> currentLogRecoveryBatch;
- private final Map<String, DOMStoreTransactionChain> transactionChains = new HashMap<>();
+ private final DOMTransactionFactory transactionFactory;
private final String txnDispatcherPath;
super(name.toString(), mapPeerAddresses(peerAddresses),
Optional.of(datastoreContext.getShardRaftConfig()));
- this.name = name;
+ this.name = name.toString();
this.datastoreContext = datastoreContext;
this.schemaContext = schemaContext;
this.dataPersistenceProvider = (datastoreContext.isPersistent())
getContext().become(new MeteringBehavior(this));
}
- commitCoordinator = new ShardCommitCoordinator(TimeUnit.SECONDS.convert(1, TimeUnit.MINUTES),
- datastoreContext.getShardTransactionCommitQueueCapacity(), LOG, name.toString());
+ transactionFactory = new DOMTransactionFactory(store, shardMBean, LOG, this.name);
+
+ commitCoordinator = new ShardCommitCoordinator(transactionFactory,
+ TimeUnit.SECONDS.convert(5, TimeUnit.MINUTES),
+ datastoreContext.getShardTransactionCommitQueueCapacity(), self(), LOG, this.name);
setTransactionCommitTimeout();
try {
if (CreateTransaction.SERIALIZABLE_CLASS.isInstance(message)) {
handleCreateTransaction(message);
+ } else if (BatchedModifications.class.isInstance(message)) {
+ handleBatchedModifications((BatchedModifications)message);
} else if (message instanceof ForwardedReadyTransaction) {
handleForwardedReadyTransaction((ForwardedReadyTransaction) message);
} else if (CanCommitTransaction.SERIALIZABLE_CLASS.isInstance(message)) {
commitCoordinator.handleCanCommit(canCommit, getSender(), self());
}
+ private void handleBatchedModifications(BatchedModifications batched) {
+ // This message is sent to prepare the modificationsa transaction directly on the Shard as an
+ // optimization to avoid the extra overhead of a separate ShardTransaction actor. On the last
+ // BatchedModifications message, the caller sets the ready flag in the message indicating
+ // modifications are complete. The reply contains the cohort actor path (this actor) for the caller
+ // to initiate the 3-phase commit. This also avoids the overhead of sending an additional
+ // ReadyTransaction message.
+
+ // If we're not the leader then forward to the leader. This is a safety measure - we shouldn't
+ // normally get here if we're not the leader as the front-end (TransactionProxy) should determine
+ // the primary/leader shard. However with timing and caching on the front-end, there's a small
+ // window where it could have a stale leader during leadership transitions.
+ //
+ if(isLeader()) {
+ try {
+ BatchedModificationsReply reply = commitCoordinator.handleTransactionModifications(batched);
+ sender().tell(reply, self());
+ } catch (Exception e) {
+ LOG.error("{}: Error handling BatchedModifications for Tx {}", persistenceId(),
+ batched.getTransactionID(), e);
+ getSender().tell(new akka.actor.Status.Failure(e), getSelf());
+ }
+ } else {
+ ActorSelection leader = getLeader();
+ if(leader != null) {
+ // TODO: what if this is not the first batch and leadership changed in between batched messages?
+ // We could check if the commitCoordinator already has a cached entry and forward all the previous
+ // batched modifications.
+ LOG.debug("{}: Forwarding BatchedModifications to leader {}", persistenceId(), leader);
+ leader.forward(batched, getContext());
+ } else {
+ // TODO: rather than throwing an immediate exception, we could schedule a timer to try again to make
+ // it more resilient in case we're in the process of electing a new leader.
+ getSender().tell(new akka.actor.Status.Failure(new NoShardLeaderException(String.format(
+ "Could not find the leader for shard %s. This typically happens" +
+ " when the system is coming up or recovering and a leader is being elected. Try again" +
+ " later.", persistenceId()))), getSelf());
+ }
+ }
+ }
+
private void handleForwardedReadyTransaction(ForwardedReadyTransaction ready) {
LOG.debug("{}: Readying transaction {}, client version {}", persistenceId(),
ready.getTransactionID(), ready.getTxnClientVersion());
// commitCoordinator in preparation for the subsequent three phase commit initiated by
// the front-end.
commitCoordinator.transactionReady(ready.getTransactionID(), ready.getCohort(),
- ready.getModification());
+ (MutableCompositeModification) ready.getModification());
// Return our actor path as we'll handle the three phase commit, except if the Tx client
// version < 1 (Helium-1 version). This means the Tx was initiated by a base Helium version
}
private void closeTransactionChain(final CloseTransactionChain closeTransactionChain) {
- DOMStoreTransactionChain chain =
- transactionChains.remove(closeTransactionChain.getTransactionChainId());
-
- if(chain != null) {
- chain.close();
- }
+ transactionFactory.closeTransactionChain(closeTransactionChain.getTransactionChainId());
}
private ActorRef createTypedTransactionActor(int transactionType,
ShardTransactionIdentifier transactionId, String transactionChainId,
short clientVersion ) {
- DOMStoreTransactionFactory factory = store;
-
- if(!transactionChainId.isEmpty()) {
- factory = transactionChains.get(transactionChainId);
- if(factory == null){
- DOMStoreTransactionChain transactionChain = store.createTransactionChain();
- transactionChains.put(transactionChainId, transactionChain);
- factory = transactionChain;
- }
- }
-
- if(this.schemaContext == null) {
- throw new IllegalStateException("SchemaContext is not set");
- }
+ DOMStoreTransaction transaction = transactionFactory.newTransaction(
+ TransactionProxy.TransactionType.fromInt(transactionType), transactionId.toString(),
+ transactionChainId);
- if (transactionType == TransactionProxy.TransactionType.WRITE_ONLY.ordinal()) {
-
- shardMBean.incrementWriteOnlyTransactionCount();
-
- return createShardTransaction(factory.newWriteOnlyTransaction(), transactionId, clientVersion);
-
- } else if (transactionType == TransactionProxy.TransactionType.READ_WRITE.ordinal()) {
-
- shardMBean.incrementReadWriteTransactionCount();
-
- return createShardTransaction(factory.newReadWriteTransaction(), transactionId, clientVersion);
-
- } else if (transactionType == TransactionProxy.TransactionType.READ_ONLY.ordinal()) {
-
- shardMBean.incrementReadOnlyTransactionCount();
-
- return createShardTransaction(factory.newReadOnlyTransaction(), transactionId, clientVersion);
-
- } else {
- throw new IllegalArgumentException(
- "Shard="+name + ":CreateTransaction message has unidentified transaction type="
- + transactionType);
- }
+ return createShardTransaction(transaction, transactionId, clientVersion);
}
private ActorRef createShardTransaction(DOMStoreTransaction transaction, ShardTransactionIdentifier transactionId,
}
// If this actor is no longer the leader close all the transaction chains
- if(!isLeader){
- for(Map.Entry<String, DOMStoreTransactionChain> entry : transactionChains.entrySet()){
- if(LOG.isDebugEnabled()) {
- LOG.debug(
- "{}: onStateChanged: Closing transaction chain {} because shard {} is no longer the leader",
- persistenceId(), entry.getKey(), getId());
- }
- entry.getValue().close();
+ if(!isLeader) {
+ if(LOG.isDebugEnabled()) {
+ LOG.debug(
+ "{}: onStateChanged: Closing all transaction chains because shard {} is no longer the leader",
+ persistenceId(), getId());
}
- transactionChains.clear();
+ transactionFactory.closeAllTransactionChains();
}
}
}
@Override public String persistenceId() {
- return this.name.toString();
+ return this.name;
}
@VisibleForTesting
return dataPersistenceProvider;
}
+ @VisibleForTesting
+ ShardCommitCoordinator getCommitCoordinator() {
+ return commitCoordinator;
+ }
+
+
private static class ShardCreator implements Creator<Shard> {
private static final long serialVersionUID = 1L;
import akka.actor.ActorRef;
import akka.actor.Status;
+import akka.serialization.Serialization;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.RemovalCause;
+import com.google.common.cache.RemovalListener;
+import com.google.common.cache.RemovalNotification;
import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
+import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
+import org.opendaylight.controller.cluster.datastore.messages.BatchedModificationsReply;
import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction;
import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply;
-import org.opendaylight.controller.cluster.datastore.modification.CompositeModification;
import org.opendaylight.controller.cluster.datastore.modification.Modification;
+import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
import org.slf4j.Logger;
/**
*/
public class ShardCommitCoordinator {
+ // Interface hook for unit tests to replace or decorate the DOMStoreThreePhaseCommitCohorts.
+ public interface CohortDecorator {
+ DOMStoreThreePhaseCommitCohort decorate(String transactionID, DOMStoreThreePhaseCommitCohort actual);
+ }
+
private final Cache<String, CohortEntry> cohortCache;
private CohortEntry currentCohortEntry;
+ private final DOMTransactionFactory transactionFactory;
+
private final Queue<CohortEntry> queuedCohortEntries;
private int queueCapacity;
private final String name;
- public ShardCommitCoordinator(long cacheExpiryTimeoutInSec, int queueCapacity, Logger log,
- String name) {
- cohortCache = CacheBuilder.newBuilder().expireAfterAccess(
- cacheExpiryTimeoutInSec, TimeUnit.SECONDS).build();
+ private final String shardActorPath;
+
+ private final RemovalListener<String, CohortEntry> cacheRemovalListener =
+ new RemovalListener<String, CohortEntry>() {
+ @Override
+ public void onRemoval(RemovalNotification<String, CohortEntry> notification) {
+ if(notification.getCause() == RemovalCause.EXPIRED) {
+ log.warn("{}: Transaction {} was timed out of the cache", name, notification.getKey());
+ }
+ }
+ };
+
+ // This is a hook for unit tests to replace or decorate the DOMStoreThreePhaseCommitCohorts.
+ private CohortDecorator cohortDecorator;
+
+ public ShardCommitCoordinator(DOMTransactionFactory transactionFactory,
+ long cacheExpiryTimeoutInSec, int queueCapacity, ActorRef shardActor, Logger log, String name) {
this.queueCapacity = queueCapacity;
this.log = log;
this.name = name;
+ this.transactionFactory = transactionFactory;
+
+ shardActorPath = Serialization.serializedActorPath(shardActor);
+
+ cohortCache = CacheBuilder.newBuilder().expireAfterAccess(cacheExpiryTimeoutInSec, TimeUnit.SECONDS).
+ removalListener(cacheRemovalListener).build();
// We use a LinkedList here to avoid synchronization overhead with concurrent queue impls
// since this should only be accessed on the shard's dispatcher.
}
/**
- * This method caches a cohort entry for the given transactions ID in preparation for the
- * subsequent 3-phase commit.
+ * This method is called to ready a transaction that was prepared by ShardTransaction actor. It caches
+ * the prepared cohort entry for the given transactions ID in preparation for the subsequent 3-phase commit.
*
* @param transactionID the ID of the transaction
* @param cohort the cohort to participate in the transaction commit
- * @param modification the modification made by the transaction
+ * @param modification the modifications made by the transaction
*/
public void transactionReady(String transactionID, DOMStoreThreePhaseCommitCohort cohort,
- Modification modification) {
+ MutableCompositeModification modification) {
cohortCache.put(transactionID, new CohortEntry(transactionID, cohort, modification));
}
+ /**
+ * This method handles a BatchedModifications message for a transaction being prepared directly on the
+ * Shard actor instead of via a ShardTransaction actor. If there's no currently cached
+ * DOMStoreWriteTransaction, one is created. The batched modifications are applied to the write Tx. If
+ * the BatchedModifications is ready to commit then a DOMStoreThreePhaseCommitCohort is created.
+ *
+ * @param batched the BatchedModifications
+ * @param shardActor the transaction's shard actor
+ *
+ * @throws ExecutionException if an error occurs loading the cache
+ */
+ public BatchedModificationsReply handleTransactionModifications(BatchedModifications batched)
+ throws ExecutionException {
+ CohortEntry cohortEntry = cohortCache.getIfPresent(batched.getTransactionID());
+ if(cohortEntry == null) {
+ cohortEntry = new CohortEntry(batched.getTransactionID(),
+ transactionFactory.<DOMStoreWriteTransaction>newTransaction(
+ TransactionProxy.TransactionType.WRITE_ONLY, batched.getTransactionID(),
+ batched.getTransactionChainID()));
+ cohortCache.put(batched.getTransactionID(), cohortEntry);
+ }
+
+ if(log.isDebugEnabled()) {
+ log.debug("{}: Applying {} batched modifications for Tx {}", name,
+ batched.getModifications().size(), batched.getTransactionID());
+ }
+
+ cohortEntry.applyModifications(batched.getModifications());
+
+ String cohortPath = null;
+ if(batched.isReady()) {
+ if(log.isDebugEnabled()) {
+ log.debug("{}: Readying Tx {}, client version {}", name,
+ batched.getTransactionID(), batched.getVersion());
+ }
+
+ cohortEntry.ready(cohortDecorator);
+ cohortPath = shardActorPath;
+ }
+
+ return new BatchedModificationsReply(batched.getModifications().size(), cohortPath);
+ }
+
/**
* This method handles the canCommit phase for a transaction.
*
}
}
+ @VisibleForTesting
+ void setCohortDecorator(CohortDecorator cohortDecorator) {
+ this.cohortDecorator = cohortDecorator;
+ }
+
+
static class CohortEntry {
private final String transactionID;
- private final DOMStoreThreePhaseCommitCohort cohort;
- private final Modification modification;
+ private DOMStoreThreePhaseCommitCohort cohort;
+ private final MutableCompositeModification compositeModification;
+ private final DOMStoreWriteTransaction transaction;
private ActorRef canCommitSender;
private ActorRef shard;
private long lastAccessTime;
+ CohortEntry(String transactionID, DOMStoreWriteTransaction transaction) {
+ this.compositeModification = new MutableCompositeModification();
+ this.transaction = transaction;
+ this.transactionID = transactionID;
+ }
+
CohortEntry(String transactionID, DOMStoreThreePhaseCommitCohort cohort,
- Modification modification) {
+ MutableCompositeModification compositeModification) {
this.transactionID = transactionID;
this.cohort = cohort;
- this.modification = modification;
+ this.compositeModification = compositeModification;
+ this.transaction = null;
}
void updateLastAccessTime() {
return cohort;
}
- Modification getModification() {
- return modification;
+ MutableCompositeModification getModification() {
+ return compositeModification;
+ }
+
+ void applyModifications(Iterable<Modification> modifications) {
+ for(Modification modification: modifications) {
+ compositeModification.addModification(modification);
+ modification.apply(transaction);
+ }
+ }
+
+ void ready(CohortDecorator cohortDecorator) {
+ Preconditions.checkState(cohort == null, "cohort was already set");
+
+ cohort = transaction.ready();
+
+ if(cohortDecorator != null) {
+ // Call the hook for unit tests.
+ cohort = cohortDecorator.decorate(transactionID, cohort);
+ }
}
ActorRef getCanCommitSender() {
}
boolean hasModifications(){
- if(modification instanceof CompositeModification){
- return ((CompositeModification) modification).getModifications().size() > 0;
- }
- return true;
+ return compositeModification.getModifications().size() > 0;
}
}
}
import java.util.List;
import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
+import org.opendaylight.controller.cluster.datastore.messages.BatchedModificationsReply;
import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
import org.opendaylight.controller.cluster.datastore.messages.DataExists;
import org.opendaylight.controller.cluster.datastore.messages.DataExistsReply;
public class TransactionContextImpl extends AbstractTransactionContext {
private static final Logger LOG = LoggerFactory.getLogger(TransactionContextImpl.class);
+ private final String transactionChainId;
private final ActorContext actorContext;
- private final String transactionPath;
private final ActorSelection actor;
private final boolean isTxActorLocal;
private final short remoteTransactionVersion;
private final OperationCompleter operationCompleter;
private BatchedModifications batchedModifications;
- protected TransactionContextImpl(String transactionPath, ActorSelection actor, TransactionIdentifier identifier,
- ActorContext actorContext, SchemaContext schemaContext,
- boolean isTxActorLocal, short remoteTransactionVersion, OperationCompleter operationCompleter) {
+ protected TransactionContextImpl(ActorSelection actor, TransactionIdentifier identifier,
+ String transactionChainId, ActorContext actorContext, SchemaContext schemaContext, boolean isTxActorLocal,
+ short remoteTransactionVersion, OperationCompleter operationCompleter) {
super(identifier);
- this.transactionPath = transactionPath;
this.actor = actor;
+ this.transactionChainId = transactionChainId;
this.actorContext = actorContext;
this.isTxActorLocal = isTxActorLocal;
this.remoteTransactionVersion = remoteTransactionVersion;
return actor;
}
+ protected ActorContext getActorContext() {
+ return actorContext;
+ }
+
protected short getRemoteTransactionVersion() {
return remoteTransactionVersion;
}
// Send the remaining batched modifications if any.
- sendBatchedModifications();
+ sendAndRecordBatchedModifications();
// Send the ReadyTransaction message to the Tx actor.
- final Future<Object> replyFuture = executeOperationAsync(ReadyTransaction.INSTANCE);
+ Future<Object> readyReplyFuture = executeOperationAsync(ReadyTransaction.INSTANCE);
+ return combineRecordedOperationsFutures(readyReplyFuture);
+ }
+
+ protected Future<ActorSelection> combineRecordedOperationsFutures(final Future<Object> withLastReplyFuture) {
// Combine all the previously recorded put/merge/delete operation reply Futures and the
// ReadyTransactionReply Future into one Future. If any one fails then the combined
// Future will fail. We need all prior operations and the ready operation to succeed
// in order to attempt commit.
- List<Future<Object>> futureList =
- Lists.newArrayListWithCapacity(recordedOperationFutures.size() + 1);
+ List<Future<Object>> futureList = Lists.newArrayListWithCapacity(recordedOperationFutures.size() + 1);
futureList.addAll(recordedOperationFutures);
- futureList.add(replyFuture);
+ futureList.add(withLastReplyFuture);
Future<Iterable<Object>> combinedFutures = akka.dispatch.Futures.sequence(futureList,
actorContext.getClientDispatcher());
// de-serializing each reply.
// Note the Future get call here won't block as it's complete.
- Object serializedReadyReply = replyFuture.value().get().get();
+ Object serializedReadyReply = withLastReplyFuture.value().get().get();
if (serializedReadyReply instanceof ReadyTransactionReply) {
return actorContext.actorSelection(((ReadyTransactionReply)serializedReadyReply).getCohortPath());
-
+ } else if(serializedReadyReply instanceof BatchedModificationsReply) {
+ return actorContext.actorSelection(((BatchedModificationsReply)serializedReadyReply).getCohortPath());
} else if(serializedReadyReply.getClass().equals(ReadyTransactionReply.SERIALIZABLE_CLASS)) {
ReadyTransactionReply reply = ReadyTransactionReply.fromSerializable(serializedReadyReply);
- String cohortPath = reply.getCohortPath();
-
- // In Helium we used to return the local path of the actor which represented
- // a remote ThreePhaseCommitCohort. The local path would then be converted to
- // a remote path using this resolvePath method. To maintain compatibility with
- // a Helium node we need to continue to do this conversion.
- // At some point in the future when upgrades from Helium are not supported
- // we could remove this code to resolvePath and just use the cohortPath as the
- // resolved cohortPath
- if(TransactionContextImpl.this.remoteTransactionVersion <
- DataStoreVersions.HELIUM_1_VERSION) {
- cohortPath = actorContext.resolvePath(transactionPath, cohortPath);
- }
-
+ String cohortPath = deserializeCohortPath(reply.getCohortPath());
return actorContext.actorSelection(cohortPath);
-
} else {
// Throwing an exception here will fail the Future.
throw new IllegalArgumentException(String.format("%s: Invalid reply type %s",
}, TransactionProxy.SAME_FAILURE_TRANSFORMER, actorContext.getClientDispatcher());
}
+ protected String deserializeCohortPath(String cohortPath) {
+ return cohortPath;
+ }
+
private void batchModification(Modification modification) {
if(batchedModifications == null) {
- batchedModifications = new BatchedModifications(remoteTransactionVersion);
+ batchedModifications = new BatchedModifications(identifier.toString(), remoteTransactionVersion,
+ transactionChainId);
}
batchedModifications.addModification(modification);
if(batchedModifications.getModifications().size() >=
actorContext.getDatastoreContext().getShardBatchedModificationCount()) {
- sendBatchedModifications();
+ sendAndRecordBatchedModifications();
}
}
- private void sendBatchedModifications() {
+ private void sendAndRecordBatchedModifications() {
+ Future<Object> sentFuture = sendBatchedModifications();
+ if(sentFuture != null) {
+ recordedOperationFutures.add(sentFuture);
+ }
+ }
+
+ protected Future<Object> sendBatchedModifications() {
+ return sendBatchedModifications(false);
+ }
+
+ protected Future<Object> sendBatchedModifications(boolean ready) {
+ Future<Object> sent = null;
if(batchedModifications != null) {
- LOG.debug("Tx {} sending {} batched modifications", identifier,
- batchedModifications.getModifications().size());
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Tx {} sending {} batched modifications, ready: {}", identifier,
+ batchedModifications.getModifications().size(), ready);
+ }
- recordedOperationFutures.add(executeOperationAsync(batchedModifications));
- batchedModifications = null;
+ batchedModifications.setReady(ready);
+ sent = executeOperationAsync(batchedModifications);
+
+ batchedModifications = new BatchedModifications(identifier.toString(), remoteTransactionVersion,
+ transactionChainId);
}
+
+ return sent;
}
@Override
// Send the remaining batched modifications if any.
- sendBatchedModifications();
+ sendAndRecordBatchedModifications();
// If there were any previous recorded put/merge/delete operation reply Futures then we
// must wait for them to successfully complete. This is necessary to honor the read
// Send the remaining batched modifications if any.
- sendBatchedModifications();
+ sendAndRecordBatchedModifications();
// If there were any previous recorded put/merge/delete operation reply Futures then we
// must wait for them to successfully complete. This is necessary to honor the read
public static enum TransactionType {
READ_ONLY,
WRITE_ONLY,
- READ_WRITE
+ READ_WRITE;
+
+ public static TransactionType fromInt(int type) {
+ if(type == WRITE_ONLY.ordinal()) {
+ return WRITE_ONLY;
+ } else if(type == READ_WRITE.ordinal()) {
+ return READ_WRITE;
+ } else if(type == READ_ONLY.ordinal()) {
+ return READ_ONLY;
+ } else {
+ throw new IllegalArgumentException("In TransactionType enum value" + type);
+ }
+ }
}
static final Mapper<Throwable, Throwable> SAME_FAILURE_TRANSFORMER =
* Sets the target primary shard and initiates a CreateTransaction try.
*/
void setPrimaryShard(ActorSelection primaryShard) {
- LOG.debug("Tx {} Primary shard found - trying create transaction", identifier);
-
this.primaryShard = primaryShard;
- tryCreateTransaction();
+
+ if(transactionType == TransactionType.WRITE_ONLY &&
+ actorContext.getDatastoreContext().isWriteOnlyTransactionOptimizationsEnabled()) {
+ LOG.debug("Tx {} Primary shard found - creating WRITE_ONLY transaction context", identifier);
+
+ // For write-only Tx's we prepare the transaction modifications directly on the shard actor
+ // to avoid the overhead of creating a separate transaction actor.
+ // FIXME: can't assume the shard version is LITHIUM_VERSION - need to obtain it somehow.
+ executeTxOperatonsOnComplete(createValidTransactionContext(this.primaryShard,
+ this.primaryShard.path().toString(), DataStoreVersions.LITHIUM_VERSION));
+ } else {
+ tryCreateTransaction();
+ }
}
/**
boolean invokeOperation = true;
synchronized(txOperationsOnComplete) {
if(transactionContext == null) {
- LOG.debug("Tx {} Adding operation on complete {}", identifier);
+ LOG.debug("Tx {} Adding operation on complete", identifier);
invokeOperation = false;
txOperationsOnComplete.add(operation);
* Performs a CreateTransaction try async.
*/
private void tryCreateTransaction() {
+ LOG.debug("Tx {} Primary shard found - trying create transaction", identifier);
+
Object serializedCreateMessage = new CreateTransaction(identifier.toString(),
TransactionProxy.this.transactionType.ordinal(),
getTransactionChainId()).toSerializable();
// TransactionContext until after we've executed all cached TransactionOperations.
TransactionContext localTransactionContext;
if(failure != null) {
- LOG.debug("Tx {} Creating NoOpTransaction because of error: {}", identifier,
- failure.getMessage());
+ LOG.debug("Tx {} Creating NoOpTransaction because of error", identifier, failure);
localTransactionContext = new NoOpTransactionContext(failure, identifier, operationLimiter);
} else if (response.getClass().equals(CreateTransactionReply.SERIALIZABLE_CLASS)) {
}
private TransactionContext createValidTransactionContext(CreateTransactionReply reply) {
- String transactionPath = reply.getTransactionPath();
-
LOG.debug("Tx {} Received {}", identifier, reply);
- ActorSelection transactionActor = actorContext.actorSelection(transactionPath);
+ return createValidTransactionContext(actorContext.actorSelection(reply.getTransactionPath()),
+ reply.getTransactionPath(), reply.getVersion());
+ }
+
+ private TransactionContext createValidTransactionContext(ActorSelection transactionActor,
+ String transactionPath, short remoteTransactionVersion) {
if (transactionType == TransactionType.READ_ONLY) {
// Read-only Tx's aren't explicitly closed by the client so we create a PhantomReference
// Check if TxActor is created in the same node
boolean isTxActorLocal = actorContext.isPathLocal(transactionPath);
- if(reply.getVersion() >= DataStoreVersions.LITHIUM_VERSION) {
- return new TransactionContextImpl(transactionPath, transactionActor, identifier,
- actorContext, schemaContext, isTxActorLocal, reply.getVersion(), operationCompleter);
- } else {
+ if(remoteTransactionVersion < DataStoreVersions.LITHIUM_VERSION) {
return new PreLithiumTransactionContextImpl(transactionPath, transactionActor, identifier,
- actorContext, schemaContext, isTxActorLocal, reply.getVersion(), operationCompleter);
+ transactionChainId, actorContext, schemaContext, isTxActorLocal, remoteTransactionVersion,
+ operationCompleter);
+ } else if (transactionType == TransactionType.WRITE_ONLY &&
+ actorContext.getDatastoreContext().isWriteOnlyTransactionOptimizationsEnabled()) {
+ return new WriteOnlyTransactionContextImpl(transactionActor, identifier, transactionChainId,
+ actorContext, schemaContext, isTxActorLocal, remoteTransactionVersion, operationCompleter);
+ } else {
+ return new TransactionContextImpl(transactionActor, identifier, transactionChainId,
+ actorContext, schemaContext, isTxActorLocal, remoteTransactionVersion, operationCompleter);
}
}
}
--- /dev/null
+/*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore;
+
+import akka.actor.ActorSelection;
+import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
+import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
+import org.opendaylight.yangtools.yang.model.api.SchemaContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.concurrent.Future;
+
+/**
+ * Context for a write-only transaction.
+ *
+ * @author Thomas Pantelis
+ */
+public class WriteOnlyTransactionContextImpl extends TransactionContextImpl {
+ private static final Logger LOG = LoggerFactory.getLogger(WriteOnlyTransactionContextImpl.class);
+
+ public WriteOnlyTransactionContextImpl(ActorSelection actor, TransactionIdentifier identifier,
+ String transactionChainId, ActorContext actorContext, SchemaContext schemaContext, boolean isTxActorLocal,
+ short remoteTransactionVersion, OperationCompleter operationCompleter) {
+ super(actor, identifier, transactionChainId, actorContext, schemaContext, isTxActorLocal,
+ remoteTransactionVersion, operationCompleter);
+ }
+
+ @Override
+ public Future<ActorSelection> readyTransaction() {
+ LOG.debug("Tx {} readyTransaction called with {} previous recorded operations pending",
+ identifier, recordedOperationFutures.size());
+
+ // Send the remaining batched modifications if any.
+
+ Future<Object> lastModificationsFuture = sendBatchedModifications(true);
+
+ return combineRecordedOperationsFutures(lastModificationsFuture);
+ }
+}
package org.opendaylight.controller.cluster.datastore.compat;
import akka.actor.ActorSelection;
+import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
import org.opendaylight.controller.cluster.datastore.OperationCompleter;
import org.opendaylight.controller.cluster.datastore.TransactionContextImpl;
import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
import org.opendaylight.controller.cluster.datastore.messages.DeleteData;
import org.opendaylight.controller.cluster.datastore.messages.MergeData;
+import org.opendaylight.controller.cluster.datastore.messages.ReadyTransaction;
import org.opendaylight.controller.cluster.datastore.messages.WriteData;
import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.concurrent.Future;
/**
* Implementation of TransactionContextImpl used when talking to a pre-Lithium controller that doesn't
* @author Thomas Pantelis
*/
public class PreLithiumTransactionContextImpl extends TransactionContextImpl {
+ private static final Logger LOG = LoggerFactory.getLogger(PreLithiumTransactionContextImpl.class);
+
+ private final String transactionPath;
public PreLithiumTransactionContextImpl(String transactionPath, ActorSelection actor, TransactionIdentifier identifier,
- ActorContext actorContext, SchemaContext schemaContext, boolean isTxActorLocal,
+ String transactionChainId, ActorContext actorContext, SchemaContext schemaContext, boolean isTxActorLocal,
short remoteTransactionVersion, OperationCompleter operationCompleter) {
- super(transactionPath, actor, identifier, actorContext, schemaContext, isTxActorLocal,
- remoteTransactionVersion, operationCompleter);
+ super(actor, identifier, transactionChainId, actorContext, schemaContext, isTxActorLocal,
+ remoteTransactionVersion, operationCompleter);
+ this.transactionPath = transactionPath;
}
@Override
recordedOperationFutures.add(executeOperationAsync(
new WriteData(path, data, getRemoteTransactionVersion())));
}
+
+ @Override
+ public Future<ActorSelection> readyTransaction() {
+ LOG.debug("Tx {} readyTransaction called with {} previous recorded operations pending",
+ identifier, recordedOperationFutures.size());
+
+ // Send the ReadyTransaction message to the Tx actor.
+
+ Future<Object> lastReplyFuture = executeOperationAsync(ReadyTransaction.INSTANCE);
+
+ return combineRecordedOperationsFutures(lastReplyFuture);
+ }
+
+ @Override
+ protected String deserializeCohortPath(String cohortPath) {
+ // In base Helium we used to return the local path of the actor which represented
+ // a remote ThreePhaseCommitCohort. The local path would then be converted to
+ // a remote path using this resolvePath method. To maintain compatibility with
+ // a Helium node we need to continue to do this conversion.
+ // At some point in the future when upgrades from Helium are not supported
+ // we could remove this code to resolvePath and just use the cohortPath as the
+ // resolved cohortPath
+ if(getRemoteTransactionVersion() < DataStoreVersions.HELIUM_1_VERSION) {
+ return getActorContext().resolvePath(transactionPath, cohortPath);
+ }
+
+ return cohortPath;
+ }
}
public class ShardTransactionIdentifier {
private final String remoteTransactionId;
+ private final String stringRepresentation;
public ShardTransactionIdentifier(String remoteTransactionId) {
this.remoteTransactionId = Preconditions.checkNotNull(remoteTransactionId,
"remoteTransactionId should not be null");
+
+ stringRepresentation = new StringBuilder(remoteTransactionId.length() + 6).append("shard-").
+ append(remoteTransactionId).toString();
}
public String getRemoteTransactionId() {
}
@Override public String toString() {
- final StringBuilder sb = new StringBuilder();
- sb.append("shard-").append(remoteTransactionId);
- return sb.toString();
+ return stringRepresentation;
}
}
*/
package org.opendaylight.controller.cluster.datastore.messages;
+import com.google.common.base.Preconditions;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification;
/**
public class BatchedModifications extends MutableCompositeModification implements SerializableMessage {
private static final long serialVersionUID = 1L;
+ private boolean ready;
+ private String transactionID;
+ private String transactionChainID;
+
public BatchedModifications() {
}
- public BatchedModifications(short version) {
+ public BatchedModifications(String transactionID, short version, String transactionChainID) {
super(version);
+ this.transactionID = Preconditions.checkNotNull(transactionID, "transactionID can't be null");
+ this.transactionChainID = transactionChainID != null ? transactionChainID : "";
+ }
+
+ public boolean isReady() {
+ return ready;
+ }
+
+ public void setReady(boolean ready) {
+ this.ready = ready;
+ }
+
+ public String getTransactionID() {
+ return transactionID;
+ }
+
+ public String getTransactionChainID() {
+ return transactionChainID;
+ }
+
+ @Override
+ public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+ super.readExternal(in);
+ transactionID = in.readUTF();
+ transactionChainID = in.readUTF();
+ ready = in.readBoolean();
+ }
+
+ @Override
+ public void writeExternal(ObjectOutput out) throws IOException {
+ super.writeExternal(out);
+ out.writeUTF(transactionID);
+ out.writeUTF(transactionChainID);
+ out.writeBoolean(ready);
}
@Override
public Object toSerializable() {
return this;
}
+
+ @Override
+ public String toString() {
+ StringBuilder builder = new StringBuilder();
+ builder.append("BatchedModifications [transactionID=").append(transactionID).append(", ready=").append(ready)
+ .append(", modifications size=").append(getModifications().size()).append("]");
+ return builder.toString();
+ }
}
public class BatchedModificationsReply extends VersionedExternalizableMessage {
private static final long serialVersionUID = 1L;
+ private static final byte COHORT_PATH_NOT_PRESENT = 0;
+ private static final byte COHORT_PATH_PRESENT = 1;
+
private int numBatched;
+ private String cohortPath;
public BatchedModificationsReply() {
}
this.numBatched = numBatched;
}
+ public BatchedModificationsReply(int numBatched, String cohortPath) {
+ this.numBatched = numBatched;
+ this.cohortPath = cohortPath;
+ }
public int getNumBatched() {
return numBatched;
}
+ public String getCohortPath() {
+ return cohortPath;
+ }
+
@Override
public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
super.readExternal(in);
numBatched = in.readInt();
+
+ if(in.readByte() == COHORT_PATH_PRESENT) {
+ cohortPath = in.readUTF();
+ }
}
@Override
public void writeExternal(ObjectOutput out) throws IOException {
super.writeExternal(out);
out.writeInt(numBatched);
+
+ if(cohortPath != null) {
+ out.writeByte(COHORT_PATH_PRESENT);
+ out.writeUTF(cohortPath);
+ } else {
+ out.writeByte(COHORT_PATH_NOT_PRESENT);
+ }
}
@Override
public Object toSerializable() {
return this;
}
+
+ @Override
+ public String toString() {
+ StringBuilder builder = new StringBuilder();
+ builder.append("BatchedModificationsReply [numBatched=").append(numBatched).append(", cohortPath=")
+ .append(cohortPath).append("]");
+ return builder.toString();
+ }
}
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;
+import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
/**
* Abstract base class for a versioned Externalizable message.
public abstract class VersionedExternalizableMessage implements Externalizable, SerializableMessage {
private static final long serialVersionUID = 1L;
- private short version;
+ private short version = DataStoreVersions.CURRENT_VERSION;
public VersionedExternalizableMessage() {
}
protected final String memberName = "mock-member";
- protected final Builder dataStoreContextBuilder = DatastoreContext.newBuilder().operationTimeoutInSeconds(2).
- shardBatchedModificationCount(1);
+ protected final Builder dataStoreContextBuilder = DatastoreContext.newBuilder().operationTimeoutInSeconds(2);
@BeforeClass
public static void setUpClass() throws IOException {
eq(actorSelection(actorRef)), isA(BatchedModifications.class));
}
+ protected void expectBatchedModificationsReady(ActorRef actorRef, int count) {
+ Future<BatchedModificationsReply> replyFuture = Futures.successful(
+ new BatchedModificationsReply(count, actorRef.path().toString()));
+ doReturn(replyFuture).when(mockActorContext).executeOperationAsync(
+ eq(actorSelection(actorRef)), isA(BatchedModifications.class));
+ }
+
protected void expectBatchedModifications(int count) {
doReturn(batchedModificationsReply(count)).when(mockActorContext).executeOperationAsync(
any(ActorSelection.class), isA(BatchedModifications.class));
protected ActorRef setupActorContextWithInitialCreateTransaction(ActorSystem actorSystem,
TransactionType type, int transactionVersion, String prefix, ActorRef shardActorRef) {
- ActorRef txActorRef = actorSystem.actorOf(Props.create(DoNothingActor.class));
- log.info("Created mock shard Tx actor {}", txActorRef);
+ ActorRef txActorRef;
+ if(type == TransactionType.WRITE_ONLY && transactionVersion >= DataStoreVersions.LITHIUM_VERSION &&
+ dataStoreContextBuilder.build().isWriteOnlyTransactionOptimizationsEnabled()) {
+ txActorRef = shardActorRef;
+ } else {
+ txActorRef = actorSystem.actorOf(Props.create(DoNothingActor.class));
+ log.info("Created mock shard Tx actor {}", txActorRef);
- doReturn(actorSystem.actorSelection(txActorRef.path())).when(mockActorContext).actorSelection(
- txActorRef.path().toString());
+ doReturn(actorSystem.actorSelection(txActorRef.path())).
+ when(mockActorContext).actorSelection(txActorRef.path().toString());
- doReturn(Futures.successful(createTransactionReply(txActorRef, transactionVersion))).when(mockActorContext).
- executeOperationAsync(eq(actorSystem.actorSelection(shardActorRef.path())),
- eqCreateTransaction(prefix, type));
+ doReturn(Futures.successful(createTransactionReply(txActorRef, transactionVersion))).when(mockActorContext).
+ executeOperationAsync(eq(actorSystem.actorSelection(shardActorRef.path())),
+ eqCreateTransaction(prefix, type));
+ }
return txActorRef;
}
return captured;
}
- protected void verifyOneBatchedModification(ActorRef actorRef, Modification expected) {
+ protected void verifyOneBatchedModification(ActorRef actorRef, Modification expected, boolean expIsReady) {
List<BatchedModifications> batchedModifications = captureBatchedModifications(actorRef);
assertEquals("Captured BatchedModifications count", 1, batchedModifications.size());
- verifyBatchedModifications(batchedModifications.get(0), expected);
+ verifyBatchedModifications(batchedModifications.get(0), expIsReady, expected);
}
- protected void verifyBatchedModifications(Object message, Modification... expected) {
+ protected void verifyBatchedModifications(Object message, boolean expIsReady, Modification... expected) {
assertEquals("Message type", BatchedModifications.class, message.getClass());
BatchedModifications batchedModifications = (BatchedModifications)message;
assertEquals("BatchedModifications size", expected.length, batchedModifications.getModifications().size());
+ assertEquals("isReady", expIsReady, batchedModifications.isReady());
for(int i = 0; i < batchedModifications.getModifications().size(); i++) {
Modification actual = batchedModifications.getModifications().get(i);
assertEquals("Modification type", expected[i].getClass(), actual.getClass());
// Create the write Tx
- final DOMStoreWriteTransaction writeTx = dataStore.newWriteOnlyTransaction();
+ // TODO - we'll want to test this with write-only as well when FindPrimary returns the leader shard.
+ final DOMStoreWriteTransaction writeTx = dataStore.newReadWriteTransaction();
assertNotNull("newReadWriteTransaction returned null", writeTx);
// Do some modification operations and ready the Tx on a separate thread.
// Create the write Tx.
- final DOMStoreWriteTransaction writeTx = dataStore.newWriteOnlyTransaction();
+ final DOMStoreWriteTransaction writeTx = dataStore.newReadWriteTransaction();
assertNotNull("newReadWriteTransaction returned null", writeTx);
// Do some modifications and ready the Tx on a separate thread.
import static org.mockito.Mockito.mock;
import static org.opendaylight.controller.cluster.datastore.DataStoreVersions.CURRENT_VERSION;
import akka.actor.ActorRef;
+import akka.actor.ActorSelection;
import akka.actor.PoisonPill;
import akka.actor.Props;
import akka.dispatch.Dispatchers;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import org.junit.Test;
import org.mockito.InOrder;
import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction;
import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply;
+import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
+import org.opendaylight.controller.cluster.datastore.messages.BatchedModificationsReply;
import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction;
import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply;
import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction;
import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
import org.opendaylight.controller.cluster.datastore.messages.ForwardedReadyTransaction;
import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved;
+import org.opendaylight.controller.cluster.datastore.messages.ReadData;
+import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply;
import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener;
import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListenerReply;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument;
+import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
import org.opendaylight.yangtools.yang.data.api.schema.DataContainerChild;
import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
import scala.concurrent.duration.FiniteDuration;
public class ShardTest extends AbstractShardTest {
+
@Test
public void testRegisterChangeListener() throws Exception {
new ShardTestKit(getSystem()) {{
waitUntilLeader(shard);
- // Setup 3 simulated transactions with mock cohorts backed by real cohorts.
-
- InMemoryDOMDataStore dataStore = shard.underlyingActor().getDataStore();
-
- String transactionID1 = "tx1";
- MutableCompositeModification modification1 = new MutableCompositeModification();
- DOMStoreThreePhaseCommitCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
- TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification1);
+ final String transactionID1 = "tx1";
+ final String transactionID2 = "tx2";
+ final String transactionID3 = "tx3";
- String transactionID2 = "tx2";
- MutableCompositeModification modification2 = new MutableCompositeModification();
- DOMStoreThreePhaseCommitCohort cohort2 = setupMockWriteTransaction("cohort2", dataStore,
- TestModel.OUTER_LIST_PATH,
- ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(),
- modification2);
+ final AtomicReference<DOMStoreThreePhaseCommitCohort> mockCohort1 = new AtomicReference<>();
+ final AtomicReference<DOMStoreThreePhaseCommitCohort> mockCohort2 = new AtomicReference<>();
+ final AtomicReference<DOMStoreThreePhaseCommitCohort> mockCohort3 = new AtomicReference<>();
+ ShardCommitCoordinator.CohortDecorator cohortDecorator = new ShardCommitCoordinator.CohortDecorator() {
+ @Override
+ public DOMStoreThreePhaseCommitCohort decorate(String transactionID, DOMStoreThreePhaseCommitCohort actual) {
+ if(transactionID.equals(transactionID1)) {
+ mockCohort1.set(createDelegatingMockCohort("cohort1", actual));
+ return mockCohort1.get();
+ } else if(transactionID.equals(transactionID2)) {
+ mockCohort2.set(createDelegatingMockCohort("cohort2", actual));
+ return mockCohort2.get();
+ } else {
+ mockCohort3.set(createDelegatingMockCohort("cohort3", actual));
+ return mockCohort3.get();
+ }
+ }
+ };
- String transactionID3 = "tx3";
- MutableCompositeModification modification3 = new MutableCompositeModification();
- DOMStoreThreePhaseCommitCohort cohort3 = setupMockWriteTransaction("cohort3", dataStore,
- YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
- .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
- ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1),
- modification3);
+ shard.underlyingActor().getCommitCoordinator().setCohortDecorator(cohortDecorator);
long timeoutSec = 5;
final FiniteDuration duration = FiniteDuration.create(timeoutSec, TimeUnit.SECONDS);
final Timeout timeout = new Timeout(duration);
- // Simulate the ForwardedReadyTransaction message for the first Tx that would be sent
- // by the ShardTransaction.
+ // Send a BatchedModifications message for the first transaction.
- shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
- cohort1, modification1, true), getRef());
- ReadyTransactionReply readyReply = ReadyTransactionReply.fromSerializable(
- expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS));
- assertEquals("Cohort path", shard.path().toString(), readyReply.getCohortPath());
+ shard.tell(newBatchedModifications(transactionID1, TestModel.TEST_PATH,
+ ImmutableNodes.containerNode(TestModel.TEST_QNAME), true), getRef());
+ BatchedModificationsReply batchedReply = expectMsgClass(duration, BatchedModificationsReply.class);
+ assertEquals("getCohortPath", shard.path().toString(), batchedReply.getCohortPath());
+ assertEquals("getNumBatched", 1, batchedReply.getNumBatched());
// Send the CanCommitTransaction message for the first Tx.
expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
assertEquals("Can commit", true, canCommitReply.getCanCommit());
- // Send the ForwardedReadyTransaction for the next 2 Tx's.
+ // Send BatchedModifications for the next 2 Tx's.
- shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
- cohort2, modification2, true), getRef());
- expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
+ shard.tell(newBatchedModifications(transactionID2, TestModel.OUTER_LIST_PATH,
+ ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(), true), getRef());
+ expectMsgClass(duration, BatchedModificationsReply.class);
- shard.tell(new ForwardedReadyTransaction(transactionID3, CURRENT_VERSION,
- cohort3, modification3, true), getRef());
- expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
+ shard.tell(newBatchedModifications(transactionID3, YangInstanceIdentifier.builder(
+ TestModel.OUTER_LIST_PATH).nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
+ ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1), true), getRef());
+ expectMsgClass(duration, BatchedModificationsReply.class);
// Send the CanCommitTransaction message for the next 2 Tx's. These should get queued and
// processed after the first Tx completes.
assertEquals("Commits complete", true, done);
- InOrder inOrder = inOrder(cohort1, cohort2, cohort3);
- inOrder.verify(cohort1).canCommit();
- inOrder.verify(cohort1).preCommit();
- inOrder.verify(cohort1).commit();
- inOrder.verify(cohort2).canCommit();
- inOrder.verify(cohort2).preCommit();
- inOrder.verify(cohort2).commit();
- inOrder.verify(cohort3).canCommit();
- inOrder.verify(cohort3).preCommit();
- inOrder.verify(cohort3).commit();
+ InOrder inOrder = inOrder(mockCohort1.get(), mockCohort2.get(), mockCohort3.get());
+ inOrder.verify(mockCohort1.get()).canCommit();
+ inOrder.verify(mockCohort1.get()).preCommit();
+ inOrder.verify(mockCohort1.get()).commit();
+ inOrder.verify(mockCohort2.get()).canCommit();
+ inOrder.verify(mockCohort2.get()).preCommit();
+ inOrder.verify(mockCohort2.get()).commit();
+ inOrder.verify(mockCohort3.get()).canCommit();
+ inOrder.verify(mockCohort3.get()).preCommit();
+ inOrder.verify(mockCohort3.get()).commit();
// Verify data in the data store.
}};
}
+ private BatchedModifications newBatchedModifications(String transactionID, YangInstanceIdentifier path,
+ NormalizedNode<?, ?> data, boolean ready) {
+ return newBatchedModifications(transactionID, null, path, data, ready);
+ }
+
+ private BatchedModifications newBatchedModifications(String transactionID, String transactionChainID,
+ YangInstanceIdentifier path, NormalizedNode<?, ?> data, boolean ready) {
+ BatchedModifications batched = new BatchedModifications(transactionID, CURRENT_VERSION, transactionChainID);
+ batched.addModification(new WriteModification(path, data));
+ batched.setReady(ready);
+ return batched;
+ }
+
+ @SuppressWarnings("unchecked")
@Test
- public void testCommitWithPersistenceDisabled() throws Throwable {
- dataStoreContextBuilder.persistent(false);
+ public void testMultipleBatchedModifications() throws Throwable {
new ShardTestKit(getSystem()) {{
final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
- "testCommitPhaseFailure");
+ "testMultipleBatchedModifications");
waitUntilLeader(shard);
- InMemoryDOMDataStore dataStore = shard.underlyingActor().getDataStore();
+ final String transactionID = "tx";
+ FiniteDuration duration = duration("5 seconds");
- // Setup a simulated transactions with a mock cohort.
+ final AtomicReference<DOMStoreThreePhaseCommitCohort> mockCohort = new AtomicReference<>();
+ ShardCommitCoordinator.CohortDecorator cohortDecorator = new ShardCommitCoordinator.CohortDecorator() {
+ @Override
+ public DOMStoreThreePhaseCommitCohort decorate(String txID, DOMStoreThreePhaseCommitCohort actual) {
+ if(mockCohort.get() == null) {
+ mockCohort.set(createDelegatingMockCohort("cohort", actual));
+ }
- String transactionID = "tx";
- MutableCompositeModification modification = new MutableCompositeModification();
- NormalizedNode<?, ?> containerNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
- DOMStoreThreePhaseCommitCohort cohort = setupMockWriteTransaction("cohort", dataStore,
- TestModel.TEST_PATH, containerNode, modification);
+ return mockCohort.get();
+ }
+ };
- FiniteDuration duration = duration("5 seconds");
+ shard.underlyingActor().getCommitCoordinator().setCohortDecorator(cohortDecorator);
+
+ // Send a BatchedModifications to start a transaction.
+
+ shard.tell(newBatchedModifications(transactionID, TestModel.TEST_PATH,
+ ImmutableNodes.containerNode(TestModel.TEST_QNAME), false), getRef());
+ expectMsgClass(duration, BatchedModificationsReply.class);
+
+ // Send a couple more BatchedModifications.
- // Simulate the ForwardedReadyTransaction messages that would be sent
- // by the ShardTransaction.
+ shard.tell(newBatchedModifications(transactionID, TestModel.OUTER_LIST_PATH,
+ ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(), false), getRef());
+ expectMsgClass(duration, BatchedModificationsReply.class);
- shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
- cohort, modification, true), getRef());
- expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
+ shard.tell(newBatchedModifications(transactionID, YangInstanceIdentifier.builder(
+ TestModel.OUTER_LIST_PATH).nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
+ ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1), true), getRef());
+ expectMsgClass(duration, BatchedModificationsReply.class);
// Send the CanCommitTransaction message.
shard.tell(new CommitTransaction(transactionID).toSerializable(), getRef());
expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
- InOrder inOrder = inOrder(cohort);
- inOrder.verify(cohort).canCommit();
- inOrder.verify(cohort).preCommit();
- inOrder.verify(cohort).commit();
+ InOrder inOrder = inOrder(mockCohort.get());
+ inOrder.verify(mockCohort.get()).canCommit();
+ inOrder.verify(mockCohort.get()).preCommit();
+ inOrder.verify(mockCohort.get()).commit();
+
+ // Verify data in the data store.
+
+ NormalizedNode<?, ?> outerList = readStore(shard, TestModel.OUTER_LIST_PATH);
+ assertNotNull(TestModel.OUTER_LIST_QNAME.getLocalName() + " not found", outerList);
+ assertTrue(TestModel.OUTER_LIST_QNAME.getLocalName() + " value is not Iterable",
+ outerList.getValue() instanceof Iterable);
+ Object entry = ((Iterable<Object>)outerList.getValue()).iterator().next();
+ assertTrue(TestModel.OUTER_LIST_QNAME.getLocalName() + " entry is not MapEntryNode",
+ entry instanceof MapEntryNode);
+ MapEntryNode mapEntry = (MapEntryNode)entry;
+ Optional<DataContainerChild<? extends PathArgument, ?>> idLeaf =
+ mapEntry.getChild(new YangInstanceIdentifier.NodeIdentifier(TestModel.ID_QNAME));
+ assertTrue("Missing leaf " + TestModel.ID_QNAME.getLocalName(), idLeaf.isPresent());
+ assertEquals(TestModel.ID_QNAME.getLocalName() + " value", 1, idLeaf.get().getValue());
+
+ shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
+ }};
+ }
+
+ @Test
+ public void testBatchedModificationsOnTransactionChain() throws Throwable {
+ new ShardTestKit(getSystem()) {{
+ final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
+ newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
+ "testBatchedModificationsOnTransactionChain");
+
+ waitUntilLeader(shard);
+
+ String transactionChainID = "txChain";
+ String transactionID1 = "tx1";
+ String transactionID2 = "tx2";
+
+ FiniteDuration duration = duration("5 seconds");
+
+ // Send a BatchedModifications to start a chained write transaction and ready it.
+
+ ContainerNode containerNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
+ YangInstanceIdentifier path = TestModel.TEST_PATH;
+ shard.tell(newBatchedModifications(transactionID1, transactionChainID, path,
+ containerNode, true), getRef());
+ expectMsgClass(duration, BatchedModificationsReply.class);
+
+ // Create a read Tx on the same chain.
+
+ shard.tell(new CreateTransaction(transactionID2, TransactionProxy.TransactionType.READ_ONLY.ordinal() ,
+ transactionChainID).toSerializable(), getRef());
+
+ CreateTransactionReply createReply = expectMsgClass(duration("3 seconds"), CreateTransactionReply.class);
+
+ getSystem().actorSelection(createReply.getTransactionActorPath()).tell(new ReadData(path), getRef());
+ ReadDataReply readReply = expectMsgClass(duration("3 seconds"), ReadDataReply.class);
+ assertEquals("Read node", containerNode, readReply.getNormalizedNode());
+
+ // Commit the write transaction.
+
+ shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
+ CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
+ expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
+ assertEquals("Can commit", true, canCommitReply.getCanCommit());
+
+ shard.tell(new CommitTransaction(transactionID1).toSerializable(), getRef());
+ expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
+
+ // Verify data in the data store.
+
+ NormalizedNode<?, ?> actualNode = readStore(shard, path);
+ assertEquals("Stored node", containerNode, actualNode);
+
+ shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
+ }};
+ }
+
+ @Test
+ public void testOnBatchedModificationsWhenNotLeader() {
+ final AtomicBoolean overrideLeaderCalls = new AtomicBoolean();
+ new ShardTestKit(getSystem()) {{
+ Creator<Shard> creator = new Creator<Shard>() {
+ @Override
+ public Shard create() throws Exception {
+ return new Shard(shardID, Collections.<ShardIdentifier,String>emptyMap(),
+ newDatastoreContext(), SCHEMA_CONTEXT) {
+ @Override
+ protected boolean isLeader() {
+ return overrideLeaderCalls.get() ? false : super.isLeader();
+ }
+
+ @Override
+ protected ActorSelection getLeader() {
+ return overrideLeaderCalls.get() ? getSystem().actorSelection(getRef().path()) :
+ super.getLeader();
+ }
+ };
+ }
+ };
+
+ TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
+ Props.create(new DelegatingShardCreator(creator)), "testOnBatchedModificationsWhenNotLeader");
+
+ waitUntilLeader(shard);
+
+ overrideLeaderCalls.set(true);
+
+ BatchedModifications batched = new BatchedModifications("tx", DataStoreVersions.CURRENT_VERSION, "");
+
+ shard.tell(batched, ActorRef.noSender());
+
+ expectMsgEquals(batched);
+
+ shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
+ }};
+ }
+
+ @Test
+ public void testCommitWithPersistenceDisabled() throws Throwable {
+ dataStoreContextBuilder.persistent(false);
+ new ShardTestKit(getSystem()) {{
+ final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
+ newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
+ "testCommitWithPersistenceDisabled");
+
+ waitUntilLeader(shard);
+
+ String transactionID = "tx";
+ FiniteDuration duration = duration("5 seconds");
+
+ // Send a BatchedModifications to start a transaction.
+
+ NormalizedNode<?, ?> containerNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
+ shard.tell(newBatchedModifications(transactionID, TestModel.TEST_PATH, containerNode, true), getRef());
+ expectMsgClass(duration, BatchedModificationsReply.class);
+
+ // Send the CanCommitTransaction message.
+
+ shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
+ CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
+ expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
+ assertEquals("Can commit", true, canCommitReply.getCanCommit());
+
+ // Send the CanCommitTransaction message.
+
+ shard.tell(new CommitTransaction(transactionID).toSerializable(), getRef());
+ expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
NormalizedNode<?, ?> actualNode = readStore(shard, TestModel.TEST_PATH);
assertEquals(TestModel.TEST_QNAME.getLocalName(), containerNode, actualNode);
waitUntilLeader(shard);
- // Setup 2 simulated transactions with mock cohorts. The first one fails in the
- // commit phase.
+ // Setup 2 mock cohorts. The first one fails in the commit phase.
- String transactionID1 = "tx1";
- MutableCompositeModification modification1 = new MutableCompositeModification();
- DOMStoreThreePhaseCommitCohort cohort1 = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1");
+ final String transactionID1 = "tx1";
+ final DOMStoreThreePhaseCommitCohort cohort1 = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1");
doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort1).canCommit();
doReturn(Futures.immediateFuture(null)).when(cohort1).preCommit();
doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort1).commit();
- String transactionID2 = "tx2";
- MutableCompositeModification modification2 = new MutableCompositeModification();
- DOMStoreThreePhaseCommitCohort cohort2 = mock(DOMStoreThreePhaseCommitCohort.class, "cohort2");
+ final String transactionID2 = "tx2";
+ final DOMStoreThreePhaseCommitCohort cohort2 = mock(DOMStoreThreePhaseCommitCohort.class, "cohort2");
doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort2).canCommit();
+ ShardCommitCoordinator.CohortDecorator cohortDecorator = new ShardCommitCoordinator.CohortDecorator() {
+ @Override
+ public DOMStoreThreePhaseCommitCohort decorate(String transactionID,
+ DOMStoreThreePhaseCommitCohort actual) {
+ return transactionID1.equals(transactionID) ? cohort1 : cohort2;
+ }
+ };
+
+ shard.underlyingActor().getCommitCoordinator().setCohortDecorator(cohortDecorator);
+
FiniteDuration duration = duration("5 seconds");
final Timeout timeout = new Timeout(duration);
- // Simulate the ForwardedReadyTransaction messages that would be sent
- // by the ShardTransaction.
+ // Send BatchedModifications to start and ready each transaction.
- shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
- cohort1, modification1, true), getRef());
- expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
+ shard.tell(newBatchedModifications(transactionID1, TestModel.TEST_PATH,
+ ImmutableNodes.containerNode(TestModel.TEST_QNAME), true), getRef());
+ expectMsgClass(duration, BatchedModificationsReply.class);
- shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
- cohort2, modification2, true), getRef());
- expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
+ shard.tell(newBatchedModifications(transactionID2, TestModel.TEST_PATH,
+ ImmutableNodes.containerNode(TestModel.TEST_QNAME), true), getRef());
+ expectMsgClass(duration, BatchedModificationsReply.class);
// Send the CanCommitTransaction message for the first Tx.
waitUntilLeader(shard);
String transactionID = "tx1";
- MutableCompositeModification modification = new MutableCompositeModification();
- DOMStoreThreePhaseCommitCohort cohort = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1");
+ final DOMStoreThreePhaseCommitCohort cohort = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1");
doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort).preCommit();
+ ShardCommitCoordinator.CohortDecorator cohortDecorator = new ShardCommitCoordinator.CohortDecorator() {
+ @Override
+ public DOMStoreThreePhaseCommitCohort decorate(String transactionID,
+ DOMStoreThreePhaseCommitCohort actual) {
+ return cohort;
+ }
+ };
+
+ shard.underlyingActor().getCommitCoordinator().setCohortDecorator(cohortDecorator);
+
FiniteDuration duration = duration("5 seconds");
- // Simulate the ForwardedReadyTransaction messages that would be sent
- // by the ShardTransaction.
+ // Send BatchedModifications to start and ready a transaction.
- shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
- cohort, modification, true), getRef());
- expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
+ shard.tell(newBatchedModifications(transactionID, TestModel.TEST_PATH,
+ ImmutableNodes.containerNode(TestModel.TEST_QNAME), true), getRef());
+ expectMsgClass(duration, BatchedModificationsReply.class);
// Send the CanCommitTransaction message.
final FiniteDuration duration = duration("5 seconds");
String transactionID = "tx1";
- MutableCompositeModification modification = new MutableCompositeModification();
- DOMStoreThreePhaseCommitCohort cohort = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1");
+ final DOMStoreThreePhaseCommitCohort cohort = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1");
doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort).canCommit();
- // Simulate the ForwardedReadyTransaction messages that would be sent
- // by the ShardTransaction.
+ ShardCommitCoordinator.CohortDecorator cohortDecorator = new ShardCommitCoordinator.CohortDecorator() {
+ @Override
+ public DOMStoreThreePhaseCommitCohort decorate(String transactionID,
+ DOMStoreThreePhaseCommitCohort actual) {
+ return cohort;
+ }
+ };
+
+ shard.underlyingActor().getCommitCoordinator().setCohortDecorator(cohortDecorator);
+
+ // Send BatchedModifications to start and ready a transaction.
- shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
- cohort, modification, true), getRef());
- expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
+ shard.tell(newBatchedModifications(transactionID, TestModel.TEST_PATH,
+ ImmutableNodes.containerNode(TestModel.TEST_QNAME), true), getRef());
+ expectMsgClass(duration, BatchedModificationsReply.class);
// Send the CanCommitTransaction message.
}
};
- MutableCompositeModification modification = new MutableCompositeModification();
- DOMStoreThreePhaseCommitCohort cohort = setupMockWriteTransaction("cohort1", dataStore,
- TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME),
- modification, preCommit);
-
- shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
- cohort, modification, true), getRef());
- expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
+ shard.tell(newBatchedModifications(transactionID, TestModel.TEST_PATH,
+ ImmutableNodes.containerNode(TestModel.TEST_QNAME), true), getRef());
+ expectMsgClass(duration, BatchedModificationsReply.class);
shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
final FiniteDuration duration = duration("5 seconds");
- InMemoryDOMDataStore dataStore = shard.underlyingActor().getDataStore();
-
writeToStore(shard, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
writeToStore(shard, TestModel.OUTER_LIST_PATH,
ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build());
- // Create 1st Tx - will timeout
+ // Create and ready the 1st Tx - will timeout
String transactionID1 = "tx1";
- MutableCompositeModification modification1 = new MutableCompositeModification();
- DOMStoreThreePhaseCommitCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
- YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
- .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
- ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1),
- modification1);
+ shard.tell(newBatchedModifications(transactionID1, YangInstanceIdentifier.builder(
+ TestModel.OUTER_LIST_PATH).nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
+ ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1), true), getRef());
+ expectMsgClass(duration, BatchedModificationsReply.class);
- // Create 2nd Tx
+ // Create and ready the 2nd Tx
- String transactionID2 = "tx3";
- MutableCompositeModification modification2 = new MutableCompositeModification();
+ String transactionID2 = "tx2";
YangInstanceIdentifier listNodePath = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
- .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2).build();
- DOMStoreThreePhaseCommitCohort cohort2 = setupMockWriteTransaction("cohort3", dataStore,
- listNodePath,
- ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2),
- modification2);
-
- // Ready the Tx's
-
- shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
- cohort1, modification1, true), getRef());
- expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
-
- shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
- cohort2, modification2, true), getRef());
- expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
+ .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2).build();
+ shard.tell(newBatchedModifications(transactionID2, listNodePath,
+ ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2), true), getRef());
+ expectMsgClass(duration, BatchedModificationsReply.class);
// canCommit 1st Tx. We don't send the commit so it should timeout.
final FiniteDuration duration = duration("5 seconds");
- InMemoryDOMDataStore dataStore = shard.underlyingActor().getDataStore();
-
String transactionID1 = "tx1";
- MutableCompositeModification modification1 = new MutableCompositeModification();
- DOMStoreThreePhaseCommitCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
- TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification1);
-
String transactionID2 = "tx2";
- MutableCompositeModification modification2 = new MutableCompositeModification();
- DOMStoreThreePhaseCommitCohort cohort2 = setupMockWriteTransaction("cohort2", dataStore,
- TestModel.OUTER_LIST_PATH,
- ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(),
- modification2);
-
String transactionID3 = "tx3";
- MutableCompositeModification modification3 = new MutableCompositeModification();
- DOMStoreThreePhaseCommitCohort cohort3 = setupMockWriteTransaction("cohort3", dataStore,
- TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification3);
- // Ready the Tx's
+ // Send a BatchedModifications to start transactions and ready them.
- shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
- cohort1, modification1, true), getRef());
- expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
+ shard.tell(newBatchedModifications(transactionID1, TestModel.TEST_PATH,
+ ImmutableNodes.containerNode(TestModel.TEST_QNAME), true), getRef());
+ expectMsgClass(duration, BatchedModificationsReply.class);
- shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
- cohort2, modification2, true), getRef());
- expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
+ shard.tell(newBatchedModifications(transactionID2,TestModel.OUTER_LIST_PATH,
+ ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(), true), getRef());
+ expectMsgClass(duration, BatchedModificationsReply.class);
- shard.tell(new ForwardedReadyTransaction(transactionID3, CURRENT_VERSION,
- cohort3, modification3, true), getRef());
- expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
+ shard.tell(newBatchedModifications(transactionID3, TestModel.TEST_PATH,
+ ImmutableNodes.containerNode(TestModel.TEST_QNAME), true), getRef());
+ expectMsgClass(duration, BatchedModificationsReply.class);
// canCommit 1st Tx.
// Setup 2 simulated transactions with mock cohorts. The first one will be aborted.
- String transactionID1 = "tx1";
- MutableCompositeModification modification1 = new MutableCompositeModification();
- DOMStoreThreePhaseCommitCohort cohort1 = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1");
+ final String transactionID1 = "tx1";
+ final DOMStoreThreePhaseCommitCohort cohort1 = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1");
doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort1).canCommit();
doReturn(Futures.immediateFuture(null)).when(cohort1).abort();
- String transactionID2 = "tx2";
- MutableCompositeModification modification2 = new MutableCompositeModification();
- DOMStoreThreePhaseCommitCohort cohort2 = mock(DOMStoreThreePhaseCommitCohort.class, "cohort2");
+ final String transactionID2 = "tx2";
+ final DOMStoreThreePhaseCommitCohort cohort2 = mock(DOMStoreThreePhaseCommitCohort.class, "cohort2");
doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort2).canCommit();
FiniteDuration duration = duration("5 seconds");
final Timeout timeout = new Timeout(duration);
- // Simulate the ForwardedReadyTransaction messages that would be sent
- // by the ShardTransaction.
+ ShardCommitCoordinator.CohortDecorator cohortDecorator = new ShardCommitCoordinator.CohortDecorator() {
+ @Override
+ public DOMStoreThreePhaseCommitCohort decorate(String transactionID,
+ DOMStoreThreePhaseCommitCohort actual) {
+ return transactionID1.equals(transactionID) ? cohort1 : cohort2;
+ }
+ };
+
+ shard.underlyingActor().getCommitCoordinator().setCohortDecorator(cohortDecorator);
- shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
- cohort1, modification1, true), getRef());
- expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
+ // Send BatchedModifications to start and ready each transaction.
- shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
- cohort2, modification2, true), getRef());
- expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
+ shard.tell(newBatchedModifications(transactionID1, TestModel.TEST_PATH,
+ ImmutableNodes.containerNode(TestModel.TEST_QNAME), true), getRef());
+ expectMsgClass(duration, BatchedModificationsReply.class);
+
+ shard.tell(newBatchedModifications(transactionID2, TestModel.TEST_PATH,
+ ImmutableNodes.containerNode(TestModel.TEST_QNAME), true), getRef());
+ expectMsgClass(duration, BatchedModificationsReply.class);
// Send the CanCommitTransaction message for the first Tx.
shard2.tell(PoisonPill.getInstance(), ActorRef.noSender());
}};
+
}
@Test
YangInstanceIdentifier deletePath = TestModel.TEST_PATH;
- BatchedModifications batched = new BatchedModifications(DataStoreVersions.CURRENT_VERSION);
+ BatchedModifications batched = new BatchedModifications("tx1", DataStoreVersions.CURRENT_VERSION, null);
batched.addModification(new WriteModification(writePath, writeData));
batched.addModification(new MergeModification(mergePath, mergeData));
batched.addModification(new DeleteModification(deletePath));
import java.util.concurrent.atomic.AtomicReference;
import org.junit.Assert;
import org.junit.Test;
+import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
+import org.opendaylight.controller.cluster.datastore.messages.BatchedModificationsReply;
import org.opendaylight.controller.cluster.datastore.messages.ReadyTransaction;
import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
+import org.opendaylight.controller.cluster.datastore.shardstrategy.DefaultShardStrategy;
import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
verify(mockActorContext, times(0)).acquireTxCreationPermit();
}
+ /**
+ * Tests 2 successive chained write-only transactions and verifies the second transaction isn't
+ * initiated until the first one completes its read future.
+ */
+ @Test
+ public void testChainedWriteOnlyTransactions() throws Exception {
+ dataStoreContextBuilder.writeOnlyTransactionOptimizationsEnabled(true);
+
+ TransactionChainProxy txChainProxy = new TransactionChainProxy(mockActorContext);
+
+ ActorRef txActorRef1 = setupActorContextWithoutInitialCreateTransaction(getSystem());
+
+ Promise<Object> batchedReplyPromise1 = akka.dispatch.Futures.promise();
+ doReturn(batchedReplyPromise1.future()).when(mockActorContext).executeOperationAsync(
+ eq(actorSelection(txActorRef1)), isA(BatchedModifications.class));
+
+ DOMStoreWriteTransaction writeTx1 = txChainProxy.newWriteOnlyTransaction();
+
+ NormalizedNode<?, ?> writeNode1 = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
+ writeTx1.write(TestModel.TEST_PATH, writeNode1);
+
+ writeTx1.ready();
+
+ verify(mockActorContext, times(1)).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
+
+ verifyOneBatchedModification(txActorRef1, new WriteModification(TestModel.TEST_PATH, writeNode1), true);
+
+ ActorRef txActorRef2 = setupActorContextWithoutInitialCreateTransaction(getSystem());
+
+ expectBatchedModifications(txActorRef2, 1);
+
+ final NormalizedNode<?, ?> writeNode2 = ImmutableNodes.containerNode(TestModel.OUTER_LIST_QNAME);
+
+ final DOMStoreWriteTransaction writeTx2 = txChainProxy.newWriteOnlyTransaction();
+
+ final AtomicReference<Exception> caughtEx = new AtomicReference<>();
+ final CountDownLatch write2Complete = new CountDownLatch(1);
+ new Thread() {
+ @Override
+ public void run() {
+ try {
+ writeTx2.write(TestModel.OUTER_LIST_PATH, writeNode2);
+ } catch (Exception e) {
+ caughtEx.set(e);
+ } finally {
+ write2Complete.countDown();
+ }
+ }
+ }.start();
+
+ assertEquals("Tx 2 write should've completed", true, write2Complete.await(5, TimeUnit.SECONDS));
+
+ if(caughtEx.get() != null) {
+ throw caughtEx.get();
+ }
+
+ try {
+ verify(mockActorContext, times(1)).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
+ } catch (AssertionError e) {
+ fail("Tx 2 should not have initiated until the Tx 1's ready future completed");
+ }
+
+ batchedReplyPromise1.success(new BatchedModificationsReply(1, txActorRef1.path().toString()));
+
+ // Tx 2 should've proceeded to find the primary shard.
+ verify(mockActorContext, timeout(5000).times(2)).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
+ }
+
/**
* Tests 2 successive chained read-write transactions and verifies the second transaction isn't
* initiated until the first one completes its read future.
writeTx1.ready();
- verifyOneBatchedModification(txActorRef1, new WriteModification(TestModel.TEST_PATH, writeNode1));
+ verifyOneBatchedModification(txActorRef1, new WriteModification(TestModel.TEST_PATH, writeNode1), false);
String tx2MemberName = "tx2MemberName";
doReturn(tx2MemberName).when(mockActorContext).getCurrentMemberName();
import static org.mockito.Matchers.eq;
import static org.mockito.Matchers.isA;
import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType.READ_ONLY;
import org.junit.Test;
import org.mockito.InOrder;
import org.mockito.Mockito;
+import org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType;
import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
import org.opendaylight.controller.cluster.datastore.exceptions.TimeoutException;
import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
@Test
public void testWrite() throws Exception {
+ dataStoreContextBuilder.shardBatchedModificationCount(1);
ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
expectBatchedModifications(actorRef, 1);
- expectReadyTransaction(actorRef);
TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY);
transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
- // This sends the batched modification.
- transactionProxy.ready();
-
- verifyOneBatchedModification(actorRef, new WriteModification(TestModel.TEST_PATH, nodeToWrite));
-
- verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
- BatchedModificationsReply.class);
+ verifyOneBatchedModification(actorRef, new WriteModification(TestModel.TEST_PATH, nodeToWrite), false);
}
@Test
// This sends the batched modification.
transactionProxy.ready();
- verifyOneBatchedModification(actorRef, new WriteModification(TestModel.TEST_PATH, nodeToWrite));
+ verifyOneBatchedModification(actorRef, new WriteModification(TestModel.TEST_PATH, nodeToWrite), false);
verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
BatchedModificationsReply.class);
@Test
public void testMerge() throws Exception {
+ dataStoreContextBuilder.shardBatchedModificationCount(1);
ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
expectBatchedModifications(actorRef, 1);
- expectReadyTransaction(actorRef);
TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY);
transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite);
- // This sends the batched modification.
- transactionProxy.ready();
-
- verifyOneBatchedModification(actorRef, new MergeModification(TestModel.TEST_PATH, nodeToWrite));
-
- verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
- BatchedModificationsReply.class);
+ verifyOneBatchedModification(actorRef, new MergeModification(TestModel.TEST_PATH, nodeToWrite), false);
}
@Test
public void testDelete() throws Exception {
+ dataStoreContextBuilder.shardBatchedModificationCount(1);
ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
expectBatchedModifications(actorRef, 1);
- expectReadyTransaction(actorRef);
TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY);
transactionProxy.delete(TestModel.TEST_PATH);
- // This sends the batched modification.
- transactionProxy.ready();
-
- verifyOneBatchedModification(actorRef, new DeleteModification(TestModel.TEST_PATH));
-
- verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
- BatchedModificationsReply.class);
+ verifyOneBatchedModification(actorRef, new DeleteModification(TestModel.TEST_PATH), false);
}
@Test
- public void testReady() throws Exception {
+ public void testReadyWithReadWrite() throws Exception {
ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
verify(mockActorContext).executeOperationAsync(eq(actorSelection(actorRef)),
isA(BatchedModifications.class));
+
+ verify(mockActorContext).executeOperationAsync(eq(actorSelection(actorRef)),
+ isA(ReadyTransaction.SERIALIZABLE_CLASS));
+ }
+
+ @Test
+ public void testReadyWithWriteOnlyAndLastBatchPending() throws Exception {
+ dataStoreContextBuilder.writeOnlyTransactionOptimizationsEnabled(true);
+
+ ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
+
+ NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
+
+ expectBatchedModificationsReady(actorRef, 1);
+
+ TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY);
+
+ transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
+
+ DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
+
+ assertTrue(ready instanceof ThreePhaseCommitCohortProxy);
+
+ ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready;
+
+ verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures());
+
+ verifyCohortFutures(proxy, getSystem().actorSelection(actorRef.path()));
+
+ List<BatchedModifications> batchedModifications = captureBatchedModifications(actorRef);
+ assertEquals("Captured BatchedModifications count", 1, batchedModifications.size());
+
+ verifyBatchedModifications(batchedModifications.get(0), true,
+ new WriteModification(TestModel.TEST_PATH, nodeToWrite));
+
+ verify(mockActorContext, never()).executeOperationAsync(eq(actorSelection(actorRef)),
+ isA(ReadyTransaction.SERIALIZABLE_CLASS));
+ }
+
+ @Test
+ public void testReadyWithWriteOnlyAndLastBatchEmpty() throws Exception {
+ dataStoreContextBuilder.shardBatchedModificationCount(1).writeOnlyTransactionOptimizationsEnabled(true);
+ ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
+
+ NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
+
+ expectBatchedModificationsReady(actorRef, 1);
+
+ TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY);
+
+ transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
+
+ DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
+
+ assertTrue(ready instanceof ThreePhaseCommitCohortProxy);
+
+ ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready;
+
+ verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
+ BatchedModificationsReply.class);
+
+ verifyCohortFutures(proxy, getSystem().actorSelection(actorRef.path()));
+
+ List<BatchedModifications> batchedModifications = captureBatchedModifications(actorRef);
+ assertEquals("Captured BatchedModifications count", 2, batchedModifications.size());
+
+ verifyBatchedModifications(batchedModifications.get(0), false,
+ new WriteModification(TestModel.TEST_PATH, nodeToWrite));
+
+ verifyBatchedModifications(batchedModifications.get(1), true);
+
+ verify(mockActorContext, never()).executeOperationAsync(eq(actorSelection(actorRef)),
+ isA(ReadyTransaction.SERIALIZABLE_CLASS));
}
@Test
public void testReadyWithRecordingOperationFailure() throws Exception {
+ dataStoreContextBuilder.shardBatchedModificationCount(1).writeOnlyTransactionOptimizationsEnabled(true);
+
ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
expectFailedBatchedModifications(actorRef);
- expectReadyTransaction(actorRef);
-
doReturn(false).when(mockActorContext).isPathLocal(actorRef.path().toString());
TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY);
@Test
public void testReadyWithReplyFailure() throws Exception {
+ dataStoreContextBuilder.writeOnlyTransactionOptimizationsEnabled(true);
+
ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
- expectBatchedModifications(actorRef, 1);
-
- doReturn(Futures.failed(new TestException())).when(mockActorContext).
- executeOperationAsync(eq(actorSelection(actorRef)),
- isA(ReadyTransaction.SERIALIZABLE_CLASS));
+ expectFailedBatchedModifications(actorRef);
TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY);
ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready;
- verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
- BatchedModificationsReply.class);
-
verifyCohortFutures(proxy, TestException.class);
}
@Test
public void testReadyWithInvalidReplyMessageType() throws Exception {
+ dataStoreContextBuilder.writeOnlyTransactionOptimizationsEnabled(true);
ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
- expectBatchedModifications(actorRef, 1);
+ //expectBatchedModifications(actorRef, 1);
doReturn(Futures.successful(new Object())).when(mockActorContext).
executeOperationAsync(eq(actorSelection(actorRef)),
- isA(ReadyTransaction.SERIALIZABLE_CLASS));
+ isA(BatchedModifications.class));
TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY);
verifyCohortFutures(proxy, IllegalArgumentException.class);
}
- @Test
- public void testUnusedTransaction() throws Exception {
- TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_WRITE);
-
- DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
-
- assertEquals("canCommit", true, ready.canCommit().get());
- ready.preCommit().get();
- ready.commit().get();
- }
-
@Test
public void testGetIdentifier() {
setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
*/
@Test
public void testLocalTxActorRead() throws Exception {
- ActorSystem actorSystem = getSystem();
- ActorRef shardActorRef = actorSystem.actorOf(Props.create(DoNothingActor.class));
-
- doReturn(actorSystem.actorSelection(shardActorRef.path())).
- when(mockActorContext).actorSelection(shardActorRef.path().toString());
-
- doReturn(Futures.successful(actorSystem.actorSelection(shardActorRef.path()))).
- when(mockActorContext).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
-
- String actorPath = "akka.tcp://system@127.0.0.1:2550/user/tx-actor";
- CreateTransactionReply createTransactionReply = CreateTransactionReply.newBuilder()
- .setTransactionId("txn-1").setTransactionActorPath(actorPath).build();
-
- doReturn(Futures.successful(createTransactionReply)).when(mockActorContext).
- executeOperationAsync(eq(actorSystem.actorSelection(shardActorRef.path())),
- eqCreateTransaction(memberName, READ_ONLY));
-
- doReturn(true).when(mockActorContext).isPathLocal(actorPath);
+ setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
+ doReturn(true).when(mockActorContext).isPathLocal(anyString());
TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,READ_ONLY);
@Test
public void testLocalTxActorReady() throws Exception {
- ActorSystem actorSystem = getSystem();
- ActorRef shardActorRef = actorSystem.actorOf(Props.create(DoNothingActor.class));
-
- doReturn(actorSystem.actorSelection(shardActorRef.path())).
- when(mockActorContext).actorSelection(shardActorRef.path().toString());
-
- doReturn(Futures.successful(actorSystem.actorSelection(shardActorRef.path()))).
- when(mockActorContext).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
-
- String actorPath = "akka.tcp://system@127.0.0.1:2550/user/tx-actor";
- CreateTransactionReply createTransactionReply = CreateTransactionReply.newBuilder().
- setTransactionId("txn-1").setTransactionActorPath(actorPath).
- setMessageVersion(DataStoreVersions.CURRENT_VERSION).build();
-
- doReturn(Futures.successful(createTransactionReply)).when(mockActorContext).
- executeOperationAsync(eq(actorSystem.actorSelection(shardActorRef.path())),
- eqCreateTransaction(memberName, WRITE_ONLY));
-
- doReturn(true).when(mockActorContext).isPathLocal(actorPath);
+ ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
+ doReturn(true).when(mockActorContext).isPathLocal(anyString());
doReturn(batchedModificationsReply(1)).when(mockActorContext).executeOperationAsync(
any(ActorSelection.class), isA(BatchedModifications.class));
- TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY);
+ TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_WRITE);
NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
- verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
- BatchedModificationsReply.class);
-
// testing ready
- doReturn(readyTxReply(shardActorRef.path().toString())).when(mockActorContext).executeOperationAsync(
- any(ActorSelection.class), isA(ReadyTransaction.class));
+ doReturn(readyTxReply(actorRef.path().toString())).when(mockActorContext).executeOperationAsync(
+ eq(actorSelection(actorRef)), isA(ReadyTransaction.class));
DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready;
- verifyCohortFutures(proxy, getSystem().actorSelection(shardActorRef.path()));
+ verifyCohortFutures(proxy, getSystem().actorSelection(actorRef.path()));
}
private static interface TransactionProxyOperation {
doReturn(Futures.successful(actorSystem.actorSelection(shardActorRef.path()))).
when(mockActorContext).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
} else {
- doReturn(Futures.failed(new Exception("not found")))
+ doReturn(Futures.failed(new PrimaryNotFoundException("test")))
.when(mockActorContext).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
}
- String actorPath = "akka.tcp://system@127.0.0.1:2550/user/tx-actor";
+ ActorRef txActorRef = actorSystem.actorOf(Props.create(DoNothingActor.class));
+ String actorPath = txActorRef.path().toString();
CreateTransactionReply createTransactionReply = CreateTransactionReply.newBuilder().
setTransactionId("txn-1").setTransactionActorPath(actorPath).
setMessageVersion(DataStoreVersions.CURRENT_VERSION).build();
+ doReturn(actorSystem.actorSelection(actorPath)).when(mockActorContext).actorSelection(actorPath);
+
doReturn(Futures.successful(createTransactionReply)).when(mockActorContext).
executeOperationAsync(eq(actorSystem.actorSelection(shardActorRef.path())),
eqCreateTransaction(memberName, READ_WRITE));
- doReturn(true).when(mockActorContext).isPathLocal(actorPath);
+ doReturn(true).when(mockActorContext).isPathLocal(anyString());
TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_WRITE);
@Test
public void testWriteThrottlingWhenShardFound(){
+ dataStoreContextBuilder.shardBatchedModificationCount(1);
throttleOperation(new TransactionProxyOperation() {
@Override
public void run(TransactionProxy transactionProxy) {
@Test
public void testWriteThrottlingWhenShardNotFound(){
// Confirm that there is no throttling when the Shard is not found
+ dataStoreContextBuilder.shardBatchedModificationCount(1);
completeOperation(new TransactionProxyOperation() {
@Override
public void run(TransactionProxy transactionProxy) {
@Test
public void testWriteCompletion(){
+ dataStoreContextBuilder.shardBatchedModificationCount(1);
completeOperation(new TransactionProxyOperation() {
@Override
public void run(TransactionProxy transactionProxy) {
@Test
public void testMergeThrottlingWhenShardFound(){
-
+ dataStoreContextBuilder.shardBatchedModificationCount(1);
throttleOperation(new TransactionProxyOperation() {
@Override
public void run(TransactionProxy transactionProxy) {
@Test
public void testMergeThrottlingWhenShardNotFound(){
-
+ dataStoreContextBuilder.shardBatchedModificationCount(1);
completeOperation(new TransactionProxyOperation() {
@Override
public void run(TransactionProxy transactionProxy) {
@Test
public void testMergeCompletion(){
+ dataStoreContextBuilder.shardBatchedModificationCount(1);
completeOperation(new TransactionProxyOperation() {
@Override
public void run(TransactionProxy transactionProxy) {
@Test
public void testDeleteCompletion(){
+ dataStoreContextBuilder.shardBatchedModificationCount(1);
completeOperation(new TransactionProxyOperation() {
@Override
public void run(TransactionProxy transactionProxy) {
}, 2, true);
}
- @Test
- public void testModificationOperationBatching() throws Throwable {
+ private void testModificationOperationBatching(TransactionType type) throws Exception {
int shardBatchedModificationCount = 3;
- doReturn(dataStoreContextBuilder.shardBatchedModificationCount(shardBatchedModificationCount).build()).
- when(mockActorContext).getDatastoreContext();
+ dataStoreContextBuilder.shardBatchedModificationCount(shardBatchedModificationCount);
- ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
+ ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), type);
expectBatchedModifications(actorRef, shardBatchedModificationCount);
YangInstanceIdentifier deletePath1 = TestModel.TEST_PATH;
YangInstanceIdentifier deletePath2 = TestModel.OUTER_LIST_PATH;
- TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_WRITE);
+ TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, type);
transactionProxy.write(writePath1, writeNode1);
transactionProxy.write(writePath2, writeNode2);
List<BatchedModifications> batchedModifications = captureBatchedModifications(actorRef);
assertEquals("Captured BatchedModifications count", 3, batchedModifications.size());
- verifyBatchedModifications(batchedModifications.get(0), new WriteModification(writePath1, writeNode1),
+ verifyBatchedModifications(batchedModifications.get(0), false, new WriteModification(writePath1, writeNode1),
new WriteModification(writePath2, writeNode2), new DeleteModification(deletePath1));
- verifyBatchedModifications(batchedModifications.get(1), new MergeModification(mergePath1, mergeNode1),
+ verifyBatchedModifications(batchedModifications.get(1), false, new MergeModification(mergePath1, mergeNode1),
new MergeModification(mergePath2, mergeNode2), new WriteModification(writePath3, writeNode3));
- verifyBatchedModifications(batchedModifications.get(2), new MergeModification(mergePath3, mergeNode3),
+ boolean optimizedWriteOnly = type == WRITE_ONLY && dataStoreContextBuilder.build().isWriteOnlyTransactionOptimizationsEnabled();
+ verifyBatchedModifications(batchedModifications.get(2), optimizedWriteOnly, new MergeModification(mergePath3, mergeNode3),
new DeleteModification(deletePath2));
- verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
- BatchedModificationsReply.class, BatchedModificationsReply.class, BatchedModificationsReply.class);
+ if(optimizedWriteOnly) {
+ verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
+ BatchedModificationsReply.class, BatchedModificationsReply.class);
+ } else {
+ verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
+ BatchedModificationsReply.class, BatchedModificationsReply.class, BatchedModificationsReply.class);
+ }
+ }
+
+ @Test
+ public void testReadWriteModificationOperationBatching() throws Throwable {
+ testModificationOperationBatching(READ_WRITE);
+ }
+
+ @Test
+ public void testWriteOnlyModificationOperationBatching() throws Throwable {
+ testModificationOperationBatching(WRITE_ONLY);
+ }
+
+ @Test
+ public void testOptimizedWriteOnlyModificationOperationBatching() throws Throwable {
+ dataStoreContextBuilder.writeOnlyTransactionOptimizationsEnabled(true);
+ testModificationOperationBatching(WRITE_ONLY);
}
@Test
public void testModificationOperationBatchingWithInterleavedReads() throws Throwable {
+
int shardBatchedModificationCount = 10;
- doReturn(dataStoreContextBuilder.shardBatchedModificationCount(shardBatchedModificationCount).build()).
- when(mockActorContext).getDatastoreContext();
+ dataStoreContextBuilder.shardBatchedModificationCount(shardBatchedModificationCount);
ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
List<BatchedModifications> batchedModifications = captureBatchedModifications(actorRef);
assertEquals("Captured BatchedModifications count", 3, batchedModifications.size());
- verifyBatchedModifications(batchedModifications.get(0), new WriteModification(writePath1, writeNode1),
+ verifyBatchedModifications(batchedModifications.get(0), false, new WriteModification(writePath1, writeNode1),
new WriteModification(writePath2, writeNode2));
- verifyBatchedModifications(batchedModifications.get(1), new MergeModification(mergePath1, mergeNode1),
+ verifyBatchedModifications(batchedModifications.get(1), false, new MergeModification(mergePath1, mergeNode1),
new MergeModification(mergePath2, mergeNode2));
- verifyBatchedModifications(batchedModifications.get(2), new DeleteModification(deletePath));
+ verifyBatchedModifications(batchedModifications.get(2), false, new DeleteModification(deletePath));
InOrder inOrder = Mockito.inOrder(mockActorContext);
inOrder.verify(mockActorContext).executeOperationAsync(
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.verify;
import static org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType.READ_WRITE;
+import static org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType.WRITE_ONLY;
import akka.actor.ActorRef;
import akka.dispatch.Futures;
import com.google.common.base.Optional;
import java.util.concurrent.TimeUnit;
+import org.junit.Ignore;
import org.junit.Test;
import org.mockito.ArgumentMatcher;
import org.mockito.Mockito;
verify(mockActorContext, Mockito.never()).resolvePath(eq(actorRef.path().toString()),
eq(actorRef.path().toString()));
}
+
+ @Test
+ @Ignore
+ // FIXME: disabled until we can get the primary shard version from the ShardManager as we now skip
+ // creating transaction actors for write-only Tx's.
+ public void testWriteOnlyCompatibilityWithHeliumR2Version() throws Exception {
+ short version = DataStoreVersions.HELIUM_2_VERSION;
+ ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY, version);
+
+ NormalizedNode<?, ?> testNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
+
+ doReturn(Futures.successful(new WriteDataReply().toSerializable(version))).when(mockActorContext).
+ executeOperationAsync(eq(actorSelection(actorRef)), eqLegacyWriteData(testNode));
+
+ doReturn(readySerializedTxReply(actorRef.path().toString())).when(mockActorContext).executeOperationAsync(
+ eq(actorSelection(actorRef)), isA(ReadyTransaction.SERIALIZABLE_CLASS));
+
+ doReturn(actorRef.path().toString()).when(mockActorContext).resolvePath(eq(actorRef.path().toString()),
+ eq(actorRef.path().toString()));
+
+ TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY);
+
+ transactionProxy.write(TestModel.TEST_PATH, testNode);
+
+ DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
+
+ assertTrue(ready instanceof ThreePhaseCommitCohortProxy);
+
+ ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready;
+
+ verifyCohortFutures(proxy, getSystem().actorSelection(actorRef.path()));
+ }
}
YangInstanceIdentifier deletePath = TestModel.TEST_PATH;
- BatchedModifications batched = new BatchedModifications(DataStoreVersions.CURRENT_VERSION);
+ BatchedModifications batched = new BatchedModifications("tx1", DataStoreVersions.CURRENT_VERSION, "txChain");
batched.addModification(new WriteModification(writePath, writeData));
batched.addModification(new MergeModification(mergePath, mergeData));
batched.addModification(new DeleteModification(deletePath));
+ batched.setReady(true);
BatchedModifications clone = (BatchedModifications) SerializationUtils.clone(
(Serializable) batched.toSerializable());
assertEquals("getVersion", DataStoreVersions.CURRENT_VERSION, clone.getVersion());
+ assertEquals("getTransactionID", "tx1", clone.getTransactionID());
+ assertEquals("getTransactionChainID", "txChain", clone.getTransactionChainID());
+ assertEquals("isReady", true, clone.isReady());
assertEquals("getModifications size", 3, clone.getModifications().size());
DeleteModification delete = (DeleteModification)clone.getModifications().get(2);
assertEquals("getVersion", DataStoreVersions.CURRENT_VERSION, delete.getVersion());
assertEquals("getPath", deletePath, delete.getPath());
+
+ // Test with different params.
+
+ batched = new BatchedModifications("tx2", (short)10, null);
+
+ clone = (BatchedModifications) SerializationUtils.clone((Serializable) batched.toSerializable());
+
+ assertEquals("getVersion", 10, clone.getVersion());
+ assertEquals("getTransactionID", "tx2", clone.getTransactionID());
+ assertEquals("getTransactionChainID", "", clone.getTransactionChainID());
+ assertEquals("isReady", false, clone.isReady());
+
+ assertEquals("getModifications size", 0, clone.getModifications().size());
+
}
@Test
BatchedModificationsReply clone = (BatchedModificationsReply) SerializationUtils.clone(
(Serializable) new BatchedModificationsReply(100).toSerializable());
assertEquals("getNumBatched", 100, clone.getNumBatched());
+ assertEquals("getCohortPath", null, clone.getCohortPath());
+
+ clone = (BatchedModificationsReply) SerializationUtils.clone(
+ (Serializable) new BatchedModificationsReply(50, "cohort path").toSerializable());
+ assertEquals("getNumBatched", 50, clone.getNumBatched());
+ assertEquals("getCohortPath", "cohort path", clone.getCohortPath());
}
}