X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FTransactionChainProxy.java;h=7c36adb70e6f65cdd559f68ff524b56461bc14ff;hb=HEAD;hp=ee3a5cc82573d6e415a5bbcd080277673e4cb051;hpb=bbaba878c38f381b0b924f89b29a1d0fcf6e2a2f;p=controller.git diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionChainProxy.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionChainProxy.java deleted file mode 100644 index ee3a5cc825..0000000000 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionChainProxy.java +++ /dev/null @@ -1,219 +0,0 @@ -/* - * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. - * - * This program and the accompanying materials are made available under the - * terms of the Eclipse Public License v1.0 which accompanies this distribution, - * and is available at http://www.eclipse.org/legal/epl-v10.html - */ - -package org.opendaylight.controller.cluster.datastore; - -import akka.actor.ActorSelection; -import akka.dispatch.OnComplete; -import com.google.common.base.Preconditions; -import java.util.Collections; -import java.util.List; -import java.util.concurrent.atomic.AtomicInteger; -import org.opendaylight.controller.cluster.datastore.messages.CloseTransactionChain; -import org.opendaylight.controller.cluster.datastore.utils.ActorContext; -import org.opendaylight.controller.md.sal.common.api.data.TransactionChainClosedException; -import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction; -import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction; -import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain; -import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import scala.concurrent.Future; -import scala.concurrent.Promise; - -/** - * TransactionChainProxy acts as a proxy for a DOMStoreTransactionChain created on a remote shard - */ -public class TransactionChainProxy implements DOMStoreTransactionChain { - - private static final Logger LOG = LoggerFactory.getLogger(TransactionChainProxy.class); - - private interface State { - boolean isReady(); - - List> getPreviousReadyFutures(); - } - - private static class Allocated implements State { - private final ChainedTransactionProxy transaction; - - Allocated(ChainedTransactionProxy transaction) { - this.transaction = transaction; - } - - @Override - public boolean isReady() { - return transaction.isReady(); - } - - @Override - public List> getPreviousReadyFutures() { - return transaction.getReadyFutures(); - } - } - - private static abstract class AbstractDefaultState implements State { - @Override - public List> getPreviousReadyFutures() { - return Collections.emptyList(); - } - } - - private static final State IDLE_STATE = new AbstractDefaultState() { - @Override - public boolean isReady() { - return true; - } - }; - - private static final State CLOSED_STATE = new AbstractDefaultState() { - @Override - public boolean isReady() { - throw new TransactionChainClosedException("Transaction chain has been closed"); - } - }; - - private static final AtomicInteger counter = new AtomicInteger(0); - - private final ActorContext actorContext; - private final String transactionChainId; - private volatile State currentState = IDLE_STATE; - - public TransactionChainProxy(ActorContext actorContext) { - this.actorContext = actorContext; - transactionChainId = actorContext.getCurrentMemberName() + "-txn-chain-" + counter.incrementAndGet(); - } - - public String getTransactionChainId() { - return transactionChainId; - } - - @Override - public DOMStoreReadTransaction newReadOnlyTransaction() { - State localState = currentState; - checkReadyState(localState); - - return new ChainedTransactionProxy(actorContext, TransactionProxy.TransactionType.READ_ONLY, - transactionChainId, localState.getPreviousReadyFutures()); - } - - @Override - public DOMStoreReadWriteTransaction newReadWriteTransaction() { - actorContext.acquireTxCreationPermit(); - return allocateWriteTransaction(TransactionProxy.TransactionType.READ_WRITE); - } - - @Override - public DOMStoreWriteTransaction newWriteOnlyTransaction() { - actorContext.acquireTxCreationPermit(); - return allocateWriteTransaction(TransactionProxy.TransactionType.WRITE_ONLY); - } - - @Override - public void close() { - currentState = CLOSED_STATE; - - // Send a close transaction chain request to each and every shard - actorContext.broadcast(new CloseTransactionChain(transactionChainId)); - } - - private ChainedTransactionProxy allocateWriteTransaction(TransactionProxy.TransactionType type) { - State localState = currentState; - - checkReadyState(localState); - - // Pass the ready Futures from the previous Tx. - ChainedTransactionProxy txProxy = new ChainedTransactionProxy(actorContext, type, - transactionChainId, localState.getPreviousReadyFutures()); - - currentState = new Allocated(txProxy); - - return txProxy; - } - - private void checkReadyState(State state) { - Preconditions.checkState(state.isReady(), "Previous transaction is not ready yet"); - } - - private static class ChainedTransactionProxy extends TransactionProxy { - - /** - * Stores the ready Futures from the previous Tx in the chain. - */ - private final List> previousReadyFutures; - - /** - * Stores the ready Futures from this transaction when it is readied. - */ - private volatile List> readyFutures; - - private ChainedTransactionProxy(ActorContext actorContext, TransactionType transactionType, - String transactionChainId, List> previousReadyFutures) { - super(actorContext, transactionType, transactionChainId); - this.previousReadyFutures = previousReadyFutures; - } - - List> getReadyFutures() { - return readyFutures; - } - - boolean isReady() { - return readyFutures != null; - } - - @Override - protected void onTransactionReady(List> readyFutures) { - LOG.debug("onTransactionReady {} pending readyFutures size {} chain {}", getIdentifier(), - readyFutures.size(), getTransactionChainId()); - this.readyFutures = readyFutures; - } - - /** - * This method is overridden to ensure the previous Tx's ready operations complete - * before we create the next shard Tx in the chain to avoid creation failures if the - * previous Tx's ready operations haven't completed yet. - */ - @Override - protected Future sendCreateTransaction(final ActorSelection shard, - final Object serializedCreateMessage) { - - // Check if there are any previous ready Futures, otherwise let the super class handle it. - if(previousReadyFutures.isEmpty()) { - return super.sendCreateTransaction(shard, serializedCreateMessage); - } - - // Combine the ready Futures into 1. - Future> combinedFutures = akka.dispatch.Futures.sequence( - previousReadyFutures, getActorContext().getActorSystem().dispatcher()); - - // Add a callback for completion of the combined Futures. - final Promise createTxPromise = akka.dispatch.Futures.promise(); - OnComplete> onComplete = new OnComplete>() { - @Override - public void onComplete(Throwable failure, Iterable notUsed) { - if(failure != null) { - // A Ready Future failed so fail the returned Promise. - createTxPromise.failure(failure); - } else { - LOG.debug("Previous Tx readied - sending CreateTransaction for {} on chain {}", - getIdentifier(), getTransactionChainId()); - - // Send the CreateTx message and use the resulting Future to complete the - // returned Promise. - createTxPromise.completeWith(getActorContext().executeOperationAsync(shard, - serializedCreateMessage)); - } - } - }; - - combinedFutures.onComplete(onComplete, getActorContext().getActorSystem().dispatcher()); - - return createTxPromise.future(); - } - } -}