X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FChainedTransactionProxy.java;h=9a800c1659b624132cbdb5fe22435aeda9c7a626;hp=c59a277fa867860355f00bc1ba47439fb98035e7;hb=107324809285bfbb9890cba38ffa18390f8de4bd;hpb=3d256dbaa5db779d7883398a6f44badf88955eaf diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ChainedTransactionProxy.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ChainedTransactionProxy.java index c59a277fa8..9a800c1659 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ChainedTransactionProxy.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ChainedTransactionProxy.java @@ -7,9 +7,9 @@ */ package org.opendaylight.controller.cluster.datastore; -import akka.actor.ActorSelection; import akka.dispatch.OnComplete; import java.util.List; +import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo; import org.opendaylight.controller.cluster.datastore.utils.ActorContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -22,20 +22,20 @@ final class ChainedTransactionProxy extends TransactionProxy { /** * Stores the ready Futures from the previous Tx in the chain. */ - private final List> previousReadyFutures; + private final List> previousReadyFutures; /** * Stores the ready Futures from this transaction when it is readied. */ - private volatile List> readyFutures; + private volatile List> readyFutures; ChainedTransactionProxy(ActorContext actorContext, TransactionType transactionType, - String transactionChainId, List> previousReadyFutures) { + String transactionChainId, List> previousReadyFutures) { super(actorContext, transactionType, transactionChainId); this.previousReadyFutures = previousReadyFutures; } - List> getReadyFutures() { + List> getReadyFutures() { return readyFutures; } @@ -43,11 +43,14 @@ final class ChainedTransactionProxy extends TransactionProxy { return readyFutures != null; } + @SuppressWarnings({ "unchecked", "rawtypes" }) @Override - protected void onTransactionReady(List> readyFutures) { + public AbstractThreePhaseCommitCohort ready() { + final AbstractThreePhaseCommitCohort ret = super.ready(); + readyFutures = (List)ret.getCohortFutures(); LOG.debug("onTransactionReady {} pending readyFutures size {} chain {}", getIdentifier(), - readyFutures.size(), getTransactionChainId()); - this.readyFutures = readyFutures; + readyFutures.size(), getTransactionChainId()); + return ret; } /** @@ -56,7 +59,7 @@ final class ChainedTransactionProxy extends TransactionProxy { * previous Tx's ready operations haven't completed yet. */ @Override - protected Future sendFindPrimaryShardAsync(final String shardName) { + protected Future sendFindPrimaryShardAsync(final String shardName) { // Check if there are any previous ready Futures, otherwise let the super class handle it. if(previousReadyFutures.isEmpty()) { return super.sendFindPrimaryShardAsync(shardName); @@ -68,14 +71,14 @@ final class ChainedTransactionProxy extends TransactionProxy { } // Combine the ready Futures into 1. - Future> combinedFutures = akka.dispatch.Futures.sequence( + Future> combinedFutures = akka.dispatch.Futures.sequence( previousReadyFutures, getActorContext().getClientDispatcher()); // Add a callback for completion of the combined Futures. - final Promise returnPromise = akka.dispatch.Futures.promise(); - OnComplete> onComplete = new OnComplete>() { + final Promise returnPromise = akka.dispatch.Futures.promise(); + OnComplete> onComplete = new OnComplete>() { @Override - public void onComplete(Throwable failure, Iterable notUsed) { + public void onComplete(Throwable failure, Iterable notUsed) { if(failure != null) { // A Ready Future failed so fail the returned Promise. returnPromise.failure(failure);