/* * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. * * This program and the accompanying materials are made available under the * terms of the Eclipse Public License v1.0 which accompanies this distribution, * and is available at http://www.eclipse.org/legal/epl-v10.html */ package org.opendaylight.controller.cluster.datastore; import akka.actor.ActorSelection; import akka.dispatch.Mapper; import akka.dispatch.OnComplete; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Optional; import com.google.common.base.Preconditions; import com.google.common.collect.Lists; import com.google.common.util.concurrent.CheckedFuture; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.SettableFuture; import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.ExecutionException; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import javax.annotation.concurrent.GuardedBy; import org.opendaylight.controller.cluster.datastore.compat.PreLithiumTransactionContextImpl; import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException; import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier; import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction; import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionReply; import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategyFactory; import org.opendaylight.controller.cluster.datastore.utils.ActorContext; import org.opendaylight.controller.cluster.datastore.utils.NormalizedNodeAggregator; import 
org.opendaylight.controller.md.sal.common.api.data.ReadFailedException; import org.opendaylight.controller.sal.core.spi.data.AbstractDOMStoreTransaction; import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction; import org.opendaylight.yangtools.util.concurrent.MappingCheckedFuture; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; import org.opendaylight.yangtools.yang.model.api.SchemaContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import scala.concurrent.Future; import scala.concurrent.Promise; import scala.concurrent.duration.FiniteDuration; /** * TransactionProxy acts as a proxy for one or more transactions that were created on a remote shard *

* Creating a transaction on the consumer side will create one instance of a transaction proxy. If, during
 * the transaction, reads and writes are done on data that belongs to different shards, then a separate
 * transaction will be created on each of those shards by the TransactionProxy.
 * <p>

*

* The TransactionProxy does not make any guarantees about atomicity or order in which the transactions on the various * shards will be executed. *

*/ public class TransactionProxy extends AbstractDOMStoreTransaction implements DOMStoreReadWriteTransaction { public static enum TransactionType { READ_ONLY, WRITE_ONLY, READ_WRITE; // Cache all values private static final TransactionType[] VALUES = values(); public static TransactionType fromInt(final int type) { try { return VALUES[type]; } catch (IndexOutOfBoundsException e) { throw new IllegalArgumentException("In TransactionType enum value " + type, e); } } } private static enum TransactionState { OPEN, READY, CLOSED, } static final Mapper SAME_FAILURE_TRANSFORMER = new Mapper() { @Override public Throwable apply(Throwable failure) { return failure; } }; private static final AtomicLong counter = new AtomicLong(); private static final Logger LOG = LoggerFactory.getLogger(TransactionProxy.class); /** * Time interval in between transaction create retries. */ private static final FiniteDuration CREATE_TX_TRY_INTERVAL = FiniteDuration.create(1, TimeUnit.SECONDS); /** * Stores the remote Tx actors for each requested data store path to be used by the * PhantomReference to close the remote Tx's. This is only used for read-only Tx's. The * remoteTransactionActorsMB volatile serves as a memory barrier to publish updates to the * remoteTransactionActors list so they will be visible to the thread accessing the * PhantomReference. */ List remoteTransactionActors; volatile AtomicBoolean remoteTransactionActorsMB; /** * Stores the create transaction results per shard. 
*/ private final Map txFutureCallbackMap = new HashMap<>(); private final TransactionType transactionType; final ActorContext actorContext; private final String transactionChainId; private final SchemaContext schemaContext; private TransactionState state = TransactionState.OPEN; private volatile boolean initialized; private Semaphore operationLimiter; private OperationCompleter operationCompleter; public TransactionProxy(ActorContext actorContext, TransactionType transactionType) { this(actorContext, transactionType, ""); } public TransactionProxy(ActorContext actorContext, TransactionType transactionType, String transactionChainId) { super(createIdentifier(actorContext)); this.actorContext = Preconditions.checkNotNull(actorContext, "actorContext should not be null"); this.transactionType = Preconditions.checkNotNull(transactionType, "transactionType should not be null"); this.schemaContext = Preconditions.checkNotNull(actorContext.getSchemaContext(), "schemaContext should not be null"); this.transactionChainId = transactionChainId; LOG.debug("Created txn {} of type {} on chain {}", getIdentifier(), transactionType, transactionChainId); } private static TransactionIdentifier createIdentifier(ActorContext actorContext) { String memberName = actorContext.getCurrentMemberName(); if (memberName == null) { memberName = "UNKNOWN-MEMBER"; } return new TransactionIdentifier(memberName, counter.getAndIncrement()); } @VisibleForTesting boolean hasTransactionContext() { for(TransactionFutureCallback txFutureCallback : txFutureCallbackMap.values()) { TransactionContext transactionContext = txFutureCallback.getTransactionContext(); if(transactionContext != null) { return true; } } return false; } private boolean isRootPath(YangInstanceIdentifier path){ return !path.getPathArguments().iterator().hasNext(); } @Override public CheckedFuture>, ReadFailedException> read(final YangInstanceIdentifier path) { Preconditions.checkState(transactionType != TransactionType.WRITE_ONLY, "Read 
operation on write-only transaction is not allowed"); LOG.debug("Tx {} read {}", getIdentifier(), path); final SettableFuture>> proxyFuture = SettableFuture.create(); if(isRootPath(path)){ readAllData(path, proxyFuture); } else { throttleOperation(); TransactionFutureCallback txFutureCallback = getOrCreateTxFutureCallback(path); txFutureCallback.enqueueTransactionOperation(new TransactionOperation() { @Override public void invoke(TransactionContext transactionContext) { transactionContext.readData(path, proxyFuture); } }); } return MappingCheckedFuture.create(proxyFuture, ReadFailedException.MAPPER); } private void readAllData(final YangInstanceIdentifier path, final SettableFuture>> proxyFuture) { Set allShardNames = actorContext.getConfiguration().getAllShardNames(); List>>> futures = new ArrayList<>(allShardNames.size()); for(String shardName : allShardNames){ final SettableFuture>> subProxyFuture = SettableFuture.create(); throttleOperation(); TransactionFutureCallback txFutureCallback = getOrCreateTxFutureCallback(shardName); txFutureCallback.enqueueTransactionOperation(new TransactionOperation() { @Override public void invoke(TransactionContext transactionContext) { transactionContext.readData(path, subProxyFuture); } }); futures.add(subProxyFuture); } final ListenableFuture>>> future = Futures.allAsList(futures); future.addListener(new Runnable() { @Override public void run() { try { proxyFuture.set(NormalizedNodeAggregator.aggregate(YangInstanceIdentifier.builder().build(), future.get(), actorContext.getSchemaContext())); } catch (InterruptedException | ExecutionException e) { proxyFuture.setException(e); } } }, actorContext.getActorSystem().dispatcher()); } @Override public CheckedFuture exists(final YangInstanceIdentifier path) { Preconditions.checkState(transactionType != TransactionType.WRITE_ONLY, "Exists operation on write-only transaction is not allowed"); LOG.debug("Tx {} exists {}", getIdentifier(), path); throttleOperation(); final SettableFuture 
proxyFuture = SettableFuture.create(); TransactionFutureCallback txFutureCallback = getOrCreateTxFutureCallback(path); txFutureCallback.enqueueTransactionOperation(new TransactionOperation() { @Override public void invoke(TransactionContext transactionContext) { transactionContext.dataExists(path, proxyFuture); } }); return MappingCheckedFuture.create(proxyFuture, ReadFailedException.MAPPER); } private void checkModificationState() { Preconditions.checkState(transactionType != TransactionType.READ_ONLY, "Modification operation on read-only transaction is not allowed"); Preconditions.checkState(state == TransactionState.OPEN, "Transaction is sealed - further modifications are not allowed"); } private void throttleOperation() { throttleOperation(1); } private void throttleOperation(int acquirePermits) { if(!initialized) { // Note : Currently mailbox-capacity comes from akka.conf and not from the config-subsystem operationLimiter = new Semaphore(actorContext.getTransactionOutstandingOperationLimit()); operationCompleter = new OperationCompleter(operationLimiter); // Make sure we write this last because it's volatile and will also publish the non-volatile writes // above as well so they'll be visible to other threads. 
initialized = true; } try { if(!operationLimiter.tryAcquire(acquirePermits, actorContext.getDatastoreContext().getOperationTimeoutInSeconds(), TimeUnit.SECONDS)){ LOG.warn("Failed to acquire operation permit for transaction {}", getIdentifier()); } } catch (InterruptedException e) { if(LOG.isDebugEnabled()) { LOG.debug("Interrupted when trying to acquire operation permit for transaction " + getIdentifier().toString(), e); } else { LOG.warn("Interrupted when trying to acquire operation permit for transaction {}", getIdentifier()); } } } @Override public void write(final YangInstanceIdentifier path, final NormalizedNode data) { checkModificationState(); LOG.debug("Tx {} write {}", getIdentifier(), path); throttleOperation(); TransactionFutureCallback txFutureCallback = getOrCreateTxFutureCallback(path); txFutureCallback.enqueueTransactionOperation(new TransactionOperation() { @Override public void invoke(TransactionContext transactionContext) { transactionContext.writeData(path, data); } }); } @Override public void merge(final YangInstanceIdentifier path, final NormalizedNode data) { checkModificationState(); LOG.debug("Tx {} merge {}", getIdentifier(), path); throttleOperation(); TransactionFutureCallback txFutureCallback = getOrCreateTxFutureCallback(path); txFutureCallback.enqueueTransactionOperation(new TransactionOperation() { @Override public void invoke(TransactionContext transactionContext) { transactionContext.mergeData(path, data); } }); } @Override public void delete(final YangInstanceIdentifier path) { checkModificationState(); LOG.debug("Tx {} delete {}", getIdentifier(), path); throttleOperation(); TransactionFutureCallback txFutureCallback = getOrCreateTxFutureCallback(path); txFutureCallback.enqueueTransactionOperation(new TransactionOperation() { @Override public void invoke(TransactionContext transactionContext) { transactionContext.deleteData(path); } }); } private boolean seal(final TransactionState newState) { if (state == TransactionState.OPEN) 
{ state = newState; return true; } else { return false; } } @Override public AbstractThreePhaseCommitCohort ready() { Preconditions.checkState(transactionType != TransactionType.READ_ONLY, "Read-only transactions cannot be readied"); final boolean success = seal(TransactionState.READY); Preconditions.checkState(success, "Transaction %s is %s, it cannot be readied", getIdentifier(), state); LOG.debug("Tx {} Readying {} transactions for commit", getIdentifier(), txFutureCallbackMap.size()); if (txFutureCallbackMap.isEmpty()) { TransactionRateLimitingCallback.adjustRateLimitForUnusedTransaction(actorContext); return NoOpDOMStoreThreePhaseCommitCohort.INSTANCE; } throttleOperation(txFutureCallbackMap.size()); List> cohortFutures = new ArrayList<>(txFutureCallbackMap.size()); for(TransactionFutureCallback txFutureCallback : txFutureCallbackMap.values()) { LOG.debug("Tx {} Readying transaction for shard {} chain {}", getIdentifier(), txFutureCallback.getShardName(), transactionChainId); final TransactionContext transactionContext = txFutureCallback.getTransactionContext(); final Future future; if (transactionContext != null) { // avoid the creation of a promise and a TransactionOperation future = transactionContext.readyTransaction(); } else { final Promise promise = akka.dispatch.Futures.promise(); txFutureCallback.enqueueTransactionOperation(new TransactionOperation() { @Override public void invoke(TransactionContext transactionContext) { promise.completeWith(transactionContext.readyTransaction()); } }); future = promise.future(); } cohortFutures.add(future); } return new ThreePhaseCommitCohortProxy(actorContext, cohortFutures, getIdentifier().toString()); } @Override public void close() { if (!seal(TransactionState.CLOSED)) { if (state == TransactionState.CLOSED) { // Idempotent no-op as per AutoCloseable recommendation return; } throw new IllegalStateException(String.format("Transaction %s is ready, it cannot be closed", getIdentifier())); } for 
(TransactionFutureCallback txFutureCallback : txFutureCallbackMap.values()) { txFutureCallback.enqueueTransactionOperation(new TransactionOperation() { @Override public void invoke(TransactionContext transactionContext) { transactionContext.closeTransaction(); } }); } txFutureCallbackMap.clear(); if(remoteTransactionActorsMB != null) { remoteTransactionActors.clear(); remoteTransactionActorsMB.set(true); } } private String shardNameFromIdentifier(YangInstanceIdentifier path){ return ShardStrategyFactory.getStrategy(path).findShard(path); } protected Future sendFindPrimaryShardAsync(String shardName) { return actorContext.findPrimaryShardAsync(shardName); } private TransactionFutureCallback getOrCreateTxFutureCallback(YangInstanceIdentifier path) { String shardName = shardNameFromIdentifier(path); return getOrCreateTxFutureCallback(shardName); } private TransactionFutureCallback getOrCreateTxFutureCallback(String shardName) { TransactionFutureCallback txFutureCallback = txFutureCallbackMap.get(shardName); if(txFutureCallback == null) { Future findPrimaryFuture = sendFindPrimaryShardAsync(shardName); final TransactionFutureCallback newTxFutureCallback = new TransactionFutureCallback(shardName); txFutureCallback = newTxFutureCallback; txFutureCallbackMap.put(shardName, txFutureCallback); findPrimaryFuture.onComplete(new OnComplete() { @Override public void onComplete(Throwable failure, ActorSelection primaryShard) { if(failure != null) { newTxFutureCallback.createTransactionContext(failure, null); } else { newTxFutureCallback.setPrimaryShard(primaryShard); } } }, actorContext.getClientDispatcher()); } return txFutureCallback; } public String getTransactionChainId() { return transactionChainId; } protected ActorContext getActorContext() { return actorContext; } /** * Implements a Future OnComplete callback for a CreateTransaction message. This class handles * retries, up to a limit, if the shard doesn't have a leader yet. 
This is done by scheduling a * retry task after a short delay. *

* The end result from a completed CreateTransaction message is a TransactionContext that is * used to perform transaction operations. Transaction operations that occur before the * CreateTransaction completes are cache and executed once the CreateTransaction completes, * successfully or not. */ private class TransactionFutureCallback extends OnComplete { /** * The list of transaction operations to execute once the CreateTransaction completes. */ @GuardedBy("txOperationsOnComplete") private final List txOperationsOnComplete = Lists.newArrayList(); /** * The TransactionContext resulting from the CreateTransaction reply. */ private volatile TransactionContext transactionContext; /** * The target primary shard. */ private volatile ActorSelection primaryShard; private volatile int createTxTries = (int) (actorContext.getDatastoreContext(). getShardLeaderElectionTimeout().duration().toMillis() / CREATE_TX_TRY_INTERVAL.toMillis()); private final String shardName; TransactionFutureCallback(String shardName) { this.shardName = shardName; } String getShardName() { return shardName; } TransactionContext getTransactionContext() { return transactionContext; } /** * Sets the target primary shard and initiates a CreateTransaction try. */ void setPrimaryShard(ActorSelection primaryShard) { this.primaryShard = primaryShard; if(transactionType == TransactionType.WRITE_ONLY && actorContext.getDatastoreContext().isWriteOnlyTransactionOptimizationsEnabled()) { LOG.debug("Tx {} Primary shard {} found - creating WRITE_ONLY transaction context", getIdentifier(), primaryShard); // For write-only Tx's we prepare the transaction modifications directly on the shard actor // to avoid the overhead of creating a separate transaction actor. // FIXME: can't assume the shard version is LITHIUM_VERSION - need to obtain it somehow. 
executeTxOperatonsOnComplete(createValidTransactionContext(this.primaryShard, this.primaryShard.path().toString(), DataStoreVersions.LITHIUM_VERSION)); } else { tryCreateTransaction(); } } /** * Adds a TransactionOperation to be executed after the CreateTransaction completes. */ void addTxOperationOnComplete(TransactionOperation operation) { boolean invokeOperation = true; synchronized(txOperationsOnComplete) { if(transactionContext == null) { LOG.debug("Tx {} Adding operation on complete", getIdentifier()); invokeOperation = false; txOperationsOnComplete.add(operation); } } if(invokeOperation) { operation.invoke(transactionContext); } } void enqueueTransactionOperation(final TransactionOperation op) { if (transactionContext != null) { op.invoke(transactionContext); } else { // The shard Tx hasn't been created yet so add the Tx operation to the Tx Future // callback to be executed after the Tx is created. addTxOperationOnComplete(op); } } /** * Performs a CreateTransaction try async. */ private void tryCreateTransaction() { if(LOG.isDebugEnabled()) { LOG.debug("Tx {} Primary shard {} found - trying create transaction", getIdentifier(), primaryShard); } Object serializedCreateMessage = new CreateTransaction(getIdentifier().toString(), TransactionProxy.this.transactionType.ordinal(), getTransactionChainId()).toSerializable(); Future createTxFuture = actorContext.executeOperationAsync(primaryShard, serializedCreateMessage); createTxFuture.onComplete(this, actorContext.getClientDispatcher()); } @Override public void onComplete(Throwable failure, Object response) { if(failure instanceof NoShardLeaderException) { // There's no leader for the shard yet - schedule and try again, unless we're out // of retries. Note: createTxTries is volatile as it may be written by different // threads however not concurrently, therefore decrementing it non-atomically here // is ok. 
if(--createTxTries > 0) { LOG.debug("Tx {} Shard {} has no leader yet - scheduling create Tx retry", getIdentifier(), shardName); actorContext.getActorSystem().scheduler().scheduleOnce(CREATE_TX_TRY_INTERVAL, new Runnable() { @Override public void run() { tryCreateTransaction(); } }, actorContext.getClientDispatcher()); return; } } createTransactionContext(failure, response); } private void createTransactionContext(Throwable failure, Object response) { // Mainly checking for state violation here to perform a volatile read of "initialized" to // ensure updates to operationLimter et al are visible to this thread (ie we're doing // "piggy-back" synchronization here). Preconditions.checkState(initialized, "Tx was not propertly initialized."); // Create the TransactionContext from the response or failure. Store the new // TransactionContext locally until we've completed invoking the // TransactionOperations. This avoids thread timing issues which could cause // out-of-order TransactionOperations. Eg, on a modification operation, if the // TransactionContext is non-null, then we directly call the TransactionContext. // However, at the same time, the code may be executing the cached // TransactionOperations. So to avoid thus timing, we don't publish the // TransactionContext until after we've executed all cached TransactionOperations. 
TransactionContext localTransactionContext; if(failure != null) { LOG.debug("Tx {} Creating NoOpTransaction because of error", getIdentifier(), failure); localTransactionContext = new NoOpTransactionContext(failure, getIdentifier(), operationLimiter); } else if (CreateTransactionReply.SERIALIZABLE_CLASS.equals(response.getClass())) { localTransactionContext = createValidTransactionContext( CreateTransactionReply.fromSerializable(response)); } else { IllegalArgumentException exception = new IllegalArgumentException(String.format( "Invalid reply type %s for CreateTransaction", response.getClass())); localTransactionContext = new NoOpTransactionContext(exception, getIdentifier(), operationLimiter); } executeTxOperatonsOnComplete(localTransactionContext); } private void executeTxOperatonsOnComplete(TransactionContext localTransactionContext) { while(true) { // Access to txOperationsOnComplete and transactionContext must be protected and atomic // (ie synchronized) with respect to #addTxOperationOnComplete to handle timing // issues and ensure no TransactionOperation is missed and that they are processed // in the order they occurred. // We'll make a local copy of the txOperationsOnComplete list to handle re-entrancy // in case a TransactionOperation results in another transaction operation being // queued (eg a put operation from a client read Future callback that is notified // synchronously). Collection operationsBatch = null; synchronized(txOperationsOnComplete) { if(txOperationsOnComplete.isEmpty()) { // We're done invoking the TransactionOperations so we can now publish the // TransactionContext. transactionContext = localTransactionContext; break; } operationsBatch = new ArrayList<>(txOperationsOnComplete); txOperationsOnComplete.clear(); } // Invoke TransactionOperations outside the sync block to avoid unnecessary blocking. // A slight down-side is that we need to re-acquire the lock below but this should // be negligible. 
for(TransactionOperation oper: operationsBatch) { oper.invoke(localTransactionContext); } } } private TransactionContext createValidTransactionContext(CreateTransactionReply reply) { LOG.debug("Tx {} Received {}", getIdentifier(), reply); return createValidTransactionContext(actorContext.actorSelection(reply.getTransactionPath()), reply.getTransactionPath(), reply.getVersion()); } private TransactionContext createValidTransactionContext(ActorSelection transactionActor, String transactionPath, short remoteTransactionVersion) { if (transactionType == TransactionType.READ_ONLY) { // Read-only Tx's aren't explicitly closed by the client so we create a PhantomReference // to close the remote Tx's when this instance is no longer in use and is garbage // collected. if(remoteTransactionActorsMB == null) { remoteTransactionActors = Lists.newArrayList(); remoteTransactionActorsMB = new AtomicBoolean(); TransactionProxyCleanupPhantomReference.track(TransactionProxy.this); } // Add the actor to the remoteTransactionActors list for access by the // cleanup PhantonReference. remoteTransactionActors.add(transactionActor); // Write to the memory barrier volatile to publish the above update to the // remoteTransactionActors list for thread visibility. remoteTransactionActorsMB.set(true); } // TxActor is always created where the leader of the shard is. // Check if TxActor is created in the same node boolean isTxActorLocal = actorContext.isPathLocal(transactionPath); if(remoteTransactionVersion < DataStoreVersions.LITHIUM_VERSION) { return new PreLithiumTransactionContextImpl(transactionPath, transactionActor, getIdentifier(), transactionChainId, actorContext, schemaContext, isTxActorLocal, remoteTransactionVersion, operationCompleter); } else { return new TransactionContextImpl(transactionActor, getIdentifier(), transactionChainId, actorContext, schemaContext, isTxActorLocal, remoteTransactionVersion, operationCompleter); } } } }