Move byte-based serialization method
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / TransactionChainProxy.java
1 /*
2  * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.datastore;
9
10 import akka.actor.ActorSelection;
11 import akka.dispatch.Futures;
12 import akka.dispatch.OnComplete;
13 import com.google.common.base.Preconditions;
14 import java.util.ArrayList;
15 import java.util.Collection;
16 import java.util.List;
17 import java.util.Map.Entry;
18 import java.util.concurrent.ConcurrentHashMap;
19 import java.util.concurrent.ConcurrentMap;
20 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
21 import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
22 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
23 import org.opendaylight.controller.cluster.datastore.messages.CloseTransactionChain;
24 import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo;
25 import org.opendaylight.mdsal.dom.api.DOMTransactionChainClosedException;
26 import org.opendaylight.mdsal.dom.spi.store.DOMStoreReadTransaction;
27 import org.opendaylight.mdsal.dom.spi.store.DOMStoreReadWriteTransaction;
28 import org.opendaylight.mdsal.dom.spi.store.DOMStoreTransactionChain;
29 import org.opendaylight.mdsal.dom.spi.store.DOMStoreWriteTransaction;
30 import org.opendaylight.yangtools.yang.data.api.schema.tree.ReadOnlyDataTree;
31 import org.slf4j.Logger;
32 import org.slf4j.LoggerFactory;
33 import scala.concurrent.Future;
34 import scala.concurrent.Promise;
35
36 /**
 * A chain of {@link TransactionProxy}s. It allows only a single transaction to be open
 * at a time. For remote transactions, it also tracks the outstanding readiness requests
 * towards the shard and unblocks operations only after all have completed.
40  */
41 final class TransactionChainProxy extends AbstractTransactionContextFactory<LocalTransactionChain>
42         implements DOMStoreTransactionChain {
43     private abstract static class State {
44         /**
45          * Check if it is okay to allocate a new transaction.
46          * @throws IllegalStateException if a transaction may not be allocated.
47          */
48         abstract void checkReady();
49
50         /**
51          * Return the future which needs to be waited for before shard information
52          * is returned (which unblocks remote transactions).
53          * @return Future to wait for, or null of no wait is necessary
54          */
55         abstract Future<?> previousFuture();
56     }
57
58     private abstract static class Pending extends State {
59         private final TransactionIdentifier transaction;
60         private final Future<?> previousFuture;
61
62         Pending(final TransactionIdentifier transaction, final Future<?> previousFuture) {
63             this.previousFuture = previousFuture;
64             this.transaction = Preconditions.checkNotNull(transaction);
65         }
66
67         @Override
68         final Future<?> previousFuture() {
69             return previousFuture;
70         }
71
72         final TransactionIdentifier getIdentifier() {
73             return transaction;
74         }
75     }
76
77     private static final class Allocated extends Pending {
78         Allocated(final TransactionIdentifier transaction, final Future<?> previousFuture) {
79             super(transaction, previousFuture);
80         }
81
82         @Override
83         void checkReady() {
84             throw new IllegalStateException(String.format("Previous transaction %s is not ready yet", getIdentifier()));
85         }
86     }
87
    /**
     * State in which the given transaction has been readied while the future passed to this
     * state is still being tracked. A new transaction may be allocated in this state.
     */
    private static final class Submitted extends Pending {
        Submitted(final TransactionIdentifier transaction, final Future<?> previousFuture) {
            super(transaction, previousFuture);
        }

        @Override
        void checkReady() {
            // Okay to allocate
        }
    }
98
    /**
     * Base for states which are not associated with any transaction: there is never a future
     * to wait for.
     */
    private abstract static class DefaultState extends State {
        @Override
        final Future<?> previousFuture() {
            // No wait is necessary in these states
            return null;
        }
    }
105
    /**
     * Shared state instance used when nothing is outstanding on the chain. Allocating a new
     * transaction is always permitted.
     */
    private static final State IDLE_STATE = new DefaultState() {
        @Override
        void checkReady() {
            // Okay to allocate
        }
    };
112
    /**
     * Shared state instance used for a chain which has been closed. Every attempt to allocate
     * a transaction is rejected with a {@link DOMTransactionChainClosedException}.
     */
    private static final State CLOSED_STATE = new DefaultState() {
        @Override
        void checkReady() {
            throw new DOMTransactionChainClosedException("Transaction chain has been closed");
        }
    };
119
    private static final Logger LOG = LoggerFactory.getLogger(TransactionChainProxy.class);
    // Gives compare-and-set access to the volatile currentState field, which is looked up by name
    private static final AtomicReferenceFieldUpdater<TransactionChainProxy, State> STATE_UPDATER =
            AtomicReferenceFieldUpdater.newUpdater(TransactionChainProxy.class, State.class, "currentState");

    // Factory to which the actual primary shard lookups are delegated
    private final TransactionContextFactory parent;
    // Current state of the chain; a freshly created chain is idle
    private volatile State currentState = IDLE_STATE;
126
    /**
     * This map holds Promise instances for each read-only tx. It is used to maintain ordering of tx creates
     * with respect to read-only tx's between this class and a LocalTransactionChain since they're bridged by
     * asynchronous futures. Otherwise, in the following scenario, eg:
     *
     * <p>
     *   1) Create write tx1 on chain
     *   2) do write and submit
     *   3) Create read-only tx2 on chain and issue read
     *   4) Create write tx3 on chain, do write but do not submit
     *
     * <p>
     * if the sequence/timing is right, tx3 may create its local tx on the LocalTransactionChain before tx2,
     * which results in tx2 failing b/c tx3 isn't ready yet. So maintaining ordering prevents this issue
     * (see Bug 4774).
     *
     * <p>
     * A Promise is added via newReadOnlyTransaction. When the parent class completes the primary shard
     * lookup and creates the TransactionContext (either success or failure), onTransactionContextCreated is
     * called which completes the Promise. A write tx that is created prior to completion will wait on the
     * Promise's Future via findPrimaryShard.
     */
    private final ConcurrentMap<TransactionIdentifier, Promise<Object>> priorReadOnlyTxPromises =
            new ConcurrentHashMap<>();
148
    /**
     * Create a chain on top of the given factory.
     *
     * @param parent factory whose actor utilities are handed to the superclass and to which primary
     *               shard lookups are delegated
     * @param historyId local history identifier handed to the superclass
     */
    TransactionChainProxy(final TransactionContextFactory parent, final LocalHistoryIdentifier historyId) {
        super(parent.getActorUtils(), historyId);
        this.parent = parent;
    }
153
154     @Override
155     public DOMStoreReadTransaction newReadOnlyTransaction() {
156         currentState.checkReady();
157         TransactionProxy transactionProxy = new TransactionProxy(this, TransactionType.READ_ONLY);
158         priorReadOnlyTxPromises.put(transactionProxy.getIdentifier(), Futures.<Object>promise());
159         return transactionProxy;
160     }
161
162     @Override
163     public DOMStoreReadWriteTransaction newReadWriteTransaction() {
164         getActorUtils().acquireTxCreationPermit();
165         return allocateWriteTransaction(TransactionType.READ_WRITE);
166     }
167
168     @Override
169     public DOMStoreWriteTransaction newWriteOnlyTransaction() {
170         getActorUtils().acquireTxCreationPermit();
171         return allocateWriteTransaction(TransactionType.WRITE_ONLY);
172     }
173
174     @Override
175     public void close() {
176         currentState = CLOSED_STATE;
177
178         // Send a close transaction chain request to each and every shard
179
180         getActorUtils().broadcast(version -> new CloseTransactionChain(getHistoryId(), version).toSerializable(),
181                 CloseTransactionChain.class);
182     }
183
184     private TransactionProxy allocateWriteTransaction(final TransactionType type) {
185         State localState = currentState;
186         localState.checkReady();
187
188         final TransactionProxy ret = new TransactionProxy(this, type);
189         currentState = new Allocated(ret.getIdentifier(), localState.previousFuture());
190         return ret;
191     }
192
193     @Override
194     protected LocalTransactionChain factoryForShard(final String shardName, final ActorSelection shardLeader,
195             final ReadOnlyDataTree dataTree) {
196         final LocalTransactionChain ret = new LocalTransactionChain(this, shardLeader, dataTree);
197         LOG.debug("Allocated transaction chain {} for shard {} leader {}", ret, shardName, shardLeader);
198         return ret;
199     }
200
201     /**
202      * This method is overridden to ensure the previous Tx's ready operations complete
203      * before we initiate the next Tx in the chain to avoid creation failures if the
204      * previous Tx's ready operations haven't completed yet.
205      */
206     @SuppressWarnings({ "unchecked", "rawtypes" })
207     @Override
208     protected Future<PrimaryShardInfo> findPrimaryShard(final String shardName, final TransactionIdentifier txId) {
209         // Read current state atomically
210         final State localState = currentState;
211
212         // There are no outstanding futures, shortcut
213         Future<?> previous = localState.previousFuture();
214         if (previous == null) {
215             return combineFutureWithPossiblePriorReadOnlyTxFutures(parent.findPrimaryShard(shardName, txId), txId);
216         }
217
218         final String previousTransactionId;
219
220         if (localState instanceof Pending) {
221             previousTransactionId = ((Pending) localState).getIdentifier().toString();
222             LOG.debug("Tx: {} - waiting for ready futures with pending Tx {}", txId, previousTransactionId);
223         } else {
224             previousTransactionId = "";
225             LOG.debug("Waiting for ready futures on chain {}", getHistoryId());
226         }
227
228         previous = combineFutureWithPossiblePriorReadOnlyTxFutures(previous, txId);
229
230         // Add a callback for completion of the combined Futures.
231         final Promise<PrimaryShardInfo> returnPromise = Futures.promise();
232
233         final OnComplete onComplete = new OnComplete() {
234             @Override
235             public void onComplete(final Throwable failure, final Object notUsed) {
236                 if (failure != null) {
237                     // A Ready Future failed so fail the returned Promise.
238                     LOG.error("Tx: {} - ready future failed for previous Tx {}", txId, previousTransactionId);
239                     returnPromise.failure(failure);
240                 } else {
241                     LOG.debug("Tx: {} - previous Tx {} readied - proceeding to FindPrimaryShard",
242                             txId, previousTransactionId);
243
244                     // Send the FindPrimaryShard message and use the resulting Future to complete the
245                     // returned Promise.
246                     returnPromise.completeWith(parent.findPrimaryShard(shardName, txId));
247                 }
248             }
249         };
250
251         previous.onComplete(onComplete, getActorUtils().getClientDispatcher());
252         return returnPromise.future();
253     }
254
255     private <T> Future<T> combineFutureWithPossiblePriorReadOnlyTxFutures(final Future<T> future,
256             final TransactionIdentifier txId) {
257         if (!priorReadOnlyTxPromises.containsKey(txId) && !priorReadOnlyTxPromises.isEmpty()) {
258             Collection<Entry<TransactionIdentifier, Promise<Object>>> priorReadOnlyTxPromiseEntries =
259                     new ArrayList<>(priorReadOnlyTxPromises.entrySet());
260             if (priorReadOnlyTxPromiseEntries.isEmpty()) {
261                 return future;
262             }
263
264             List<Future<Object>> priorReadOnlyTxFutures = new ArrayList<>(priorReadOnlyTxPromiseEntries.size());
265             for (Entry<TransactionIdentifier, Promise<Object>> entry: priorReadOnlyTxPromiseEntries) {
266                 LOG.debug("Tx: {} - waiting on future for prior read-only Tx {}", txId, entry.getKey());
267                 priorReadOnlyTxFutures.add(entry.getValue().future());
268             }
269
270             Future<Iterable<Object>> combinedFutures = Futures.sequence(priorReadOnlyTxFutures,
271                     getActorUtils().getClientDispatcher());
272
273             final Promise<T> returnPromise = Futures.promise();
274             final OnComplete<Iterable<Object>> onComplete = new OnComplete<Iterable<Object>>() {
275                 @Override
276                 public void onComplete(final Throwable failure, final Iterable<Object> notUsed) {
277                     LOG.debug("Tx: {} - prior read-only Tx futures complete", txId);
278
279                     // Complete the returned Promise with the original Future.
280                     returnPromise.completeWith(future);
281                 }
282             };
283
284             combinedFutures.onComplete(onComplete, getActorUtils().getClientDispatcher());
285             return returnPromise.future();
286         } else {
287             return future;
288         }
289     }
290
291     @Override
292     protected <T> void onTransactionReady(final TransactionIdentifier transaction,
293             final Collection<Future<T>> cohortFutures) {
294         final State localState = currentState;
295         Preconditions.checkState(localState instanceof Allocated, "Readying transaction %s while state is %s",
296                 transaction, localState);
297         final TransactionIdentifier currentTx = ((Allocated)localState).getIdentifier();
298         Preconditions.checkState(transaction.equals(currentTx), "Readying transaction %s while %s is allocated",
299                 transaction, currentTx);
300
301         // Transaction ready and we are not waiting for futures -- go to idle
302         if (cohortFutures.isEmpty()) {
303             currentState = IDLE_STATE;
304             return;
305         }
306
307         // Combine the ready Futures into 1
308         final Future<Iterable<T>> combined = Futures.sequence(cohortFutures, getActorUtils().getClientDispatcher());
309
310         // Record the we have outstanding futures
311         final State newState = new Submitted(transaction, combined);
312         currentState = newState;
313
314         // Attach a completion reset, but only if we do not allocate a transaction
315         // in-between
316         combined.onComplete(new OnComplete<Iterable<T>>() {
317             @Override
318             public void onComplete(final Throwable arg0, final Iterable<T> arg1) {
319                 STATE_UPDATER.compareAndSet(TransactionChainProxy.this, newState, IDLE_STATE);
320             }
321         }, getActorUtils().getClientDispatcher());
322     }
323
324     @Override
325     protected void onTransactionContextCreated(TransactionIdentifier transactionId) {
326         Promise<Object> promise = priorReadOnlyTxPromises.remove(transactionId);
327         if (promise != null) {
328             promise.success(null);
329         }
330     }
331 }