From e70650ee977b2269651073b71c8cfc0bc5b5e198 Mon Sep 17 00:00:00 2001 From: Robert Varga Date: Wed, 1 Apr 2015 12:47:27 +0200 Subject: [PATCH] Split out ChainedTransactionProxy Rather than having it as a inner class, move it into its own file, increasing clarity. Change-Id: I83d667db353cc01a950ac24da7ee68fc66ceb46d Signed-off-by: Robert Varga --- .../datastore/ChainedTransactionProxy.java | 97 +++++++++++++++++++ .../datastore/TransactionChainProxy.java | 85 ---------------- 2 files changed, 97 insertions(+), 85 deletions(-) create mode 100644 opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ChainedTransactionProxy.java diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ChainedTransactionProxy.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ChainedTransactionProxy.java new file mode 100644 index 0000000000..c59a277fa8 --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ChainedTransactionProxy.java @@ -0,0 +1,97 @@ +/* + * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.datastore; + +import akka.actor.ActorSelection; +import akka.dispatch.OnComplete; +import java.util.List; +import org.opendaylight.controller.cluster.datastore.utils.ActorContext; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import scala.concurrent.Future; +import scala.concurrent.Promise; + +final class ChainedTransactionProxy extends TransactionProxy { + private static final Logger LOG = LoggerFactory.getLogger(ChainedTransactionProxy.class); + + /** + * Stores the ready Futures from the previous Tx in the chain. + */ + private final List> previousReadyFutures; + + /** + * Stores the ready Futures from this transaction when it is readied. + */ + private volatile List> readyFutures; + + ChainedTransactionProxy(ActorContext actorContext, TransactionType transactionType, + String transactionChainId, List> previousReadyFutures) { + super(actorContext, transactionType, transactionChainId); + this.previousReadyFutures = previousReadyFutures; + } + + List> getReadyFutures() { + return readyFutures; + } + + boolean isReady() { + return readyFutures != null; + } + + @Override + protected void onTransactionReady(List> readyFutures) { + LOG.debug("onTransactionReady {} pending readyFutures size {} chain {}", getIdentifier(), + readyFutures.size(), getTransactionChainId()); + this.readyFutures = readyFutures; + } + + /** + * This method is overridden to ensure the previous Tx's ready operations complete + * before we initiate the next Tx in the chain to avoid creation failures if the + * previous Tx's ready operations haven't completed yet. + */ + @Override + protected Future sendFindPrimaryShardAsync(final String shardName) { + // Check if there are any previous ready Futures, otherwise let the super class handle it. + if(previousReadyFutures.isEmpty()) { + return super.sendFindPrimaryShardAsync(shardName); + } + + if (LOG.isDebugEnabled()) { + LOG.debug("Waiting for {} previous ready futures for Tx {} on chain {}", + previousReadyFutures.size(), getIdentifier(), getTransactionChainId()); + } + + // Combine the ready Futures into 1. + Future> combinedFutures = akka.dispatch.Futures.sequence( + previousReadyFutures, getActorContext().getClientDispatcher()); + + // Add a callback for completion of the combined Futures. + final Promise returnPromise = akka.dispatch.Futures.promise(); + OnComplete> onComplete = new OnComplete>() { + @Override + public void onComplete(Throwable failure, Iterable notUsed) { + if(failure != null) { + // A Ready Future failed so fail the returned Promise. + returnPromise.failure(failure); + } else { + LOG.debug("Previous Tx readied - sending FindPrimaryShard for {} on chain {}", + getIdentifier(), getTransactionChainId()); + + // Send the FindPrimaryShard message and use the resulting Future to complete the + // returned Promise. + returnPromise.completeWith(ChainedTransactionProxy.super.sendFindPrimaryShardAsync(shardName)); + } + } + }; + + combinedFutures.onComplete(onComplete, getActorContext().getClientDispatcher()); + + return returnPromise.future(); + } +} \ No newline at end of file diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionChainProxy.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionChainProxy.java index 58ac1d8b82..11066edd54 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionChainProxy.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionChainProxy.java @@ -9,7 +9,6 @@ package org.opendaylight.controller.cluster.datastore; import akka.actor.ActorSelection; -import akka.dispatch.OnComplete; import com.google.common.base.Preconditions; import java.util.Collections; import java.util.List; @@ -21,18 +20,13 @@ import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction; import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction; import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain; import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import scala.concurrent.Future; -import scala.concurrent.Promise; /** * TransactionChainProxy acts as a proxy for a DOMStoreTransactionChain created on a remote shard */ public class TransactionChainProxy implements DOMStoreTransactionChain { - private static final Logger LOG = LoggerFactory.getLogger(TransactionChainProxy.class); - private interface State { boolean isReady(); @@ -139,83 +133,4 @@ public class TransactionChainProxy implements DOMStoreTransactionChain { private void checkReadyState(State state) { Preconditions.checkState(state.isReady(), "Previous transaction is not ready yet"); } - - private static class ChainedTransactionProxy extends TransactionProxy { - - /** - * Stores the ready Futures from the previous Tx in the chain. - */ - private final List> previousReadyFutures; - - /** - * Stores the ready Futures from this transaction when it is readied. - */ - private volatile List> readyFutures; - - private ChainedTransactionProxy(ActorContext actorContext, TransactionType transactionType, - String transactionChainId, List> previousReadyFutures) { - super(actorContext, transactionType, transactionChainId); - this.previousReadyFutures = previousReadyFutures; - } - - List> getReadyFutures() { - return readyFutures; - } - - boolean isReady() { - return readyFutures != null; - } - - @Override - protected void onTransactionReady(List> readyFutures) { - LOG.debug("onTransactionReady {} pending readyFutures size {} chain {}", getIdentifier(), - readyFutures.size(), getTransactionChainId()); - this.readyFutures = readyFutures; - } - - /** - * This method is overridden to ensure the previous Tx's ready operations complete - * before we initiate the next Tx in the chain to avoid creation failures if the - * previous Tx's ready operations haven't completed yet. - */ - @Override - protected Future sendFindPrimaryShardAsync(final String shardName) { - // Check if there are any previous ready Futures, otherwise let the super class handle it. - if(previousReadyFutures.isEmpty()) { - return super.sendFindPrimaryShardAsync(shardName); - } - - if(LOG.isDebugEnabled()) { - LOG.debug("Waiting for {} previous ready futures for Tx {} on chain {}", - previousReadyFutures.size(), getIdentifier(), getTransactionChainId()); - } - - // Combine the ready Futures into 1. - Future> combinedFutures = akka.dispatch.Futures.sequence( - previousReadyFutures, getActorContext().getClientDispatcher()); - - // Add a callback for completion of the combined Futures. - final Promise returnPromise = akka.dispatch.Futures.promise(); - OnComplete> onComplete = new OnComplete>() { - @Override - public void onComplete(Throwable failure, Iterable notUsed) { - if(failure != null) { - // A Ready Future failed so fail the returned Promise. - returnPromise.failure(failure); - } else { - LOG.debug("Previous Tx readied - sending FindPrimaryShard for {} on chain {}", - getIdentifier(), getTransactionChainId()); - - // Send the FindPrimaryShard message and use the resulting Future to complete the - // returned Promise. - returnPromise.completeWith(ChainedTransactionProxy.super.sendFindPrimaryShardAsync(shardName)); - } - } - }; - - combinedFutures.onComplete(onComplete, getActorContext().getClientDispatcher()); - - return returnPromise.future(); - } - } } -- 2.36.6