X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FTransactionProxy.java;h=64b9086c250c16f759a417003d0cefc9839f688b;hp=58b37be2a2727babd9b0305e868d85d90c079052;hb=691c47ae72532db04f9b2c33cb8a0cef642e5a17;hpb=ff0ca2ab34a016be25fc13fc05bfbd6aa698cee0 diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java index 58b37be2a2..64b9086c25 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java @@ -18,9 +18,11 @@ import com.google.common.base.Optional; import com.google.common.base.Preconditions; import com.google.common.collect.Lists; import com.google.common.util.concurrent.CheckedFuture; +import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.SettableFuture; import java.util.ArrayList; import java.util.Collection; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -30,6 +32,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import javax.annotation.concurrent.GuardedBy; +import org.opendaylight.controller.cluster.datastore.compat.PreLithiumTransactionContextImpl; import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException; import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier; import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction; @@ -162,7 +165,7 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { * PhantomReference. */ private List remoteTransactionActors; - private AtomicBoolean remoteTransactionActorsMB; + private volatile AtomicBoolean remoteTransactionActorsMB; /** * Stores the create transaction results per shard. @@ -175,8 +178,10 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { private final String transactionChainId; private final SchemaContext schemaContext; private boolean inReadyState; - private final Semaphore operationLimiter; - private final OperationCompleter operationCompleter; + + private volatile boolean initialized; + private Semaphore operationLimiter; + private OperationCompleter operationCompleter; public TransactionProxy(ActorContext actorContext, TransactionType transactionType) { this(actorContext, transactionType, ""); @@ -197,25 +202,7 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { memberName = "UNKNOWN-MEMBER"; } - this.identifier = TransactionIdentifier.builder().memberName(memberName).counter( - counter.getAndIncrement()).build(); - - if(transactionType == TransactionType.READ_ONLY) { - // Read-only Tx's aren't explicitly closed by the client so we create a PhantomReference - // to close the remote Tx's when this instance is no longer in use and is garbage - // collected. - - remoteTransactionActors = Lists.newArrayList(); - remoteTransactionActorsMB = new AtomicBoolean(); - - TransactionProxyCleanupPhantomReference cleanup = - new TransactionProxyCleanupPhantomReference(this); - phantomReferenceCache.put(cleanup, cleanup); - } - - // Note : Currently mailbox-capacity comes from akka.conf and not from the config-subsystem - this.operationLimiter = new Semaphore(actorContext.getTransactionOutstandingOperationLimit()); - this.operationCompleter = new OperationCompleter(operationLimiter); + this.identifier = new TransactionIdentifier(memberName, counter.getAndIncrement()); LOG.debug("Created txn {} of type {} on chain {}", identifier, transactionType, transactionChainId); } @@ -303,6 +290,16 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { } private void throttleOperation(int acquirePermits) { + if(!initialized) { + // Note : Currently mailbox-capacity comes from akka.conf and not from the config-subsystem + operationLimiter = new Semaphore(actorContext.getTransactionOutstandingOperationLimit()); + operationCompleter = new OperationCompleter(operationLimiter); + + // Make sure we write this last because it's volatile and will also publish the non-volatile writes + // above as well so they'll be visible to other threads. + initialized = true; + } + try { if(!operationLimiter.tryAcquire(acquirePermits, actorContext.getDatastoreContext().getOperationTimeoutInSeconds(), TimeUnit.SECONDS)){ @@ -377,13 +374,18 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { checkModificationState(); - throttleOperation(txFutureCallbackMap.size()); - inReadyState = true; LOG.debug("Tx {} Readying {} transactions for commit", identifier, txFutureCallbackMap.size()); + if(txFutureCallbackMap.size() == 0) { + onTransactionReady(Collections.>emptyList()); + return NoOpDOMStoreThreePhaseCommitCohort.INSTANCE; + } + + throttleOperation(txFutureCallbackMap.size()); + List> cohortFutures = Lists.newArrayList(); for(TransactionFutureCallback txFutureCallback : txFutureCallbackMap.values()) { @@ -424,18 +426,6 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { protected void onTransactionReady(List> cohortFutures) { } - /** - * Method called to send a CreateTransaction message to a shard. - * - * @param shard the shard actor to send to - * @param serializedCreateMessage the serialized message to send - * @return the response Future - */ - protected Future sendCreateTransaction(ActorSelection shard, - Object serializedCreateMessage) { - return actorContext.executeOperationAsync(shard, serializedCreateMessage); - } - @Override public Object getIdentifier() { return this.identifier; @@ -454,7 +444,7 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { txFutureCallbackMap.clear(); - if(transactionType == TransactionType.READ_ONLY) { + if(remoteTransactionActorsMB != null) { remoteTransactionActors.clear(); remoteTransactionActorsMB.set(true); } @@ -464,14 +454,17 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { return ShardStrategyFactory.getStrategy(path).findShard(path); } + protected Future sendFindPrimaryShardAsync(String shardName) { + return actorContext.findPrimaryShardAsync(shardName); + } + private TransactionFutureCallback getOrCreateTxFutureCallback(YangInstanceIdentifier path) { String shardName = shardNameFromIdentifier(path); TransactionFutureCallback txFutureCallback = txFutureCallbackMap.get(shardName); if(txFutureCallback == null) { - Future findPrimaryFuture = actorContext.findPrimaryShardAsync(shardName); + Future findPrimaryFuture = sendFindPrimaryShardAsync(shardName); - final TransactionFutureCallback newTxFutureCallback = - new TransactionFutureCallback(shardName); + final TransactionFutureCallback newTxFutureCallback = new TransactionFutureCallback(shardName); txFutureCallback = newTxFutureCallback; txFutureCallbackMap.put(shardName, txFutureCallback); @@ -597,10 +590,11 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { * Performs a CreateTransaction try async. */ private void tryCreateTransaction() { - Future createTxFuture = sendCreateTransaction(primaryShard, - new CreateTransaction(identifier.toString(), - TransactionProxy.this.transactionType.ordinal(), - getTransactionChainId()).toSerializable()); + Object serializedCreateMessage = new CreateTransaction(identifier.toString(), + TransactionProxy.this.transactionType.ordinal(), + getTransactionChainId()).toSerializable(); + + Future createTxFuture = actorContext.executeOperationAsync(primaryShard, serializedCreateMessage); createTxFuture.onComplete(this, actorContext.getClientDispatcher()); } @@ -627,6 +621,11 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { } } + // Mainly checking for state violation here to perform a volatile read of "initialized" to + // ensure updates to operationLimter et al are visible to this thread (ie we're doing + // "piggy-back" synchronization here). + Preconditions.checkState(initialized, "Tx was not propertly initialized."); + // Create the TransactionContext from the response or failure. Store the new // TransactionContext locally until we've completed invoking the // TransactionOperations. This avoids thread timing issues which could cause @@ -695,6 +694,19 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { ActorSelection transactionActor = actorContext.actorSelection(transactionPath); if (transactionType == TransactionType.READ_ONLY) { + // Read-only Tx's aren't explicitly closed by the client so we create a PhantomReference + // to close the remote Tx's when this instance is no longer in use and is garbage + // collected. + + if(remoteTransactionActorsMB == null) { + remoteTransactionActors = Lists.newArrayList(); + remoteTransactionActorsMB = new AtomicBoolean(); + + TransactionProxyCleanupPhantomReference cleanup = + new TransactionProxyCleanupPhantomReference(TransactionProxy.this); + phantomReferenceCache.put(cleanup, cleanup); + } + // Add the actor to the remoteTransactionActors list for access by the // cleanup PhantonReference. remoteTransactionActors.add(transactionActor); @@ -712,9 +724,41 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { return new TransactionContextImpl(transactionPath, transactionActor, identifier, actorContext, schemaContext, isTxActorLocal, reply.getVersion(), operationCompleter); } else { - return new LegacyTransactionContextImpl(transactionPath, transactionActor, identifier, + return new PreLithiumTransactionContextImpl(transactionPath, transactionActor, identifier, actorContext, schemaContext, isTxActorLocal, reply.getVersion(), operationCompleter); } } } + + private static class NoOpDOMStoreThreePhaseCommitCohort implements DOMStoreThreePhaseCommitCohort { + static NoOpDOMStoreThreePhaseCommitCohort INSTANCE = new NoOpDOMStoreThreePhaseCommitCohort(); + + private static final ListenableFuture IMMEDIATE_VOID_SUCCESS = + com.google.common.util.concurrent.Futures.immediateFuture(null); + private static final ListenableFuture IMMEDIATE_BOOLEAN_SUCCESS = + com.google.common.util.concurrent.Futures.immediateFuture(Boolean.TRUE); + + private NoOpDOMStoreThreePhaseCommitCohort() { + } + + @Override + public ListenableFuture canCommit() { + return IMMEDIATE_BOOLEAN_SUCCESS; + } + + @Override + public ListenableFuture preCommit() { + return IMMEDIATE_VOID_SUCCESS; + } + + @Override + public ListenableFuture abort() { + return IMMEDIATE_VOID_SUCCESS; + } + + @Override + public ListenableFuture commit() { + return IMMEDIATE_VOID_SUCCESS; + } + } }