import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.CheckedFuture;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import java.util.ArrayList;
-import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
-import javax.annotation.concurrent.GuardedBy;
import org.opendaylight.controller.cluster.datastore.compat.PreLithiumTransactionContextImpl;
-import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
-import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
-import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionReply;
+import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo;
import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategyFactory;
import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
+import org.opendaylight.controller.cluster.datastore.utils.NormalizedNodeAggregator;
import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
import org.opendaylight.controller.sal.core.spi.data.AbstractDOMStoreTransaction;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
import org.opendaylight.yangtools.util.concurrent.MappingCheckedFuture;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataValidationFailedException;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.concurrent.Future;
import scala.concurrent.Promise;
-import scala.concurrent.duration.FiniteDuration;
/**
* TransactionProxy acts as a proxy for one or more transactions that were created on a remote shard
private static final Logger LOG = LoggerFactory.getLogger(TransactionProxy.class);
- /**
- * Time interval in between transaction create retries.
- */
- private static final FiniteDuration CREATE_TX_TRY_INTERVAL =
- FiniteDuration.create(1, TimeUnit.SECONDS);
-
/**
* Stores the remote Tx actors for each requested data store path to be used by the
* PhantomReference to close the remote Tx's. This is only used for read-only Tx's. The
return new TransactionIdentifier(memberName, counter.getAndIncrement());
}
- @VisibleForTesting
- List<Future<Object>> getRecordedOperationFutures() {
- List<Future<Object>> recordedOperationFutures = Lists.newArrayList();
- for(TransactionFutureCallback txFutureCallback : txFutureCallbackMap.values()) {
- TransactionContext transactionContext = txFutureCallback.getTransactionContext();
- if (transactionContext != null) {
- transactionContext.copyRecordedOperationFutures(recordedOperationFutures);
- }
- }
-
- return recordedOperationFutures;
- }
-
@VisibleForTesting
boolean hasTransactionContext() {
for(TransactionFutureCallback txFutureCallback : txFutureCallbackMap.values()) {
return false;
}
+ private static boolean isRootPath(YangInstanceIdentifier path) {
+ return !path.getPathArguments().iterator().hasNext();
+ }
+
@Override
public CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> read(final YangInstanceIdentifier path) {
LOG.debug("Tx {} read {}", getIdentifier(), path);
- throttleOperation();
-
final SettableFuture<Optional<NormalizedNode<?, ?>>> proxyFuture = SettableFuture.create();
- TransactionFutureCallback txFutureCallback = getOrCreateTxFutureCallback(path);
- txFutureCallback.enqueueTransactionOperation(new TransactionOperation() {
- @Override
- public void invoke(TransactionContext transactionContext) {
- transactionContext.readData(path, proxyFuture);
- }
- });
+ if(isRootPath(path)){
+ readAllData(path, proxyFuture);
+ } else {
+ throttleOperation();
+
+ TransactionFutureCallback txFutureCallback = getOrCreateTxFutureCallback(path);
+ txFutureCallback.enqueueTransactionOperation(new TransactionOperation() {
+ @Override
+ public void invoke(TransactionContext transactionContext) {
+ transactionContext.readData(path, proxyFuture);
+ }
+ });
+
+ }
return MappingCheckedFuture.create(proxyFuture, ReadFailedException.MAPPER);
}
+ private void readAllData(final YangInstanceIdentifier path,
+ final SettableFuture<Optional<NormalizedNode<?, ?>>> proxyFuture) {
+ Set<String> allShardNames = actorContext.getConfiguration().getAllShardNames();
+ List<SettableFuture<Optional<NormalizedNode<?, ?>>>> futures = new ArrayList<>(allShardNames.size());
+
+ for(String shardName : allShardNames){
+ final SettableFuture<Optional<NormalizedNode<?, ?>>> subProxyFuture = SettableFuture.create();
+
+ throttleOperation();
+
+ TransactionFutureCallback txFutureCallback = getOrCreateTxFutureCallback(shardName);
+ txFutureCallback.enqueueTransactionOperation(new TransactionOperation() {
+ @Override
+ public void invoke(TransactionContext transactionContext) {
+ transactionContext.readData(path, subProxyFuture);
+ }
+ });
+
+ futures.add(subProxyFuture);
+ }
+
+ final ListenableFuture<List<Optional<NormalizedNode<?, ?>>>> future = Futures.allAsList(futures);
+
+ future.addListener(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ proxyFuture.set(NormalizedNodeAggregator.aggregate(YangInstanceIdentifier.builder().build(),
+ future.get(), actorContext.getSchemaContext()));
+ } catch (DataValidationFailedException | InterruptedException | ExecutionException e) {
+ proxyFuture.setException(e);
+ }
+ }
+ }, actorContext.getActorSystem().dispatcher());
+ }
+
@Override
public CheckedFuture<Boolean, ReadFailedException> exists(final YangInstanceIdentifier path) {
}
}
+ final void ensureInitializied() {
+ Preconditions.checkState(initialized, "Transaction %s was not propertly initialized.", getIdentifier());
+ }
+
@Override
public void write(final YangInstanceIdentifier path, final NormalizedNode<?, ?> data) {
}
@Override
- public AbstractThreePhaseCommitCohort ready() {
+ public AbstractThreePhaseCommitCohort<?> ready() {
Preconditions.checkState(transactionType != TransactionType.READ_ONLY,
"Read-only transactions cannot be readied");
throttleOperation(txFutureCallbackMap.size());
+ final boolean isSingleShard = txFutureCallbackMap.size() == 1;
+ return isSingleShard ? createSingleCommitCohort() : createMultiCommitCohort();
+ }
+
+ @SuppressWarnings({ "rawtypes", "unchecked" })
+ private AbstractThreePhaseCommitCohort<Object> createSingleCommitCohort() {
+ TransactionFutureCallback txFutureCallback = txFutureCallbackMap.values().iterator().next();
+
+ LOG.debug("Tx {} Readying transaction for shard {} on chain {}", getIdentifier(),
+ txFutureCallback.getShardName(), transactionChainId);
+
+ final OperationCallback.Reference operationCallbackRef =
+ new OperationCallback.Reference(OperationCallback.NO_OP_CALLBACK);
+ final TransactionContext transactionContext = txFutureCallback.getTransactionContext();
+ final Future future;
+ if (transactionContext != null) {
+ // avoid the creation of a promise and a TransactionOperation
+ future = getReadyOrDirectCommitFuture(transactionContext, operationCallbackRef);
+ } else {
+ final Promise promise = akka.dispatch.Futures.promise();
+ txFutureCallback.enqueueTransactionOperation(new TransactionOperation() {
+ @Override
+ public void invoke(TransactionContext transactionContext) {
+ promise.completeWith(getReadyOrDirectCommitFuture(transactionContext, operationCallbackRef));
+ }
+ });
+ future = promise.future();
+ }
+
+ return new SingleCommitCohortProxy(actorContext, future, getIdentifier().toString(), operationCallbackRef);
+ }
+
+ private Future<?> getReadyOrDirectCommitFuture(TransactionContext transactionContext,
+ OperationCallback.Reference operationCallbackRef) {
+ if(transactionContext.supportsDirectCommit()) {
+ TransactionRateLimitingCallback rateLimitingCallback = new TransactionRateLimitingCallback(actorContext);
+ operationCallbackRef.set(rateLimitingCallback);
+ rateLimitingCallback.run();
+ return transactionContext.directCommit();
+ } else {
+ return transactionContext.readyTransaction();
+ }
+ }
+
+ private AbstractThreePhaseCommitCohort<ActorSelection> createMultiCommitCohort() {
List<Future<ActorSelection>> cohortFutures = new ArrayList<>(txFutureCallbackMap.size());
for(TransactionFutureCallback txFutureCallback : txFutureCallbackMap.values()) {
- LOG.debug("Tx {} Readying transaction for shard {} chain {}", getIdentifier(),
+ LOG.debug("Tx {} Readying transaction for shard {} on chain {}", getIdentifier(),
txFutureCallback.getShardName(), transactionChainId);
final TransactionContext transactionContext = txFutureCallback.getTransactionContext();
cohortFutures.add(future);
}
- return new ThreePhaseCommitCohortProxy(actorContext, cohortFutures,
- getIdentifier().toString());
+ return new ThreePhaseCommitCohortProxy(actorContext, cohortFutures, getIdentifier().toString());
}
@Override
return ShardStrategyFactory.getStrategy(path).findShard(path);
}
- protected Future<ActorSelection> sendFindPrimaryShardAsync(String shardName) {
+ protected Future<PrimaryShardInfo> sendFindPrimaryShardAsync(String shardName) {
return actorContext.findPrimaryShardAsync(shardName);
}
+ final TransactionType getTransactionType() {
+ return transactionType;
+ }
+
+ final Semaphore getOperationLimiter() {
+ return operationLimiter;
+ }
+
private TransactionFutureCallback getOrCreateTxFutureCallback(YangInstanceIdentifier path) {
String shardName = shardNameFromIdentifier(path);
+ return getOrCreateTxFutureCallback(shardName);
+ }
+
+ private TransactionFutureCallback getOrCreateTxFutureCallback(String shardName) {
TransactionFutureCallback txFutureCallback = txFutureCallbackMap.get(shardName);
if(txFutureCallback == null) {
- Future<ActorSelection> findPrimaryFuture = sendFindPrimaryShardAsync(shardName);
+ Future<PrimaryShardInfo> findPrimaryFuture = sendFindPrimaryShardAsync(shardName);
- final TransactionFutureCallback newTxFutureCallback = new TransactionFutureCallback(shardName);
+ final TransactionFutureCallback newTxFutureCallback = new TransactionFutureCallback(this, shardName);
txFutureCallback = newTxFutureCallback;
txFutureCallbackMap.put(shardName, txFutureCallback);
- findPrimaryFuture.onComplete(new OnComplete<ActorSelection>() {
+ findPrimaryFuture.onComplete(new OnComplete<PrimaryShardInfo>() {
@Override
- public void onComplete(Throwable failure, ActorSelection primaryShard) {
+ public void onComplete(Throwable failure, PrimaryShardInfo primaryShardInfo) {
if(failure != null) {
newTxFutureCallback.createTransactionContext(failure, null);
} else {
- newTxFutureCallback.setPrimaryShard(primaryShard);
+ newTxFutureCallback.setPrimaryShard(primaryShardInfo.getPrimaryShardActor());
}
}
}, actorContext.getClientDispatcher());
return txFutureCallback;
}
- public String getTransactionChainId() {
+ String getTransactionChainId() {
return transactionChainId;
}
return actorContext;
}
- /**
- * Implements a Future OnComplete callback for a CreateTransaction message. This class handles
- * retries, up to a limit, if the shard doesn't have a leader yet. This is done by scheduling a
- * retry task after a short delay.
- * <p>
- * The end result from a completed CreateTransaction message is a TransactionContext that is
- * used to perform transaction operations. Transaction operations that occur before the
- * CreateTransaction completes are cache and executed once the CreateTransaction completes,
- * successfully or not.
- */
- private class TransactionFutureCallback extends OnComplete<Object> {
-
- /**
- * The list of transaction operations to execute once the CreateTransaction completes.
- */
- @GuardedBy("txOperationsOnComplete")
- private final List<TransactionOperation> txOperationsOnComplete = Lists.newArrayList();
-
- /**
- * The TransactionContext resulting from the CreateTransaction reply.
- */
- private volatile TransactionContext transactionContext;
-
- /**
- * The target primary shard.
- */
- private volatile ActorSelection primaryShard;
-
- private volatile int createTxTries = (int) (actorContext.getDatastoreContext().
- getShardLeaderElectionTimeout().duration().toMillis() /
- CREATE_TX_TRY_INTERVAL.toMillis());
-
- private final String shardName;
-
- TransactionFutureCallback(String shardName) {
- this.shardName = shardName;
- }
-
- String getShardName() {
- return shardName;
- }
-
- TransactionContext getTransactionContext() {
- return transactionContext;
- }
-
-
- /**
- * Sets the target primary shard and initiates a CreateTransaction try.
- */
- void setPrimaryShard(ActorSelection primaryShard) {
- this.primaryShard = primaryShard;
-
- if(transactionType == TransactionType.WRITE_ONLY &&
- actorContext.getDatastoreContext().isWriteOnlyTransactionOptimizationsEnabled()) {
- LOG.debug("Tx {} Primary shard {} found - creating WRITE_ONLY transaction context",
- getIdentifier(), primaryShard);
-
- // For write-only Tx's we prepare the transaction modifications directly on the shard actor
- // to avoid the overhead of creating a separate transaction actor.
- // FIXME: can't assume the shard version is LITHIUM_VERSION - need to obtain it somehow.
- executeTxOperatonsOnComplete(createValidTransactionContext(this.primaryShard,
- this.primaryShard.path().toString(), DataStoreVersions.LITHIUM_VERSION));
- } else {
- tryCreateTransaction();
- }
- }
-
- /**
- * Adds a TransactionOperation to be executed after the CreateTransaction completes.
- */
- void addTxOperationOnComplete(TransactionOperation operation) {
- boolean invokeOperation = true;
- synchronized(txOperationsOnComplete) {
- if(transactionContext == null) {
- LOG.debug("Tx {} Adding operation on complete", getIdentifier());
-
- invokeOperation = false;
- txOperationsOnComplete.add(operation);
- }
- }
-
- if(invokeOperation) {
- operation.invoke(transactionContext);
- }
- }
-
- void enqueueTransactionOperation(final TransactionOperation op) {
-
- if (transactionContext != null) {
- op.invoke(transactionContext);
- } else {
- // The shard Tx hasn't been created yet so add the Tx operation to the Tx Future
- // callback to be executed after the Tx is created.
- addTxOperationOnComplete(op);
- }
- }
-
- /**
- * Performs a CreateTransaction try async.
- */
- private void tryCreateTransaction() {
- if(LOG.isDebugEnabled()) {
- LOG.debug("Tx {} Primary shard {} found - trying create transaction", getIdentifier(), primaryShard);
- }
-
- Object serializedCreateMessage = new CreateTransaction(getIdentifier().toString(),
- TransactionProxy.this.transactionType.ordinal(),
- getTransactionChainId()).toSerializable();
-
- Future<Object> createTxFuture = actorContext.executeOperationAsync(primaryShard, serializedCreateMessage);
-
- createTxFuture.onComplete(this, actorContext.getClientDispatcher());
- }
-
- @Override
- public void onComplete(Throwable failure, Object response) {
- if(failure instanceof NoShardLeaderException) {
- // There's no leader for the shard yet - schedule and try again, unless we're out
- // of retries. Note: createTxTries is volatile as it may be written by different
- // threads however not concurrently, therefore decrementing it non-atomically here
- // is ok.
- if(--createTxTries > 0) {
- LOG.debug("Tx {} Shard {} has no leader yet - scheduling create Tx retry",
- getIdentifier(), shardName);
-
- actorContext.getActorSystem().scheduler().scheduleOnce(CREATE_TX_TRY_INTERVAL,
- new Runnable() {
- @Override
- public void run() {
- tryCreateTransaction();
- }
- }, actorContext.getClientDispatcher());
- return;
- }
- }
+ TransactionContext createValidTransactionContext(ActorSelection transactionActor,
+ String transactionPath, short remoteTransactionVersion) {
- createTransactionContext(failure, response);
- }
+ if (transactionType == TransactionType.READ_ONLY) {
+ // Read-only Tx's aren't explicitly closed by the client so we create a PhantomReference
+ // to close the remote Tx's when this instance is no longer in use and is garbage
+ // collected.
- private void createTransactionContext(Throwable failure, Object response) {
- // Mainly checking for state violation here to perform a volatile read of "initialized" to
- // ensure updates to operationLimter et al are visible to this thread (ie we're doing
- // "piggy-back" synchronization here).
- Preconditions.checkState(initialized, "Tx was not propertly initialized.");
-
- // Create the TransactionContext from the response or failure. Store the new
- // TransactionContext locally until we've completed invoking the
- // TransactionOperations. This avoids thread timing issues which could cause
- // out-of-order TransactionOperations. Eg, on a modification operation, if the
- // TransactionContext is non-null, then we directly call the TransactionContext.
- // However, at the same time, the code may be executing the cached
- // TransactionOperations. So to avoid thus timing, we don't publish the
- // TransactionContext until after we've executed all cached TransactionOperations.
- TransactionContext localTransactionContext;
- if(failure != null) {
- LOG.debug("Tx {} Creating NoOpTransaction because of error", getIdentifier(), failure);
-
- localTransactionContext = new NoOpTransactionContext(failure, getIdentifier(), operationLimiter);
- } else if (CreateTransactionReply.SERIALIZABLE_CLASS.equals(response.getClass())) {
- localTransactionContext = createValidTransactionContext(
- CreateTransactionReply.fromSerializable(response));
- } else {
- IllegalArgumentException exception = new IllegalArgumentException(String.format(
- "Invalid reply type %s for CreateTransaction", response.getClass()));
+ if(remoteTransactionActorsMB == null) {
+ remoteTransactionActors = Lists.newArrayList();
+ remoteTransactionActorsMB = new AtomicBoolean();
- localTransactionContext = new NoOpTransactionContext(exception, getIdentifier(), operationLimiter);
+ TransactionProxyCleanupPhantomReference.track(TransactionProxy.this);
}
- executeTxOperatonsOnComplete(localTransactionContext);
- }
-
- private void executeTxOperatonsOnComplete(TransactionContext localTransactionContext) {
- while(true) {
- // Access to txOperationsOnComplete and transactionContext must be protected and atomic
- // (ie synchronized) with respect to #addTxOperationOnComplete to handle timing
- // issues and ensure no TransactionOperation is missed and that they are processed
- // in the order they occurred.
-
- // We'll make a local copy of the txOperationsOnComplete list to handle re-entrancy
- // in case a TransactionOperation results in another transaction operation being
- // queued (eg a put operation from a client read Future callback that is notified
- // synchronously).
- Collection<TransactionOperation> operationsBatch = null;
- synchronized(txOperationsOnComplete) {
- if(txOperationsOnComplete.isEmpty()) {
- // We're done invoking the TransactionOperations so we can now publish the
- // TransactionContext.
- transactionContext = localTransactionContext;
- break;
- }
+ // Add the actor to the remoteTransactionActors list for access by the
+ // cleanup PhantonReference.
+ remoteTransactionActors.add(transactionActor);
- operationsBatch = new ArrayList<>(txOperationsOnComplete);
- txOperationsOnComplete.clear();
- }
-
- // Invoke TransactionOperations outside the sync block to avoid unnecessary blocking.
- // A slight down-side is that we need to re-acquire the lock below but this should
- // be negligible.
- for(TransactionOperation oper: operationsBatch) {
- oper.invoke(localTransactionContext);
- }
- }
- }
-
- private TransactionContext createValidTransactionContext(CreateTransactionReply reply) {
- LOG.debug("Tx {} Received {}", getIdentifier(), reply);
-
- return createValidTransactionContext(actorContext.actorSelection(reply.getTransactionPath()),
- reply.getTransactionPath(), reply.getVersion());
+ // Write to the memory barrier volatile to publish the above update to the
+ // remoteTransactionActors list for thread visibility.
+ remoteTransactionActorsMB.set(true);
}
- private TransactionContext createValidTransactionContext(ActorSelection transactionActor,
- String transactionPath, short remoteTransactionVersion) {
-
- if (transactionType == TransactionType.READ_ONLY) {
- // Read-only Tx's aren't explicitly closed by the client so we create a PhantomReference
- // to close the remote Tx's when this instance is no longer in use and is garbage
- // collected.
-
- if(remoteTransactionActorsMB == null) {
- remoteTransactionActors = Lists.newArrayList();
- remoteTransactionActorsMB = new AtomicBoolean();
+ // TxActor is always created where the leader of the shard is.
+ // Check if TxActor is created in the same node
+ boolean isTxActorLocal = actorContext.isPathLocal(transactionPath);
- TransactionProxyCleanupPhantomReference.track(TransactionProxy.this);
- }
-
- // Add the actor to the remoteTransactionActors list for access by the
- // cleanup PhantonReference.
- remoteTransactionActors.add(transactionActor);
-
- // Write to the memory barrier volatile to publish the above update to the
- // remoteTransactionActors list for thread visibility.
- remoteTransactionActorsMB.set(true);
- }
-
- // TxActor is always created where the leader of the shard is.
- // Check if TxActor is created in the same node
- boolean isTxActorLocal = actorContext.isPathLocal(transactionPath);
-
- if(remoteTransactionVersion < DataStoreVersions.LITHIUM_VERSION) {
- return new PreLithiumTransactionContextImpl(transactionPath, transactionActor, getIdentifier(),
- transactionChainId, actorContext, schemaContext, isTxActorLocal, remoteTransactionVersion,
- operationCompleter);
- } else if (transactionType == TransactionType.WRITE_ONLY &&
- actorContext.getDatastoreContext().isWriteOnlyTransactionOptimizationsEnabled()) {
- return new WriteOnlyTransactionContextImpl(transactionActor, getIdentifier(), transactionChainId,
+ if(remoteTransactionVersion < DataStoreVersions.LITHIUM_VERSION) {
+ return new PreLithiumTransactionContextImpl(transactionPath, transactionActor, getIdentifier(),
+ transactionChainId, actorContext, schemaContext, isTxActorLocal, remoteTransactionVersion,
+ operationCompleter);
+ } else {
+ return new TransactionContextImpl(transactionActor, getIdentifier(), transactionChainId,
actorContext, schemaContext, isTxActorLocal, remoteTransactionVersion, operationCompleter);
- } else {
- return new TransactionContextImpl(transactionActor, getIdentifier(), transactionChainId,
- actorContext, schemaContext, isTxActorLocal, remoteTransactionVersion, operationCompleter);
- }
}
}
}