X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FTransactionChainProxy.java;h=4ef89b4684c907923274ee1fb4ef0ae6d554da27;hp=dc215454c02c922d3bfc25f56029460cc944d17c;hb=b65e66f7b1bafb0d0c5fbe1c569835eb890f672a;hpb=466078ab1dc8a8cc2981b161051f6edecd6af85a diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionChainProxy.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionChainProxy.java index dc215454c0..4ef89b4684 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionChainProxy.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionChainProxy.java @@ -7,10 +7,12 @@ */ package org.opendaylight.controller.cluster.datastore; +import static com.google.common.base.Preconditions.checkState; +import static java.util.Objects.requireNonNull; + import akka.actor.ActorSelection; import akka.dispatch.Futures; import akka.dispatch.OnComplete; -import com.google.common.base.Preconditions; import java.util.ArrayList; import java.util.Collection; import java.util.List; @@ -18,7 +20,6 @@ import java.util.Map.Entry; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; -import javax.annotation.Nonnull; import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier; import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier; import org.opendaylight.controller.cluster.datastore.messages.CloseTransactionChain; @@ -28,7 +29,7 @@ import org.opendaylight.mdsal.dom.spi.store.DOMStoreReadTransaction; import org.opendaylight.mdsal.dom.spi.store.DOMStoreReadWriteTransaction; import org.opendaylight.mdsal.dom.spi.store.DOMStoreTransactionChain; import org.opendaylight.mdsal.dom.spi.store.DOMStoreWriteTransaction; -import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree; +import org.opendaylight.yangtools.yang.data.api.schema.tree.ReadOnlyDataTree; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import scala.concurrent.Future; @@ -62,7 +63,7 @@ final class TransactionChainProxy extends AbstractTransactionContextFactory previousFuture) { this.previousFuture = previousFuture; - this.transaction = Preconditions.checkNotNull(transaction); + this.transaction = requireNonNull(transaction); } @Override @@ -148,7 +149,7 @@ final class TransactionChainProxy extends AbstractTransactionContextFactory(); TransactionChainProxy(final TransactionContextFactory parent, final LocalHistoryIdentifier historyId) { - super(parent.getActorContext(), historyId); + super(parent.getActorUtils(), historyId); this.parent = parent; } @@ -162,13 +163,13 @@ final class TransactionChainProxy extends AbstractTransactionContextFactory new CloseTransactionChain(getHistoryId(), version).toSerializable(), + getActorUtils().broadcast(version -> new CloseTransactionChain(getHistoryId(), version).toSerializable(), CloseTransactionChain.class); } @@ -193,7 +194,7 @@ final class TransactionChainProxy extends AbstractTransactionContextFactory Future combineFutureWithPossiblePriorReadOnlyTxFutures(final Future future, final TransactionIdentifier txId) { - if (!priorReadOnlyTxPromises.containsKey(txId) && !priorReadOnlyTxPromises.isEmpty()) { - Collection>> priorReadOnlyTxPromiseEntries = - new ArrayList<>(priorReadOnlyTxPromises.entrySet()); - if (priorReadOnlyTxPromiseEntries.isEmpty()) { - return future; - } + return priorReadOnlyTxPromises.isEmpty() || priorReadOnlyTxPromises.containsKey(txId) ? future + // Tough luck, we need do some work + : combineWithPriorReadOnlyTxFutures(future, txId); + } - List> priorReadOnlyTxFutures = new ArrayList<>(priorReadOnlyTxPromiseEntries.size()); - for (Entry> entry: priorReadOnlyTxPromiseEntries) { - LOG.debug("Tx: {} - waiting on future for prior read-only Tx {}", txId, entry.getKey()); - priorReadOnlyTxFutures.add(entry.getValue().future()); - } + // Split out of the common path + private Future combineWithPriorReadOnlyTxFutures(final Future future, final TransactionIdentifier txId) { + // Take a stable snapshot, and check if we raced + final List>> priorReadOnlyTxPromiseEntries = + new ArrayList<>(priorReadOnlyTxPromises.entrySet()); + if (priorReadOnlyTxPromiseEntries.isEmpty()) { + return future; + } - Future> combinedFutures = Futures.sequence(priorReadOnlyTxFutures, - getActorContext().getClientDispatcher()); + final List> priorReadOnlyTxFutures = new ArrayList<>(priorReadOnlyTxPromiseEntries.size()); + for (Entry> entry: priorReadOnlyTxPromiseEntries) { + LOG.debug("Tx: {} - waiting on future for prior read-only Tx {}", txId, entry.getKey()); + priorReadOnlyTxFutures.add(entry.getValue().future()); + } - final Promise returnPromise = Futures.promise(); - final OnComplete> onComplete = new OnComplete>() { - @Override - public void onComplete(final Throwable failure, final Iterable notUsed) { - LOG.debug("Tx: {} - prior read-only Tx futures complete", txId); + final Future> combinedFutures = Futures.sequence(priorReadOnlyTxFutures, + getActorUtils().getClientDispatcher()); - // Complete the returned Promise with the original Future. - returnPromise.completeWith(future); - } - }; + final Promise returnPromise = Futures.promise(); + final OnComplete> onComplete = new OnComplete<>() { + @Override + public void onComplete(final Throwable failure, final Iterable notUsed) { + LOG.debug("Tx: {} - prior read-only Tx futures complete", txId); - combinedFutures.onComplete(onComplete, getActorContext().getClientDispatcher()); - return returnPromise.future(); - } else { - return future; - } + // Complete the returned Promise with the original Future. + returnPromise.completeWith(future); + } + }; + + combinedFutures.onComplete(onComplete, getActorUtils().getClientDispatcher()); + return returnPromise.future(); } @Override protected void onTransactionReady(final TransactionIdentifier transaction, final Collection> cohortFutures) { final State localState = currentState; - Preconditions.checkState(localState instanceof Allocated, "Readying transaction %s while state is %s", - transaction, localState); + checkState(localState instanceof Allocated, "Readying transaction %s while state is %s", transaction, + localState); final TransactionIdentifier currentTx = ((Allocated)localState).getIdentifier(); - Preconditions.checkState(transaction.equals(currentTx), "Readying transaction %s while %s is allocated", - transaction, currentTx); + checkState(transaction.equals(currentTx), "Readying transaction %s while %s is allocated", transaction, + currentTx); // Transaction ready and we are not waiting for futures -- go to idle if (cohortFutures.isEmpty()) { @@ -306,7 +311,7 @@ final class TransactionChainProxy extends AbstractTransactionContextFactory> combined = Futures.sequence(cohortFutures, getActorContext().getClientDispatcher()); + final Future> combined = Futures.sequence(cohortFutures, getActorUtils().getClientDispatcher()); // Record the we have outstanding futures final State newState = new Submitted(transaction, combined); @@ -319,11 +324,11 @@ final class TransactionChainProxy extends AbstractTransactionContextFactory arg1) { STATE_UPDATER.compareAndSet(TransactionChainProxy.this, newState, IDLE_STATE); } - }, getActorContext().getClientDispatcher()); + }, getActorUtils().getClientDispatcher()); } @Override - protected void onTransactionContextCreated(@Nonnull TransactionIdentifier transactionId) { + protected void onTransactionContextCreated(final TransactionIdentifier transactionId) { Promise promise = priorReadOnlyTxPromises.remove(transactionId); if (promise != null) { promise.success(null);