X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FTransactionChainProxy.java;h=c3b81d7e15730c338669757e22338ef816938ee1;hp=92de88e1126c19c38718f0f7c5c8cd51ad8430c0;hb=f662ce8b1fa94b77ba66f7ece8bcaff91dee809e;hpb=ef43357d61c0d95de93e5acbb4db23e1d04d9c88 diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionChainProxy.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionChainProxy.java index 92de88e112..c3b81d7e15 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionChainProxy.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionChainProxy.java @@ -1,209 +1,331 @@ /* - * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. + * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved. * * This program and the accompanying materials are made available under the * terms of the Eclipse Public License v1.0 which accompanies this distribution, * and is available at http://www.eclipse.org/legal/epl-v10.html */ - package org.opendaylight.controller.cluster.datastore; import akka.actor.ActorSelection; +import akka.dispatch.Futures; import akka.dispatch.OnComplete; import com.google.common.base.Preconditions; -import java.util.AbstractMap.SimpleEntry; -import java.util.Collections; +import java.util.ArrayList; +import java.util.Collection; import java.util.List; +import java.util.Map.Entry; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; +import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier; +import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier; import org.opendaylight.controller.cluster.datastore.messages.CloseTransactionChain; -import org.opendaylight.controller.cluster.datastore.utils.ActorContext; -import org.opendaylight.controller.md.sal.common.api.data.TransactionChainClosedException; -import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction; -import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction; -import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain; -import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction; +import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo; +import org.opendaylight.mdsal.dom.api.DOMTransactionChainClosedException; +import org.opendaylight.mdsal.dom.spi.store.DOMStoreReadTransaction; +import org.opendaylight.mdsal.dom.spi.store.DOMStoreReadWriteTransaction; +import org.opendaylight.mdsal.dom.spi.store.DOMStoreTransactionChain; +import org.opendaylight.mdsal.dom.spi.store.DOMStoreWriteTransaction; +import org.opendaylight.yangtools.yang.data.api.schema.tree.ReadOnlyDataTree; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import scala.concurrent.Future; import scala.concurrent.Promise; /** - * TransactionChainProxy acts as a proxy for a DOMStoreTransactionChain created on a remote shard + * A chain of {@link TransactionProxy}s. It allows a single open transaction to be open + * at a time. For remote transactions, it also tracks the outstanding readiness requests + * towards the shard and unblocks operations only after all have completed. */ -public class TransactionChainProxy implements DOMStoreTransactionChain { - private interface State { - boolean isReady(); - - SimpleEntry>> getReadyFutures(); +final class TransactionChainProxy extends AbstractTransactionContextFactory + implements DOMStoreTransactionChain { + private abstract static class State { + /** + * Check if it is okay to allocate a new transaction. + * @throws IllegalStateException if a transaction may not be allocated. + */ + abstract void checkReady(); - void setReadyFutures(Object txIdentifier, List> readyFutures); + /** + * Return the future which needs to be waited for before shard information + * is returned (which unblocks remote transactions). + * @return Future to wait for, or null of no wait is necessary + */ + abstract Future previousFuture(); } - private static class Allocated implements State { - private volatile SimpleEntry>> readyFutures; + private abstract static class Pending extends State { + private final TransactionIdentifier transaction; + private final Future previousFuture; - @Override - public boolean isReady() { - return readyFutures != null; + Pending(final TransactionIdentifier transaction, final Future previousFuture) { + this.previousFuture = previousFuture; + this.transaction = Preconditions.checkNotNull(transaction); } @Override - public SimpleEntry>> getReadyFutures() { - return readyFutures != null ? readyFutures : EMPTY_READY_FUTURES; + final Future previousFuture() { + return previousFuture; + } + + final TransactionIdentifier getIdentifier() { + return transaction; + } + } + + private static final class Allocated extends Pending { + Allocated(final TransactionIdentifier transaction, final Future previousFuture) { + super(transaction, previousFuture); } @Override - public void setReadyFutures(Object txIdentifier, List> readyFutures) { - this.readyFutures = new SimpleEntry<>(txIdentifier, readyFutures); + void checkReady() { + throw new IllegalStateException(String.format("Previous transaction %s is not ready yet", getIdentifier())); } } - private static abstract class AbstractDefaultState implements State { + private static final class Submitted extends Pending { + Submitted(final TransactionIdentifier transaction, final Future previousFuture) { + super(transaction, previousFuture); + } + @Override - public SimpleEntry>> getReadyFutures() { - return EMPTY_READY_FUTURES; + void checkReady() { + // Okay to allocate } + } + private abstract static class DefaultState extends State { @Override - public void setReadyFutures(Object txIdentifier, List> readyFutures) { - throw new IllegalStateException("No transaction is allocated"); + final Future previousFuture() { + return null; } } - private static final State IDLE_STATE = new AbstractDefaultState() { + private static final State IDLE_STATE = new DefaultState() { @Override - public boolean isReady() { - return true; + void checkReady() { + // Okay to allocate } }; - private static final State CLOSED_STATE = new AbstractDefaultState() { + private static final State CLOSED_STATE = new DefaultState() { @Override - public boolean isReady() { - throw new TransactionChainClosedException("Transaction chain has been closed"); + void checkReady() { + throw new DOMTransactionChainClosedException("Transaction chain has been closed"); } }; - private static final SimpleEntry>> EMPTY_READY_FUTURES = - new SimpleEntry>>("", - Collections.>emptyList()); - + private static final Logger LOG = LoggerFactory.getLogger(TransactionChainProxy.class); private static final AtomicReferenceFieldUpdater STATE_UPDATER = - AtomicReferenceFieldUpdater.newUpdater(TransactionChainProxy.class, State.class, "state"); - - private final ActorContext actorContext; - private final String transactionChainId; - private volatile State state = IDLE_STATE; - - public TransactionChainProxy(ActorContext actorContext) { - this.actorContext = actorContext; - transactionChainId = actorContext.getCurrentMemberName() + "-" + System.currentTimeMillis(); + AtomicReferenceFieldUpdater.newUpdater(TransactionChainProxy.class, State.class, "currentState"); + + private final TransactionContextFactory parent; + private volatile State currentState = IDLE_STATE; + + /** + * This map holds Promise instances for each read-only tx. It is used to maintain ordering of tx creates + * wrt to read-only tx's between this class and a LocalTransactionChain since they're bridged by + * asynchronous futures. Otherwise, in the following scenario, eg: + *

+ * 1) Create write tx1 on chain + * 2) do write and submit + * 3) Create read-only tx2 on chain and issue read + * 4) Create write tx3 on chain, do write but do not submit + *

+ * if the sequence/timing is right, tx3 may create its local tx on the LocalTransactionChain before tx2, + * which results in tx2 failing b/c tx3 isn't ready yet. So maintaining ordering prevents this issue + * (see Bug 4774). + *

+ * A Promise is added via newReadOnlyTransaction. When the parent class completes the primary shard + * lookup and creates the TransactionContext (either success or failure), onTransactionContextCreated is + * called which completes the Promise. A write tx that is created prior to completion will wait on the + * Promise's Future via findPrimaryShard. + */ + private final ConcurrentMap> priorReadOnlyTxPromises = + new ConcurrentHashMap<>(); + + TransactionChainProxy(final TransactionContextFactory parent, final LocalHistoryIdentifier historyId) { + super(parent.getActorUtils(), historyId); + this.parent = parent; } @Override public DOMStoreReadTransaction newReadOnlyTransaction() { - checkReadyState(); - return new ChainedTransactionProxy(actorContext, TransactionProxy.TransactionType.READ_ONLY); + currentState.checkReady(); + TransactionProxy transactionProxy = new TransactionProxy(this, TransactionType.READ_ONLY); + priorReadOnlyTxPromises.put(transactionProxy.getIdentifier(), Futures.promise()); + return transactionProxy; } @Override public DOMStoreReadWriteTransaction newReadWriteTransaction() { - return allocateWriteTransaction(TransactionProxy.TransactionType.READ_WRITE); + getActorUtils().acquireTxCreationPermit(); + return allocateWriteTransaction(TransactionType.READ_WRITE); } @Override public DOMStoreWriteTransaction newWriteOnlyTransaction() { - return allocateWriteTransaction(TransactionProxy.TransactionType.WRITE_ONLY); + getActorUtils().acquireTxCreationPermit(); + return allocateWriteTransaction(TransactionType.WRITE_ONLY); } @Override public void close() { - state = CLOSED_STATE; + currentState = CLOSED_STATE; // Send a close transaction chain request to each and every shard - actorContext.broadcast(new CloseTransactionChain(transactionChainId)); - } - private ChainedTransactionProxy allocateWriteTransaction(TransactionProxy.TransactionType type) { - checkReadyState(); + getActorUtils().broadcast(version -> new CloseTransactionChain(getHistoryId(), version).toSerializable(), + CloseTransactionChain.class); + } - ChainedTransactionProxy txProxy = new ChainedTransactionProxy(actorContext, type); - STATE_UPDATER.compareAndSet(this, IDLE_STATE, new Allocated()); + private TransactionProxy allocateWriteTransaction(final TransactionType type) { + State localState = currentState; + localState.checkReady(); - return txProxy; + final TransactionProxy ret = new TransactionProxy(this, type); + currentState = new Allocated(ret.getIdentifier(), localState.previousFuture()); + return ret; } - private void checkReadyState() { - Preconditions.checkState(state.isReady(), "Previous transaction %s is not ready yet", - state.getReadyFutures().getKey()); + @Override + protected LocalTransactionChain factoryForShard(final String shardName, final ActorSelection shardLeader, + final ReadOnlyDataTree dataTree) { + final LocalTransactionChain ret = new LocalTransactionChain(this, shardLeader, dataTree); + LOG.debug("Allocated transaction chain {} for shard {} leader {}", ret, shardName, shardLeader); + return ret; } - private class ChainedTransactionProxy extends TransactionProxy { - - ChainedTransactionProxy(ActorContext actorContext, TransactionType transactionType) { - super(actorContext, transactionType, transactionChainId); + /** + * This method is overridden to ensure the previous Tx's ready operations complete + * before we initiate the next Tx in the chain to avoid creation failures if the + * previous Tx's ready operations haven't completed yet. + */ + @SuppressWarnings({ "unchecked", "rawtypes" }) + @Override + protected Future findPrimaryShard(final String shardName, final TransactionIdentifier txId) { + // Read current state atomically + final State localState = currentState; + + // There are no outstanding futures, shortcut + Future previous = localState.previousFuture(); + if (previous == null) { + return combineFutureWithPossiblePriorReadOnlyTxFutures(parent.findPrimaryShard(shardName, txId), txId); } - @Override - protected void onTransactionReady(List> readyFutures) { - state.setReadyFutures(getIdentifier(), readyFutures); + final String previousTransactionId; + + if (localState instanceof Pending) { + previousTransactionId = ((Pending) localState).getIdentifier().toString(); + LOG.debug("Tx: {} - waiting for ready futures with pending Tx {}", txId, previousTransactionId); + } else { + previousTransactionId = ""; + LOG.debug("Waiting for ready futures on chain {}", getHistoryId()); } - /** - * This method is overridden to ensure the previous Tx's ready operations complete - * before we create the next shard Tx in the chain to avoid creation failures if the - * previous Tx's ready operations haven't completed yet. - */ - @Override - protected Future sendCreateTransaction(final ActorSelection shard, - final Object serializedCreateMessage) { - - // Check if there are any previous ready Futures, otherwise let the super class handle it. - // The second check is done to ensure the the previous ready Futures aren't for this - // Tx instance as deadlock would occur if we tried to wait on our own Futures. This can - // occur in this scenario: - // - // - the TransactionProxy is created and the client does a write. - // - // - the TransactionProxy then attempts to create the shard Tx. However it first - // sends a FindPrimaryShard message to the shard manager to find the local shard - // This call is done async. - // - // - the client submits the Tx and the TransactionProxy is readied and we cache - // the ready Futures here. - // - // - then the FindPrimaryShard call completes and this method is called to create - // the shard Tx. However the cached Futures were from the ready on this Tx. If we - // tried to wait on them, it would cause a form of deadlock as the ready Future - // would be waiting on the Tx create Future and vice versa. - SimpleEntry>> readyFuturesEntry = state.getReadyFutures(); - List> readyFutures = readyFuturesEntry.getValue(); - if(readyFutures.isEmpty() || getIdentifier().equals(readyFuturesEntry.getKey())) { - return super.sendCreateTransaction(shard, serializedCreateMessage); + previous = combineFutureWithPossiblePriorReadOnlyTxFutures(previous, txId); + + // Add a callback for completion of the combined Futures. + final Promise returnPromise = Futures.promise(); + + final OnComplete onComplete = new OnComplete() { + @Override + public void onComplete(final Throwable failure, final Object notUsed) { + if (failure != null) { + // A Ready Future failed so fail the returned Promise. + LOG.error("Tx: {} - ready future failed for previous Tx {}", txId, previousTransactionId); + returnPromise.failure(failure); + } else { + LOG.debug("Tx: {} - previous Tx {} readied - proceeding to FindPrimaryShard", + txId, previousTransactionId); + + // Send the FindPrimaryShard message and use the resulting Future to complete the + // returned Promise. + returnPromise.completeWith(parent.findPrimaryShard(shardName, txId)); + } + } + }; + + previous.onComplete(onComplete, getActorUtils().getClientDispatcher()); + return returnPromise.future(); + } + + private Future combineFutureWithPossiblePriorReadOnlyTxFutures(final Future future, + final TransactionIdentifier txId) { + if (!priorReadOnlyTxPromises.containsKey(txId) && !priorReadOnlyTxPromises.isEmpty()) { + Collection>> priorReadOnlyTxPromiseEntries = + new ArrayList<>(priorReadOnlyTxPromises.entrySet()); + if (priorReadOnlyTxPromiseEntries.isEmpty()) { + return future; } - // Combine the ready Futures into 1. - Future> combinedFutures = akka.dispatch.Futures.sequence( - readyFutures, actorContext.getActorSystem().dispatcher()); + List> priorReadOnlyTxFutures = new ArrayList<>(priorReadOnlyTxPromiseEntries.size()); + for (Entry> entry: priorReadOnlyTxPromiseEntries) { + LOG.debug("Tx: {} - waiting on future for prior read-only Tx {}", txId, entry.getKey()); + priorReadOnlyTxFutures.add(entry.getValue().future()); + } + + Future> combinedFutures = Futures.sequence(priorReadOnlyTxFutures, + getActorUtils().getClientDispatcher()); - // Add a callback for completion of the combined Futures. - final Promise createTxPromise = akka.dispatch.Futures.promise(); - OnComplete> onComplete = new OnComplete>() { + final Promise returnPromise = Futures.promise(); + final OnComplete> onComplete = new OnComplete>() { @Override - public void onComplete(Throwable failure, Iterable notUsed) { - if(failure != null) { - // A Ready Future failed so fail the returned Promise. - createTxPromise.failure(failure); - } else { - // Send the CreateTx message and use the resulting Future to complete the - // returned Promise. - createTxPromise.completeWith(actorContext.executeOperationAsync(shard, - serializedCreateMessage)); - } + public void onComplete(final Throwable failure, final Iterable notUsed) { + LOG.debug("Tx: {} - prior read-only Tx futures complete", txId); + + // Complete the returned Promise with the original Future. + returnPromise.completeWith(future); } }; - combinedFutures.onComplete(onComplete, actorContext.getActorSystem().dispatcher()); + combinedFutures.onComplete(onComplete, getActorUtils().getClientDispatcher()); + return returnPromise.future(); + } else { + return future; + } + } + + @Override + protected void onTransactionReady(final TransactionIdentifier transaction, + final Collection> cohortFutures) { + final State localState = currentState; + Preconditions.checkState(localState instanceof Allocated, "Readying transaction %s while state is %s", + transaction, localState); + final TransactionIdentifier currentTx = ((Allocated)localState).getIdentifier(); + Preconditions.checkState(transaction.equals(currentTx), "Readying transaction %s while %s is allocated", + transaction, currentTx); + + // Transaction ready and we are not waiting for futures -- go to idle + if (cohortFutures.isEmpty()) { + currentState = IDLE_STATE; + return; + } + + // Combine the ready Futures into 1 + final Future> combined = Futures.sequence(cohortFutures, getActorUtils().getClientDispatcher()); - return createTxPromise.future(); + // Record the we have outstanding futures + final State newState = new Submitted(transaction, combined); + currentState = newState; + + // Attach a completion reset, but only if we do not allocate a transaction + // in-between + combined.onComplete(new OnComplete>() { + @Override + public void onComplete(final Throwable arg0, final Iterable arg1) { + STATE_UPDATER.compareAndSet(TransactionChainProxy.this, newState, IDLE_STATE); + } + }, getActorUtils().getClientDispatcher()); + } + + @Override + protected void onTransactionContextCreated(TransactionIdentifier transactionId) { + Promise promise = priorReadOnlyTxPromises.remove(transactionId); + if (promise != null) { + promise.success(null); } } }