BUG 3045 : Use non-strict parsing in hello message.
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / ChainedTransactionProxy.java
1 /*
2  * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.datastore;
9
10 import akka.actor.ActorSelection;
11 import akka.dispatch.OnComplete;
12 import java.util.List;
13 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
14 import org.slf4j.Logger;
15 import org.slf4j.LoggerFactory;
16 import scala.concurrent.Future;
17 import scala.concurrent.Promise;
18
19 final class ChainedTransactionProxy extends TransactionProxy {
20     private static final Logger LOG = LoggerFactory.getLogger(ChainedTransactionProxy.class);
21
22     /**
23      * Stores the ready Futures from the previous Tx in the chain.
24      */
25     private final List<Future<Object>> previousReadyFutures;
26
27     /**
28      * Stores the ready Futures from this transaction when it is readied.
29      */
30     private volatile List<Future<Object>> readyFutures;
31
32     ChainedTransactionProxy(ActorContext actorContext, TransactionType transactionType,
33             String transactionChainId, List<Future<Object>> previousReadyFutures) {
34         super(actorContext, transactionType, transactionChainId);
35         this.previousReadyFutures = previousReadyFutures;
36     }
37
38     List<Future<Object>> getReadyFutures() {
39         return readyFutures;
40     }
41
42     boolean isReady() {
43         return readyFutures != null;
44     }
45
46     @SuppressWarnings({ "unchecked", "rawtypes" })
47     @Override
48     public AbstractThreePhaseCommitCohort<?> ready() {
49         final AbstractThreePhaseCommitCohort<?> ret = super.ready();
50         readyFutures = (List)ret.getCohortFutures();
51         LOG.debug("onTransactionReady {} pending readyFutures size {} chain {}", getIdentifier(),
52             readyFutures.size(), getTransactionChainId());
53         return ret;
54     }
55
56     /**
57      * This method is overridden to ensure the previous Tx's ready operations complete
58      * before we initiate the next Tx in the chain to avoid creation failures if the
59      * previous Tx's ready operations haven't completed yet.
60      */
61     @Override
62     protected Future<ActorSelection> sendFindPrimaryShardAsync(final String shardName) {
63         // Check if there are any previous ready Futures, otherwise let the super class handle it.
64         if(previousReadyFutures.isEmpty()) {
65             return super.sendFindPrimaryShardAsync(shardName);
66         }
67
68         if (LOG.isDebugEnabled()) {
69             LOG.debug("Waiting for {} previous ready futures for Tx {} on chain {}",
70                     previousReadyFutures.size(), getIdentifier(), getTransactionChainId());
71         }
72
73         // Combine the ready Futures into 1.
74         Future<Iterable<Object>> combinedFutures = akka.dispatch.Futures.sequence(
75                 previousReadyFutures, getActorContext().getClientDispatcher());
76
77         // Add a callback for completion of the combined Futures.
78         final Promise<ActorSelection> returnPromise = akka.dispatch.Futures.promise();
79         OnComplete<Iterable<Object>> onComplete = new OnComplete<Iterable<Object>>() {
80             @Override
81             public void onComplete(Throwable failure, Iterable<Object> notUsed) {
82                 if(failure != null) {
83                     // A Ready Future failed so fail the returned Promise.
84                     returnPromise.failure(failure);
85                 } else {
86                     LOG.debug("Previous Tx readied - sending FindPrimaryShard for {} on chain {}",
87                             getIdentifier(), getTransactionChainId());
88
89                     // Send the FindPrimaryShard message and use the resulting Future to complete the
90                     // returned Promise.
91                     returnPromise.completeWith(ChainedTransactionProxy.super.sendFindPrimaryShardAsync(shardName));
92                 }
93             }
94         };
95
96         combinedFutures.onComplete(onComplete, getActorContext().getClientDispatcher());
97
98         return returnPromise.future();
99     }
100 }