X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FTransactionProxy.java;h=16a979fa6a1176b17f1d449d23de04de6bc58f1f;hp=a0987cd5d63943979985527794dbe8471c0afe78;hb=HEAD;hpb=f89552de4942d3709d6ee84415e672c6c7de489f diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java deleted file mode 100644 index a0987cd5d6..0000000000 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java +++ /dev/null @@ -1,522 +0,0 @@ -/* - * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. - * - * This program and the accompanying materials are made available under the - * terms of the Eclipse Public License v1.0 which accompanies this distribution, - * and is available at http://www.eclipse.org/legal/epl-v10.html - */ - -package org.opendaylight.controller.cluster.datastore; - -import akka.actor.ActorSelection; -import akka.dispatch.Mapper; -import akka.dispatch.OnComplete; -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Optional; -import com.google.common.base.Preconditions; -import com.google.common.collect.Lists; -import com.google.common.util.concurrent.CheckedFuture; -import com.google.common.util.concurrent.Futures; -import com.google.common.util.concurrent.ListenableFuture; -import com.google.common.util.concurrent.SettableFuture; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.Semaphore; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicLong; -import org.opendaylight.controller.cluster.datastore.compat.PreLithiumTransactionContextImpl; -import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier; -import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategyFactory; -import org.opendaylight.controller.cluster.datastore.utils.ActorContext; -import org.opendaylight.controller.cluster.datastore.utils.NormalizedNodeAggregator; -import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException; -import org.opendaylight.controller.sal.core.spi.data.AbstractDOMStoreTransaction; -import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction; -import org.opendaylight.yangtools.util.concurrent.MappingCheckedFuture; -import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; -import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; -import org.opendaylight.yangtools.yang.model.api.SchemaContext; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import scala.concurrent.Future; -import scala.concurrent.Promise; - -/** - * TransactionProxy acts as a proxy for one or more transactions that were created on a remote shard - *

- * Creating a transaction on the consumer side will create one instance of a transaction proxy. If during - * the transaction reads and writes are done on data that belongs to different shards then a separate transaction will - * be created on each of those shards by the TransactionProxy - *

- *

- * The TransactionProxy does not make any guarantees about atomicity or order in which the transactions on the various - * shards will be executed. - *

- */ -public class TransactionProxy extends AbstractDOMStoreTransaction implements DOMStoreReadWriteTransaction { - - public static enum TransactionType { - READ_ONLY, - WRITE_ONLY, - READ_WRITE; - - // Cache all values - private static final TransactionType[] VALUES = values(); - - public static TransactionType fromInt(final int type) { - try { - return VALUES[type]; - } catch (IndexOutOfBoundsException e) { - throw new IllegalArgumentException("In TransactionType enum value " + type, e); - } - } - } - - private static enum TransactionState { - OPEN, - READY, - CLOSED, - } - - static final Mapper SAME_FAILURE_TRANSFORMER = - new Mapper() { - @Override - public Throwable apply(Throwable failure) { - return failure; - } - }; - - private static final AtomicLong counter = new AtomicLong(); - - private static final Logger LOG = LoggerFactory.getLogger(TransactionProxy.class); - - /** - * Stores the remote Tx actors for each requested data store path to be used by the - * PhantomReference to close the remote Tx's. This is only used for read-only Tx's. The - * remoteTransactionActorsMB volatile serves as a memory barrier to publish updates to the - * remoteTransactionActors list so they will be visible to the thread accessing the - * PhantomReference. - */ - List remoteTransactionActors; - volatile AtomicBoolean remoteTransactionActorsMB; - - /** - * Stores the create transaction results per shard. - */ - private final Map txFutureCallbackMap = new HashMap<>(); - - private final TransactionType transactionType; - final ActorContext actorContext; - private final String transactionChainId; - private final SchemaContext schemaContext; - private TransactionState state = TransactionState.OPEN; - - private volatile boolean initialized; - private Semaphore operationLimiter; - private OperationCompleter operationCompleter; - - public TransactionProxy(ActorContext actorContext, TransactionType transactionType) { - this(actorContext, transactionType, ""); - } - - public TransactionProxy(ActorContext actorContext, TransactionType transactionType, String transactionChainId) { - super(createIdentifier(actorContext)); - this.actorContext = Preconditions.checkNotNull(actorContext, - "actorContext should not be null"); - this.transactionType = Preconditions.checkNotNull(transactionType, - "transactionType should not be null"); - this.schemaContext = Preconditions.checkNotNull(actorContext.getSchemaContext(), - "schemaContext should not be null"); - this.transactionChainId = transactionChainId; - - LOG.debug("Created txn {} of type {} on chain {}", getIdentifier(), transactionType, transactionChainId); - } - - private static TransactionIdentifier createIdentifier(ActorContext actorContext) { - String memberName = actorContext.getCurrentMemberName(); - if (memberName == null) { - memberName = "UNKNOWN-MEMBER"; - } - - return new TransactionIdentifier(memberName, counter.getAndIncrement()); - } - - @VisibleForTesting - boolean hasTransactionContext() { - for(TransactionFutureCallback txFutureCallback : txFutureCallbackMap.values()) { - TransactionContext transactionContext = txFutureCallback.getTransactionContext(); - if(transactionContext != null) { - return true; - } - } - - return false; - } - - private boolean isRootPath(YangInstanceIdentifier path){ - return !path.getPathArguments().iterator().hasNext(); - } - - @Override - public CheckedFuture>, ReadFailedException> read(final YangInstanceIdentifier path) { - - Preconditions.checkState(transactionType != TransactionType.WRITE_ONLY, - "Read operation on write-only transaction is not allowed"); - - LOG.debug("Tx {} read {}", getIdentifier(), path); - - final SettableFuture>> proxyFuture = SettableFuture.create(); - - if(isRootPath(path)){ - readAllData(path, proxyFuture); - } else { - throttleOperation(); - - TransactionFutureCallback txFutureCallback = getOrCreateTxFutureCallback(path); - txFutureCallback.enqueueTransactionOperation(new TransactionOperation() { - @Override - public void invoke(TransactionContext transactionContext) { - transactionContext.readData(path, proxyFuture); - } - }); - - } - - return MappingCheckedFuture.create(proxyFuture, ReadFailedException.MAPPER); - } - - private void readAllData(final YangInstanceIdentifier path, - final SettableFuture>> proxyFuture) { - Set allShardNames = actorContext.getConfiguration().getAllShardNames(); - List>>> futures = new ArrayList<>(allShardNames.size()); - - for(String shardName : allShardNames){ - final SettableFuture>> subProxyFuture = SettableFuture.create(); - - throttleOperation(); - - TransactionFutureCallback txFutureCallback = getOrCreateTxFutureCallback(shardName); - txFutureCallback.enqueueTransactionOperation(new TransactionOperation() { - @Override - public void invoke(TransactionContext transactionContext) { - transactionContext.readData(path, subProxyFuture); - } - }); - - futures.add(subProxyFuture); - } - - final ListenableFuture>>> future = Futures.allAsList(futures); - - future.addListener(new Runnable() { - @Override - public void run() { - try { - proxyFuture.set(NormalizedNodeAggregator.aggregate(YangInstanceIdentifier.builder().build(), - future.get(), actorContext.getSchemaContext())); - } catch (InterruptedException | ExecutionException e) { - proxyFuture.setException(e); - } - } - }, actorContext.getActorSystem().dispatcher()); - } - - @Override - public CheckedFuture exists(final YangInstanceIdentifier path) { - - Preconditions.checkState(transactionType != TransactionType.WRITE_ONLY, - "Exists operation on write-only transaction is not allowed"); - - LOG.debug("Tx {} exists {}", getIdentifier(), path); - - throttleOperation(); - - final SettableFuture proxyFuture = SettableFuture.create(); - - TransactionFutureCallback txFutureCallback = getOrCreateTxFutureCallback(path); - txFutureCallback.enqueueTransactionOperation(new TransactionOperation() { - @Override - public void invoke(TransactionContext transactionContext) { - transactionContext.dataExists(path, proxyFuture); - } - }); - - return MappingCheckedFuture.create(proxyFuture, ReadFailedException.MAPPER); - } - - private void checkModificationState() { - Preconditions.checkState(transactionType != TransactionType.READ_ONLY, - "Modification operation on read-only transaction is not allowed"); - Preconditions.checkState(state == TransactionState.OPEN, - "Transaction is sealed - further modifications are not allowed"); - } - - private void throttleOperation() { - throttleOperation(1); - } - - private void throttleOperation(int acquirePermits) { - if(!initialized) { - // Note : Currently mailbox-capacity comes from akka.conf and not from the config-subsystem - operationLimiter = new Semaphore(actorContext.getTransactionOutstandingOperationLimit()); - operationCompleter = new OperationCompleter(operationLimiter); - - // Make sure we write this last because it's volatile and will also publish the non-volatile writes - // above as well so they'll be visible to other threads. - initialized = true; - } - - try { - if(!operationLimiter.tryAcquire(acquirePermits, - actorContext.getDatastoreContext().getOperationTimeoutInSeconds(), TimeUnit.SECONDS)){ - LOG.warn("Failed to acquire operation permit for transaction {}", getIdentifier()); - } - } catch (InterruptedException e) { - if(LOG.isDebugEnabled()) { - LOG.debug("Interrupted when trying to acquire operation permit for transaction " + getIdentifier().toString(), e); - } else { - LOG.warn("Interrupted when trying to acquire operation permit for transaction {}", getIdentifier()); - } - } - } - - final void ensureInitializied() { - Preconditions.checkState(initialized, "Transaction %s was not propertly initialized.", getIdentifier()); - } - - @Override - public void write(final YangInstanceIdentifier path, final NormalizedNode data) { - - checkModificationState(); - - LOG.debug("Tx {} write {}", getIdentifier(), path); - - throttleOperation(); - - TransactionFutureCallback txFutureCallback = getOrCreateTxFutureCallback(path); - txFutureCallback.enqueueTransactionOperation(new TransactionOperation() { - @Override - public void invoke(TransactionContext transactionContext) { - transactionContext.writeData(path, data); - } - }); - } - - @Override - public void merge(final YangInstanceIdentifier path, final NormalizedNode data) { - - checkModificationState(); - - LOG.debug("Tx {} merge {}", getIdentifier(), path); - - throttleOperation(); - - TransactionFutureCallback txFutureCallback = getOrCreateTxFutureCallback(path); - txFutureCallback.enqueueTransactionOperation(new TransactionOperation() { - @Override - public void invoke(TransactionContext transactionContext) { - transactionContext.mergeData(path, data); - } - }); - } - - @Override - public void delete(final YangInstanceIdentifier path) { - - checkModificationState(); - - LOG.debug("Tx {} delete {}", getIdentifier(), path); - - throttleOperation(); - - TransactionFutureCallback txFutureCallback = getOrCreateTxFutureCallback(path); - txFutureCallback.enqueueTransactionOperation(new TransactionOperation() { - @Override - public void invoke(TransactionContext transactionContext) { - transactionContext.deleteData(path); - } - }); - } - - private boolean seal(final TransactionState newState) { - if (state == TransactionState.OPEN) { - state = newState; - return true; - } else { - return false; - } - } - - @Override - public AbstractThreePhaseCommitCohort ready() { - Preconditions.checkState(transactionType != TransactionType.READ_ONLY, - "Read-only transactions cannot be readied"); - - final boolean success = seal(TransactionState.READY); - Preconditions.checkState(success, "Transaction %s is %s, it cannot be readied", getIdentifier(), state); - - LOG.debug("Tx {} Readying {} transactions for commit", getIdentifier(), - txFutureCallbackMap.size()); - - if (txFutureCallbackMap.isEmpty()) { - TransactionRateLimitingCallback.adjustRateLimitForUnusedTransaction(actorContext); - return NoOpDOMStoreThreePhaseCommitCohort.INSTANCE; - } - - throttleOperation(txFutureCallbackMap.size()); - - List> cohortFutures = new ArrayList<>(txFutureCallbackMap.size()); - for(TransactionFutureCallback txFutureCallback : txFutureCallbackMap.values()) { - - LOG.debug("Tx {} Readying transaction for shard {} chain {}", getIdentifier(), - txFutureCallback.getShardName(), transactionChainId); - - final TransactionContext transactionContext = txFutureCallback.getTransactionContext(); - final Future future; - if (transactionContext != null) { - // avoid the creation of a promise and a TransactionOperation - future = transactionContext.readyTransaction(); - } else { - final Promise promise = akka.dispatch.Futures.promise(); - txFutureCallback.enqueueTransactionOperation(new TransactionOperation() { - @Override - public void invoke(TransactionContext transactionContext) { - promise.completeWith(transactionContext.readyTransaction()); - } - }); - future = promise.future(); - } - - cohortFutures.add(future); - } - - return new ThreePhaseCommitCohortProxy(actorContext, cohortFutures, - getIdentifier().toString()); - } - - @Override - public void close() { - if (!seal(TransactionState.CLOSED)) { - if (state == TransactionState.CLOSED) { - // Idempotent no-op as per AutoCloseable recommendation - return; - } - - throw new IllegalStateException(String.format("Transaction %s is ready, it cannot be closed", - getIdentifier())); - } - - for (TransactionFutureCallback txFutureCallback : txFutureCallbackMap.values()) { - txFutureCallback.enqueueTransactionOperation(new TransactionOperation() { - @Override - public void invoke(TransactionContext transactionContext) { - transactionContext.closeTransaction(); - } - }); - } - - txFutureCallbackMap.clear(); - - if(remoteTransactionActorsMB != null) { - remoteTransactionActors.clear(); - remoteTransactionActorsMB.set(true); - } - } - - private String shardNameFromIdentifier(YangInstanceIdentifier path){ - return ShardStrategyFactory.getStrategy(path).findShard(path); - } - - protected Future sendFindPrimaryShardAsync(String shardName) { - return actorContext.findPrimaryShardAsync(shardName); - } - - final TransactionType getTransactionType() { - return transactionType; - } - - final Semaphore getOperationLimiter() { - return operationLimiter; - } - - private TransactionFutureCallback getOrCreateTxFutureCallback(YangInstanceIdentifier path) { - String shardName = shardNameFromIdentifier(path); - return getOrCreateTxFutureCallback(shardName); - } - - private TransactionFutureCallback getOrCreateTxFutureCallback(String shardName) { - TransactionFutureCallback txFutureCallback = txFutureCallbackMap.get(shardName); - if(txFutureCallback == null) { - Future findPrimaryFuture = sendFindPrimaryShardAsync(shardName); - - final TransactionFutureCallback newTxFutureCallback = new TransactionFutureCallback(this, shardName); - - txFutureCallback = newTxFutureCallback; - txFutureCallbackMap.put(shardName, txFutureCallback); - - findPrimaryFuture.onComplete(new OnComplete() { - @Override - public void onComplete(Throwable failure, ActorSelection primaryShard) { - if(failure != null) { - newTxFutureCallback.createTransactionContext(failure, null); - } else { - newTxFutureCallback.setPrimaryShard(primaryShard); - } - } - }, actorContext.getClientDispatcher()); - } - - return txFutureCallback; - } - - String getTransactionChainId() { - return transactionChainId; - } - - protected ActorContext getActorContext() { - return actorContext; - } - - TransactionContext createValidTransactionContext(ActorSelection transactionActor, - String transactionPath, short remoteTransactionVersion) { - - if (transactionType == TransactionType.READ_ONLY) { - // Read-only Tx's aren't explicitly closed by the client so we create a PhantomReference - // to close the remote Tx's when this instance is no longer in use and is garbage - // collected. - - if(remoteTransactionActorsMB == null) { - remoteTransactionActors = Lists.newArrayList(); - remoteTransactionActorsMB = new AtomicBoolean(); - - TransactionProxyCleanupPhantomReference.track(TransactionProxy.this); - } - - // Add the actor to the remoteTransactionActors list for access by the - // cleanup PhantonReference. - remoteTransactionActors.add(transactionActor); - - // Write to the memory barrier volatile to publish the above update to the - // remoteTransactionActors list for thread visibility. - remoteTransactionActorsMB.set(true); - } - - // TxActor is always created where the leader of the shard is. - // Check if TxActor is created in the same node - boolean isTxActorLocal = actorContext.isPathLocal(transactionPath); - - if(remoteTransactionVersion < DataStoreVersions.LITHIUM_VERSION) { - return new PreLithiumTransactionContextImpl(transactionPath, transactionActor, getIdentifier(), - transactionChainId, actorContext, schemaContext, isTxActorLocal, remoteTransactionVersion, - operationCompleter); - } else { - return new TransactionContextImpl(transactionActor, getIdentifier(), transactionChainId, - actorContext, schemaContext, isTxActorLocal, remoteTransactionVersion, operationCompleter); - } - } -}