Renamed controller.md.sal.dom.api to mdsal.dom.api
[mdsal.git] / dom / mdsal-dom-broker / src / main / java / org / opendaylight / controller / md / sal / dom / broker / impl / PingPongTransactionChain.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.md.sal.dom.broker.impl;
9
10 import org.opendaylight.mdsal.dom.api.DOMDataBroker;
11 import org.opendaylight.mdsal.dom.api.DOMDataReadOnlyTransaction;
12 import org.opendaylight.mdsal.dom.api.DOMDataReadWriteTransaction;
13 import org.opendaylight.mdsal.dom.api.DOMDataWriteTransaction;
14 import org.opendaylight.mdsal.dom.api.DOMTransactionChain;
15
16 import com.google.common.base.Optional;
17 import com.google.common.base.Preconditions;
18 import com.google.common.util.concurrent.CheckedFuture;
19 import com.google.common.util.concurrent.FutureCallback;
20 import com.google.common.util.concurrent.Futures;
21 import com.google.common.util.concurrent.ListenableFuture;
22 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
23 import javax.annotation.Nonnull;
24 import javax.annotation.concurrent.GuardedBy;
25 import org.opendaylight.controller.md.sal.common.api.TransactionStatus;
26 import org.opendaylight.controller.md.sal.common.api.data.AsyncTransaction;
27 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
28 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
29 import org.opendaylight.controller.md.sal.common.api.data.TransactionChain;
30 import org.opendaylight.controller.md.sal.common.api.data.TransactionChainListener;
31 import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
32 import org.opendaylight.controller.md.sal.dom.spi.ForwardingDOMDataReadWriteTransaction;
33 import org.opendaylight.yangtools.yang.common.RpcResult;
34 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
35 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
36 import org.slf4j.Logger;
37 import org.slf4j.LoggerFactory;
38
39 /**
40  * An implementation of {@link DOMTransactionChain}, which has a very specific
41  * behavior, which some users may find surprising. If keeps the general
42  * intent of the contract, but it makes sure there are never more than two
43  * transactions allocated at any given time: one of them is being committed,
44  * and while that is happening, the other one acts as the scratch pad. Once
45  * the committing transaction completes successfully, the scratch transaction
46  * is enqueued as soon as it is ready.
47  *
48  * This mode of operation means that there is no inherent isolation between
49  * the front-end transactions and transactions cannot be reasonably cancelled.
50  *
51  * It furthermore means that the transactions returned by {@link #newReadOnlyTransaction()}
52  * counts as an outstanding transaction and the user may not allocate multiple
53  * read-only transactions at the same time.
54  */
55 public final class PingPongTransactionChain implements DOMTransactionChain {
56     private static final Logger LOG = LoggerFactory.getLogger(PingPongTransactionChain.class);
57     private final DOMTransactionChain delegate;
58
59     @GuardedBy("this")
60     private boolean failed;
61
62     /**
63      * This updater is used to manipulate the "ready" transaction. We perform only atomic
64      * get-and-set on it.
65      */
66     private static final AtomicReferenceFieldUpdater<PingPongTransactionChain, PingPongTransaction> READY_UPDATER =
67             AtomicReferenceFieldUpdater.newUpdater(PingPongTransactionChain.class, PingPongTransaction.class, "readyTx");
68     private volatile PingPongTransaction readyTx;
69
70     /**
71      * This updater is used to manipulate the "locked" transaction. A locked transaction
72      * means we know that the user still holds a transaction and should at some point call
73      * us. We perform on compare-and-swap to ensure we properly detect when a user is
74      * attempting to allocated multiple transactions concurrently.
75      */
76     private static final AtomicReferenceFieldUpdater<PingPongTransactionChain, PingPongTransaction> LOCKED_UPDATER =
77             AtomicReferenceFieldUpdater.newUpdater(PingPongTransactionChain.class, PingPongTransaction.class, "lockedTx");
78     private volatile PingPongTransaction lockedTx;
79
80     /**
81      * This updater is used to manipulate the "inflight" transaction. There can be at most
82      * one of these at any given time. We perform only compare-and-swap on these.
83      */
84     private static final AtomicReferenceFieldUpdater<PingPongTransactionChain, PingPongTransaction> INFLIGHT_UPDATER =
85             AtomicReferenceFieldUpdater.newUpdater(PingPongTransactionChain.class, PingPongTransaction.class, "inflightTx");
86     private volatile PingPongTransaction inflightTx;
87
88     PingPongTransactionChain(final DOMDataBroker broker, final TransactionChainListener listener) {
89         this.delegate = broker.createTransactionChain(new TransactionChainListener() {
90             @Override
91             public void onTransactionChainFailed(final TransactionChain<?, ?> chain, final AsyncTransaction<?, ?> transaction, final Throwable cause) {
92                 LOG.debug("Delegate chain {} reported failure in {}", chain, transaction, cause);
93
94                 final DOMDataReadWriteTransaction frontend;
95                 final PingPongTransaction tx = inflightTx;
96                 if (tx == null) {
97                     LOG.warn("Transaction chain {} failed with no pending transactions", chain);
98                     frontend = null;
99                 } else {
100                     frontend = tx.getFrontendTransaction();
101                 }
102
103                 listener.onTransactionChainFailed(PingPongTransactionChain.this, frontend, cause);
104                 delegateFailed();
105             }
106
107             @Override
108             public void onTransactionChainSuccessful(final TransactionChain<?, ?> chain) {
109                 listener.onTransactionChainSuccessful(PingPongTransactionChain.this);
110             }
111         });
112     }
113
114     private synchronized void delegateFailed() {
115         failed = true;
116
117         /*
118          * If we do not have a locked transaction, we need to ensure that
119          * the backend transaction is cancelled. Otherwise we can defer
120          * until the user calls us.
121          */
122         if (lockedTx == null) {
123             processIfReady();
124         }
125     }
126
127     private synchronized PingPongTransaction slowAllocateTransaction() {
128         final DOMDataReadWriteTransaction delegateTx = delegate.newReadWriteTransaction();
129         final PingPongTransaction newTx = new PingPongTransaction(delegateTx);
130
131         if (!LOCKED_UPDATER.compareAndSet(this, null, newTx)) {
132             delegateTx.cancel();
133             throw new IllegalStateException(String.format("New transaction %s raced with transacion %s", newTx, lockedTx));
134         }
135
136         return newTx;
137     }
138
139     private PingPongTransaction allocateTransaction() {
140         // Step 1: acquire current state
141         final PingPongTransaction oldTx = READY_UPDATER.getAndSet(this, null);
142
143         // Slow path: allocate a delegate transaction
144         if (oldTx == null) {
145             return slowAllocateTransaction();
146         }
147
148         // Fast path: reuse current transaction. We will check
149         //            failures and similar on submit().
150         if (!LOCKED_UPDATER.compareAndSet(this, null, oldTx)) {
151             // Ouch. Delegate chain has not detected a duplicate
152             // transaction allocation. This is the best we can do.
153             oldTx.getTransaction().cancel();
154             throw new IllegalStateException(String.format("Reusable transaction %s raced with transaction %s", oldTx, lockedTx));
155         }
156
157         return oldTx;
158     }
159
160     /*
161      * This forces allocateTransaction() on a slow path, which has to happen after
162      * this method has completed executing. Also inflightTx may be updated outside
163      * the lock, hence we need to re-check.
164      */
165     @GuardedBy("this")
166     private void processIfReady() {
167         if (inflightTx == null) {
168             final PingPongTransaction tx = READY_UPDATER.getAndSet(this, null);
169             if (tx != null) {
170                 processTransaction(tx);
171             }
172         }
173     }
174
175     /**
176      * Process a ready transaction. The caller needs to ensure that
177      * each transaction is seen only once by this method.
178      *
179      * @param tx Transaction which needs processing.
180      */
181     @GuardedBy("this")
182     private void processTransaction(@Nonnull final PingPongTransaction tx) {
183         if (failed) {
184             LOG.debug("Cancelling transaction {}", tx);
185             tx.getTransaction().cancel();
186             return;
187         }
188
189         LOG.debug("Submitting transaction {}", tx);
190         if (!INFLIGHT_UPDATER.compareAndSet(this, null, tx)) {
191             LOG.warn("Submitting transaction {} while {} is still running", tx, inflightTx);
192         }
193
194         Futures.addCallback(tx.getTransaction().submit(), new FutureCallback<Void>() {
195             @Override
196             public void onSuccess(final Void result) {
197                 transactionSuccessful(tx, result);
198             }
199
200             @Override
201             public void onFailure(final Throwable t) {
202                 transactionFailed(tx, t);
203             }
204         });
205     }
206
207     private void transactionSuccessful(final PingPongTransaction tx, final Void result) {
208         LOG.debug("Transaction {} completed successfully", tx);
209
210         final boolean success = INFLIGHT_UPDATER.compareAndSet(this, tx, null);
211         Preconditions.checkState(success, "Successful transaction %s while %s was submitted", tx, inflightTx);
212
213         synchronized (this) {
214             processIfReady();
215         }
216
217         // Can run unsynchronized
218         tx.onSuccess(result);
219     }
220
221     private void transactionFailed(final PingPongTransaction tx, final Throwable t) {
222         LOG.debug("Transaction {} failed", tx, t);
223
224         final boolean success = INFLIGHT_UPDATER.compareAndSet(this, tx, null);
225         Preconditions.checkState(success, "Failed transaction %s while %s was submitted", tx, inflightTx);
226
227         tx.onFailure(t);
228     }
229
230     private void readyTransaction(@Nonnull final PingPongTransaction tx) {
231         // First mark the transaction as not locked.
232         final boolean lockedMatch = LOCKED_UPDATER.compareAndSet(this, tx, null);
233         Preconditions.checkState(lockedMatch, "Attempted to submit transaction %s while we have %s", tx, lockedTx);
234         LOG.debug("Transaction {} unlocked", tx);
235
236         /*
237          * The transaction is ready. It will then be picked up by either next allocation,
238          * or a background transaction completion callback.
239          */
240         final boolean success = READY_UPDATER.compareAndSet(this, null, tx);
241         Preconditions.checkState(success, "Transaction %s collided on ready state", tx, readyTx);
242         LOG.debug("Transaction {} readied", tx);
243
244         /*
245          * We do not see a transaction being in-flight, so we need to take care of dispatching
246          * the transaction to the backend. We are in the ready case, we cannot short-cut
247          * the checking of readyTx, as an in-flight transaction may have completed between us
248          * setting the field above and us checking.
249          */
250         if (inflightTx == null) {
251             synchronized (this) {
252                 processIfReady();
253             }
254         }
255     }
256
257     @Override
258     public synchronized void close() {
259         final PingPongTransaction notLocked = lockedTx;
260         Preconditions.checkState(notLocked == null, "Attempted to close chain with outstanding transaction %s", notLocked);
261
262         // Force allocations on slow path. We will complete the rest
263         final PingPongTransaction tx = READY_UPDATER.getAndSet(this, null);
264
265         // Make sure no transaction is outstanding. Otherwise sleep a bit and retry
266         while (inflightTx != null) {
267             LOG.debug("Busy-waiting for in-flight transaction {} to complete", inflightTx);
268             Thread.yield();
269             continue;
270         }
271
272         // If we have an outstanding transaction, send it down
273         if (tx != null) {
274             processTransaction(tx);
275         }
276
277         // All done, close the delegate. All new allocations should fail.
278         delegate.close();
279     }
280
281     @Override
282     public DOMDataReadOnlyTransaction newReadOnlyTransaction() {
283         final PingPongTransaction tx = allocateTransaction();
284
285         return new DOMDataReadOnlyTransaction() {
286             @Override
287             public CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> read(final LogicalDatastoreType store,
288                     final YangInstanceIdentifier path) {
289                 return tx.getTransaction().read(store, path);
290             }
291
292             @Override
293             public CheckedFuture<Boolean, ReadFailedException> exists(final LogicalDatastoreType store,
294                     final YangInstanceIdentifier path) {
295                 return tx.getTransaction().exists(store, path);
296             }
297
298             @Override
299             public Object getIdentifier() {
300                 return tx.getTransaction().getIdentifier();
301             }
302
303             @Override
304             public void close() {
305                 readyTransaction(tx);
306             }
307         };
308     }
309
310     @Override
311     public DOMDataReadWriteTransaction newReadWriteTransaction() {
312         final PingPongTransaction tx = allocateTransaction();
313         final DOMDataReadWriteTransaction ret = new ForwardingDOMDataReadWriteTransaction() {
314             @Override
315             protected DOMDataReadWriteTransaction delegate() {
316                 return tx.getTransaction();
317             }
318
319             @Override
320             public CheckedFuture<Void, TransactionCommitFailedException> submit() {
321                 readyTransaction(tx);
322                 return tx.getSubmitFuture();
323             }
324
325             @Override
326             public ListenableFuture<RpcResult<TransactionStatus>> commit() {
327                 readyTransaction(tx);
328                 return tx.getCommitFuture();
329             }
330
331             @Override
332             public boolean cancel() {
333                 throw new UnsupportedOperationException("Transaction cancellation is not supported");
334             }
335         };
336
337         tx.recordFrontendTransaction(ret);
338         return ret;
339     }
340
341     @Override
342     public DOMDataWriteTransaction newWriteOnlyTransaction() {
343         return newReadWriteTransaction();
344     }
345 }