/*
 * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved.
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
 * and is available at http://www.eclipse.org/legal/epl-v10.html
 */
8 package org.opendaylight.controller.cluster.datastore;
10 import akka.dispatch.OnComplete;
11 import java.util.List;
12 import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo;
13 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
14 import org.slf4j.Logger;
15 import org.slf4j.LoggerFactory;
16 import scala.concurrent.Future;
17 import scala.concurrent.Promise;
19 final class ChainedTransactionProxy extends TransactionProxy {
20 private static final Logger LOG = LoggerFactory.getLogger(ChainedTransactionProxy.class);
23 * Stores the ready Futures from the previous Tx in the chain.
25 private final List<Future<Object>> previousReadyFutures;
28 * Stores the ready Futures from this transaction when it is readied.
30 private volatile List<Future<Object>> readyFutures;
32 ChainedTransactionProxy(ActorContext actorContext, TransactionType transactionType,
33 String transactionChainId, List<Future<Object>> previousReadyFutures) {
34 super(actorContext, transactionType, transactionChainId);
35 this.previousReadyFutures = previousReadyFutures;
38 List<Future<Object>> getReadyFutures() {
43 return readyFutures != null;
46 @SuppressWarnings({ "unchecked", "rawtypes" })
48 public AbstractThreePhaseCommitCohort<?> ready() {
49 final AbstractThreePhaseCommitCohort<?> ret = super.ready();
50 readyFutures = (List)ret.getCohortFutures();
51 LOG.debug("onTransactionReady {} pending readyFutures size {} chain {}", getIdentifier(),
52 readyFutures.size(), getTransactionChainId());
57 * This method is overridden to ensure the previous Tx's ready operations complete
58 * before we initiate the next Tx in the chain to avoid creation failures if the
59 * previous Tx's ready operations haven't completed yet.
62 protected Future<PrimaryShardInfo> sendFindPrimaryShardAsync(final String shardName) {
63 // Check if there are any previous ready Futures, otherwise let the super class handle it.
64 if(previousReadyFutures.isEmpty()) {
65 return super.sendFindPrimaryShardAsync(shardName);
68 if (LOG.isDebugEnabled()) {
69 LOG.debug("Waiting for {} previous ready futures for Tx {} on chain {}",
70 previousReadyFutures.size(), getIdentifier(), getTransactionChainId());
73 // Combine the ready Futures into 1.
74 Future<Iterable<Object>> combinedFutures = akka.dispatch.Futures.sequence(
75 previousReadyFutures, getActorContext().getClientDispatcher());
77 // Add a callback for completion of the combined Futures.
78 final Promise<PrimaryShardInfo> returnPromise = akka.dispatch.Futures.promise();
79 OnComplete<Iterable<Object>> onComplete = new OnComplete<Iterable<Object>>() {
81 public void onComplete(Throwable failure, Iterable<Object> notUsed) {
83 // A Ready Future failed so fail the returned Promise.
84 returnPromise.failure(failure);
86 LOG.debug("Previous Tx readied - sending FindPrimaryShard for {} on chain {}",
87 getIdentifier(), getTransactionChainId());
89 // Send the FindPrimaryShard message and use the resulting Future to complete the
91 returnPromise.completeWith(ChainedTransactionProxy.super.sendFindPrimaryShardAsync(shardName));
96 combinedFutures.onComplete(onComplete, getActorContext().getClientDispatcher());
98 return returnPromise.future();