Merge "Bug 2684 - Fixing support for Neutron binding extended attributes"
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / TransactionChainProxy.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.datastore;
10
11 import akka.actor.ActorSelection;
12 import akka.dispatch.OnComplete;
13 import com.google.common.base.Preconditions;
14 import java.util.Collections;
15 import java.util.List;
16 import java.util.concurrent.atomic.AtomicInteger;
17 import org.opendaylight.controller.cluster.datastore.messages.CloseTransactionChain;
18 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
19 import org.opendaylight.controller.md.sal.common.api.data.TransactionChainClosedException;
20 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
21 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
22 import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain;
23 import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
24 import org.slf4j.Logger;
25 import org.slf4j.LoggerFactory;
26 import scala.concurrent.Future;
27 import scala.concurrent.Promise;
28
29 /**
30  * TransactionChainProxy acts as a proxy for a DOMStoreTransactionChain created on a remote shard
31  */
32 public class TransactionChainProxy implements DOMStoreTransactionChain {
33
34     private static final Logger LOG = LoggerFactory.getLogger(TransactionChainProxy.class);
35
36     private interface State {
37         boolean isReady();
38
39         List<Future<ActorSelection>> getPreviousReadyFutures();
40     }
41
42     private static class Allocated implements State {
43         private final ChainedTransactionProxy transaction;
44
45         Allocated(ChainedTransactionProxy transaction) {
46             this.transaction = transaction;
47         }
48
49         @Override
50         public boolean isReady() {
51             return transaction.isReady();
52         }
53
54         @Override
55         public List<Future<ActorSelection>> getPreviousReadyFutures() {
56             return transaction.getReadyFutures();
57         }
58     }
59
60     private static abstract class AbstractDefaultState implements State {
61         @Override
62         public List<Future<ActorSelection>> getPreviousReadyFutures() {
63             return Collections.emptyList();
64         }
65     }
66
67     private static final State IDLE_STATE = new AbstractDefaultState() {
68         @Override
69         public boolean isReady() {
70             return true;
71         }
72     };
73
74     private static final State CLOSED_STATE = new AbstractDefaultState() {
75         @Override
76         public boolean isReady() {
77             throw new TransactionChainClosedException("Transaction chain has been closed");
78         }
79     };
80
81     private static final AtomicInteger counter = new AtomicInteger(0);
82
83     private final ActorContext actorContext;
84     private final String transactionChainId;
85     private volatile State currentState = IDLE_STATE;
86
87     public TransactionChainProxy(ActorContext actorContext) {
88         this.actorContext = actorContext;
89         transactionChainId = actorContext.getCurrentMemberName() + "-txn-chain-" + counter.incrementAndGet();
90     }
91
92     public String getTransactionChainId() {
93         return transactionChainId;
94     }
95
96     @Override
97     public DOMStoreReadTransaction newReadOnlyTransaction() {
98         State localState = currentState;
99         checkReadyState(localState);
100
101         return new ChainedTransactionProxy(actorContext, TransactionProxy.TransactionType.READ_ONLY,
102                 transactionChainId, localState.getPreviousReadyFutures());
103     }
104
105     @Override
106     public DOMStoreReadWriteTransaction newReadWriteTransaction() {
107         actorContext.acquireTxCreationPermit();
108         return allocateWriteTransaction(TransactionProxy.TransactionType.READ_WRITE);
109     }
110
111     @Override
112     public DOMStoreWriteTransaction newWriteOnlyTransaction() {
113         actorContext.acquireTxCreationPermit();
114         return allocateWriteTransaction(TransactionProxy.TransactionType.WRITE_ONLY);
115     }
116
117     @Override
118     public void close() {
119         currentState = CLOSED_STATE;
120
121         // Send a close transaction chain request to each and every shard
122         actorContext.broadcast(new CloseTransactionChain(transactionChainId));
123     }
124
125     private ChainedTransactionProxy allocateWriteTransaction(TransactionProxy.TransactionType type) {
126         State localState = currentState;
127
128         checkReadyState(localState);
129
130         // Pass the ready Futures from the previous Tx.
131         ChainedTransactionProxy txProxy = new ChainedTransactionProxy(actorContext, type,
132                 transactionChainId, localState.getPreviousReadyFutures());
133
134         currentState = new Allocated(txProxy);
135
136         return txProxy;
137     }
138
139     private void checkReadyState(State state) {
140         Preconditions.checkState(state.isReady(), "Previous transaction is not ready yet");
141     }
142
143     private static class ChainedTransactionProxy extends TransactionProxy {
144
145         /**
146          * Stores the ready Futures from the previous Tx in the chain.
147          */
148         private final List<Future<ActorSelection>> previousReadyFutures;
149
150         /**
151          * Stores the ready Futures from this transaction when it is readied.
152          */
153         private volatile List<Future<ActorSelection>> readyFutures;
154
155         private ChainedTransactionProxy(ActorContext actorContext, TransactionType transactionType,
156                 String transactionChainId, List<Future<ActorSelection>> previousReadyFutures) {
157             super(actorContext, transactionType, transactionChainId);
158             this.previousReadyFutures = previousReadyFutures;
159         }
160
161         List<Future<ActorSelection>> getReadyFutures() {
162             return readyFutures;
163         }
164
165         boolean isReady() {
166             return readyFutures != null;
167         }
168
169         @Override
170         protected void onTransactionReady(List<Future<ActorSelection>> readyFutures) {
171             LOG.debug("onTransactionReady {} pending readyFutures size {} chain {}", getIdentifier(),
172                     readyFutures.size(), getTransactionChainId());
173             this.readyFutures = readyFutures;
174         }
175
176         /**
177          * This method is overridden to ensure the previous Tx's ready operations complete
178          * before we create the next shard Tx in the chain to avoid creation failures if the
179          * previous Tx's ready operations haven't completed yet.
180          */
181         @Override
182         protected Future<Object> sendCreateTransaction(final ActorSelection shard,
183                 final Object serializedCreateMessage) {
184
185             // Check if there are any previous ready Futures, otherwise let the super class handle it.
186             if(previousReadyFutures.isEmpty()) {
187                 return super.sendCreateTransaction(shard, serializedCreateMessage);
188             }
189
190             // Combine the ready Futures into 1.
191             Future<Iterable<ActorSelection>> combinedFutures = akka.dispatch.Futures.sequence(
192                     previousReadyFutures, getActorContext().getActorSystem().dispatcher());
193
194             // Add a callback for completion of the combined Futures.
195             final Promise<Object> createTxPromise = akka.dispatch.Futures.promise();
196             OnComplete<Iterable<ActorSelection>> onComplete = new OnComplete<Iterable<ActorSelection>>() {
197                 @Override
198                 public void onComplete(Throwable failure, Iterable<ActorSelection> notUsed) {
199                     if(failure != null) {
200                         // A Ready Future failed so fail the returned Promise.
201                         createTxPromise.failure(failure);
202                     } else {
203                         LOG.debug("Previous Tx readied - sending CreateTransaction for {} on chain {}",
204                                 getIdentifier(), getTransactionChainId());
205
206                         // Send the CreateTx message and use the resulting Future to complete the
207                         // returned Promise.
208                         createTxPromise.completeWith(getActorContext().executeOperationAsync(shard,
209                                 serializedCreateMessage));
210                     }
211                 }
212             };
213
214             combinedFutures.onComplete(onComplete, getActorContext().getActorSystem().dispatcher());
215
216             return createTxPromise.future();
217         }
218     }
219 }