Merge "Do not override managed versions"
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / TransactionChainProxy.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.datastore;
10
11 import akka.actor.ActorSelection;
12 import akka.dispatch.OnComplete;
13 import java.util.AbstractMap.SimpleEntry;
14 import java.util.List;
15 import org.opendaylight.controller.cluster.datastore.messages.CloseTransactionChain;
16 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
17 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
18 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
19 import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain;
20 import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
21 import scala.concurrent.Future;
22 import scala.concurrent.Promise;
23
24 /**
25  * TransactionChainProxy acts as a proxy for a DOMStoreTransactionChain created on a remote shard
26  */
27 public class TransactionChainProxy implements DOMStoreTransactionChain{
28     private final ActorContext actorContext;
29     private final String transactionChainId;
30     private volatile SimpleEntry<Object, List<Future<ActorSelection>>> previousTxReadyFutures;
31
32     public TransactionChainProxy(ActorContext actorContext) {
33         this.actorContext = actorContext;
34         transactionChainId = actorContext.getCurrentMemberName() + "-" + System.currentTimeMillis();
35     }
36
37     @Override
38     public DOMStoreReadTransaction newReadOnlyTransaction() {
39         return new ChainedTransactionProxy(actorContext, TransactionProxy.TransactionType.READ_ONLY);
40     }
41
42     @Override
43     public DOMStoreReadWriteTransaction newReadWriteTransaction() {
44         return new ChainedTransactionProxy(actorContext, TransactionProxy.TransactionType.READ_WRITE);
45     }
46
47     @Override
48     public DOMStoreWriteTransaction newWriteOnlyTransaction() {
49         return new ChainedTransactionProxy(actorContext, TransactionProxy.TransactionType.WRITE_ONLY);
50     }
51
52     @Override
53     public void close() {
54         // Send a close transaction chain request to each and every shard
55         actorContext.broadcast(new CloseTransactionChain(transactionChainId));
56     }
57
58     public String getTransactionChainId() {
59         return transactionChainId;
60     }
61
62     private class ChainedTransactionProxy extends TransactionProxy {
63
64         ChainedTransactionProxy(ActorContext actorContext, TransactionType transactionType) {
65             super(actorContext, transactionType, transactionChainId);
66         }
67
68         @Override
69         protected void onTransactionReady(List<Future<ActorSelection>> cohortFutures) {
70             if(!cohortFutures.isEmpty()) {
71                 previousTxReadyFutures = new SimpleEntry<>(getIdentifier(), cohortFutures);
72             } else {
73                 previousTxReadyFutures = null;
74             }
75         }
76
77         /**
78          * This method is overridden to ensure the previous Tx's ready operations complete
79          * before we create the next shard Tx in the chain to avoid creation failures if the
80          * previous Tx's ready operations haven't completed yet.
81          */
82         @Override
83         protected Future<Object> sendCreateTransaction(final ActorSelection shard,
84                 final Object serializedCreateMessage) {
85             // Check if there are any previous ready Futures. Also make sure the previous ready
86             // Futures aren't for this Tx as deadlock would occur if tried to wait on our own
87             // Futures. This may happen b/c the shard Tx creates are done async so it's possible
88             // for the client to ready this Tx before we've even attempted to create a shard Tx.
89             if(previousTxReadyFutures == null ||
90                     previousTxReadyFutures.getKey().equals(getIdentifier())) {
91                 return super.sendCreateTransaction(shard, serializedCreateMessage);
92             }
93
94             // Combine the ready Futures into 1.
95             Future<Iterable<ActorSelection>> combinedFutures = akka.dispatch.Futures.sequence(
96                     previousTxReadyFutures.getValue(), actorContext.getActorSystem().dispatcher());
97
98             // Add a callback for completion of the combined Futures.
99             final Promise<Object> createTxPromise = akka.dispatch.Futures.promise();
100             OnComplete<Iterable<ActorSelection>> onComplete = new OnComplete<Iterable<ActorSelection>>() {
101                 @Override
102                 public void onComplete(Throwable failure, Iterable<ActorSelection> notUsed) {
103                     if(failure != null) {
104                         // A Ready Future failed so fail the returned Promise.
105                         createTxPromise.failure(failure);
106                     } else {
107                         // Send the CreateTx message and use the resulting Future to complete the
108                         // returned Promise.
109                         createTxPromise.completeWith(actorContext.executeOperationAsync(shard,
110                                 serializedCreateMessage));
111                     }
112                 }
113             };
114
115             combinedFutures.onComplete(onComplete, actorContext.getActorSystem().dispatcher());
116
117             return createTxPromise.future();
118         }
119     }
120 }