Added type-safe DataObjectModification#getModifiedChildren
[mdsal.git] / dom / mdsal-dom-broker / src / main / java / org / opendaylight / mdsal / dom / broker / PingPongTransactionChain.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.mdsal.dom.broker;
9
10 import org.opendaylight.mdsal.dom.spi.ForwardingDOMDataReadWriteTransaction;
11
12 import org.opendaylight.mdsal.common.api.AsyncTransaction;
13 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
14 import org.opendaylight.mdsal.common.api.ReadFailedException;
15 import org.opendaylight.mdsal.common.api.TransactionChain;
16 import org.opendaylight.mdsal.common.api.TransactionChainListener;
17 import org.opendaylight.mdsal.common.api.TransactionCommitFailedException;
18 import com.google.common.base.Optional;
19 import com.google.common.base.Preconditions;
20 import com.google.common.util.concurrent.CheckedFuture;
21 import com.google.common.util.concurrent.FutureCallback;
22 import com.google.common.util.concurrent.Futures;
23 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
24 import javax.annotation.Nonnull;
25 import javax.annotation.concurrent.GuardedBy;
26 import org.opendaylight.mdsal.dom.api.DOMDataBroker;
27 import org.opendaylight.mdsal.dom.api.DOMDataReadOnlyTransaction;
28 import org.opendaylight.mdsal.dom.api.DOMDataReadWriteTransaction;
29 import org.opendaylight.mdsal.dom.api.DOMDataWriteTransaction;
30 import org.opendaylight.mdsal.dom.api.DOMTransactionChain;
31 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
32 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
33 import org.slf4j.Logger;
34 import org.slf4j.LoggerFactory;
35
36 /**
37  * An implementation of {@link DOMTransactionChain}, which has a very specific
38  * behavior, which some users may find surprising. If keeps the general
39  * intent of the contract, but it makes sure there are never more than two
40  * transactions allocated at any given time: one of them is being committed,
41  * and while that is happening, the other one acts as the scratch pad. Once
42  * the committing transaction completes successfully, the scratch transaction
43  * is enqueued as soon as it is ready.
44  *
45  * This mode of operation means that there is no inherent isolation between
46  * the front-end transactions and transactions cannot be reasonably cancelled.
47  *
48  * It furthermore means that the transactions returned by {@link #newReadOnlyTransaction()}
49  * counts as an outstanding transaction and the user may not allocate multiple
50  * read-only transactions at the same time.
51  */
52 public final class PingPongTransactionChain implements DOMTransactionChain {
53     private static final Logger LOG = LoggerFactory.getLogger(PingPongTransactionChain.class);
54     private final DOMTransactionChain delegate;
55
56     @GuardedBy("this")
57     private boolean failed;
58
59     /**
60      * This updater is used to manipulate the "ready" transaction. We perform only atomic
61      * get-and-set on it.
62      */
63     private static final AtomicReferenceFieldUpdater<PingPongTransactionChain, PingPongTransaction> READY_UPDATER =
64             AtomicReferenceFieldUpdater.newUpdater(PingPongTransactionChain.class, PingPongTransaction.class, "readyTx");
65     private volatile PingPongTransaction readyTx;
66
67     /**
68      * This updater is used to manipulate the "locked" transaction. A locked transaction
69      * means we know that the user still holds a transaction and should at some point call
70      * us. We perform on compare-and-swap to ensure we properly detect when a user is
71      * attempting to allocated multiple transactions concurrently.
72      */
73     private static final AtomicReferenceFieldUpdater<PingPongTransactionChain, PingPongTransaction> LOCKED_UPDATER =
74             AtomicReferenceFieldUpdater.newUpdater(PingPongTransactionChain.class, PingPongTransaction.class, "lockedTx");
75     private volatile PingPongTransaction lockedTx;
76
77     /**
78      * This updater is used to manipulate the "inflight" transaction. There can be at most
79      * one of these at any given time. We perform only compare-and-swap on these.
80      */
81     private static final AtomicReferenceFieldUpdater<PingPongTransactionChain, PingPongTransaction> INFLIGHT_UPDATER =
82             AtomicReferenceFieldUpdater.newUpdater(PingPongTransactionChain.class, PingPongTransaction.class, "inflightTx");
83     private volatile PingPongTransaction inflightTx;
84
85     PingPongTransactionChain(final DOMDataBroker broker, final TransactionChainListener listener) {
86         this.delegate = broker.createTransactionChain(new TransactionChainListener() {
87             @Override
88             public void onTransactionChainFailed(final TransactionChain<?, ?> chain, final AsyncTransaction<?, ?> transaction, final Throwable cause) {
89                 LOG.debug("Delegate chain {} reported failure in {}", chain, transaction, cause);
90
91                 final DOMDataReadWriteTransaction frontend;
92                 final PingPongTransaction tx = inflightTx;
93                 if (tx == null) {
94                     LOG.warn("Transaction chain {} failed with no pending transactions", chain);
95                     frontend = null;
96                 } else {
97                     frontend = tx.getFrontendTransaction();
98                 }
99
100                 listener.onTransactionChainFailed(PingPongTransactionChain.this, frontend, cause);
101                 delegateFailed();
102             }
103
104             @Override
105             public void onTransactionChainSuccessful(final TransactionChain<?, ?> chain) {
106                 listener.onTransactionChainSuccessful(PingPongTransactionChain.this);
107             }
108         });
109     }
110
111     private synchronized void delegateFailed() {
112         failed = true;
113
114         /*
115          * If we do not have a locked transaction, we need to ensure that
116          * the backend transaction is cancelled. Otherwise we can defer
117          * until the user calls us.
118          */
119         if (lockedTx == null) {
120             processIfReady();
121         }
122     }
123
124     private synchronized PingPongTransaction slowAllocateTransaction() {
125         final DOMDataReadWriteTransaction delegateTx = delegate.newReadWriteTransaction();
126         final PingPongTransaction newTx = new PingPongTransaction(delegateTx);
127
128         if (!LOCKED_UPDATER.compareAndSet(this, null, newTx)) {
129             delegateTx.cancel();
130             throw new IllegalStateException(String.format("New transaction %s raced with transacion %s", newTx, lockedTx));
131         }
132
133         return newTx;
134     }
135
136     private PingPongTransaction allocateTransaction() {
137         // Step 1: acquire current state
138         final PingPongTransaction oldTx = READY_UPDATER.getAndSet(this, null);
139
140         // Slow path: allocate a delegate transaction
141         if (oldTx == null) {
142             return slowAllocateTransaction();
143         }
144
145         // Fast path: reuse current transaction. We will check
146         //            failures and similar on submit().
147         if (!LOCKED_UPDATER.compareAndSet(this, null, oldTx)) {
148             // Ouch. Delegate chain has not detected a duplicate
149             // transaction allocation. This is the best we can do.
150             oldTx.getTransaction().cancel();
151             throw new IllegalStateException(String.format("Reusable transaction %s raced with transaction %s", oldTx, lockedTx));
152         }
153
154         return oldTx;
155     }
156
157     /*
158      * This forces allocateTransaction() on a slow path, which has to happen after
159      * this method has completed executing. Also inflightTx may be updated outside
160      * the lock, hence we need to re-check.
161      */
162     @GuardedBy("this")
163     private void processIfReady() {
164         if (inflightTx == null) {
165             final PingPongTransaction tx = READY_UPDATER.getAndSet(this, null);
166             if (tx != null) {
167                 processTransaction(tx);
168             }
169         }
170     }
171
172     /**
173      * Process a ready transaction. The caller needs to ensure that
174      * each transaction is seen only once by this method.
175      *
176      * @param tx Transaction which needs processing.
177      */
178     @GuardedBy("this")
179     private void processTransaction(@Nonnull final PingPongTransaction tx) {
180         if (failed) {
181             LOG.debug("Cancelling transaction {}", tx);
182             tx.getTransaction().cancel();
183             return;
184         }
185
186         LOG.debug("Submitting transaction {}", tx);
187         if (!INFLIGHT_UPDATER.compareAndSet(this, null, tx)) {
188             LOG.warn("Submitting transaction {} while {} is still running", tx, inflightTx);
189         }
190
191         Futures.addCallback(tx.getTransaction().submit(), new FutureCallback<Void>() {
192             @Override
193             public void onSuccess(final Void result) {
194                 transactionSuccessful(tx, result);
195             }
196
197             @Override
198             public void onFailure(final Throwable t) {
199                 transactionFailed(tx, t);
200             }
201         });
202     }
203
204     private void transactionSuccessful(final PingPongTransaction tx, final Void result) {
205         LOG.debug("Transaction {} completed successfully", tx);
206
207         final boolean success = INFLIGHT_UPDATER.compareAndSet(this, tx, null);
208         Preconditions.checkState(success, "Successful transaction %s while %s was submitted", tx, inflightTx);
209
210         synchronized (this) {
211             processIfReady();
212         }
213
214         // Can run unsynchronized
215         tx.onSuccess(result);
216     }
217
218     private void transactionFailed(final PingPongTransaction tx, final Throwable t) {
219         LOG.debug("Transaction {} failed", tx, t);
220
221         final boolean success = INFLIGHT_UPDATER.compareAndSet(this, tx, null);
222         Preconditions.checkState(success, "Failed transaction %s while %s was submitted", tx, inflightTx);
223
224         tx.onFailure(t);
225     }
226
227     private void readyTransaction(@Nonnull final PingPongTransaction tx) {
228         // First mark the transaction as not locked.
229         final boolean lockedMatch = LOCKED_UPDATER.compareAndSet(this, tx, null);
230         Preconditions.checkState(lockedMatch, "Attempted to submit transaction %s while we have %s", tx, lockedTx);
231         LOG.debug("Transaction {} unlocked", tx);
232
233         /*
234          * The transaction is ready. It will then be picked up by either next allocation,
235          * or a background transaction completion callback.
236          */
237         final boolean success = READY_UPDATER.compareAndSet(this, null, tx);
238         Preconditions.checkState(success, "Transaction %s collided on ready state", tx, readyTx);
239         LOG.debug("Transaction {} readied", tx);
240
241         /*
242          * We do not see a transaction being in-flight, so we need to take care of dispatching
243          * the transaction to the backend. We are in the ready case, we cannot short-cut
244          * the checking of readyTx, as an in-flight transaction may have completed between us
245          * setting the field above and us checking.
246          */
247         if (inflightTx == null) {
248             synchronized (this) {
249                 processIfReady();
250             }
251         }
252     }
253
254     @Override
255     public synchronized void close() {
256         final PingPongTransaction notLocked = lockedTx;
257         Preconditions.checkState(notLocked == null, "Attempted to close chain with outstanding transaction %s", notLocked);
258
259         // Force allocations on slow path. We will complete the rest
260         final PingPongTransaction tx = READY_UPDATER.getAndSet(this, null);
261
262         // Make sure no transaction is outstanding. Otherwise sleep a bit and retry
263         while (inflightTx != null) {
264             LOG.debug("Busy-waiting for in-flight transaction {} to complete", inflightTx);
265             Thread.yield();
266             continue;
267         }
268
269         // If we have an outstanding transaction, send it down
270         if (tx != null) {
271             processTransaction(tx);
272         }
273
274         // All done, close the delegate. All new allocations should fail.
275         delegate.close();
276     }
277
278     @Override
279     public DOMDataReadOnlyTransaction newReadOnlyTransaction() {
280         final PingPongTransaction tx = allocateTransaction();
281
282         return new DOMDataReadOnlyTransaction() {
283             @Override
284             public CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> read(final LogicalDatastoreType store,
285                     final YangInstanceIdentifier path) {
286                 return tx.getTransaction().read(store, path);
287             }
288
289             @Override
290             public CheckedFuture<Boolean, ReadFailedException> exists(final LogicalDatastoreType store,
291                     final YangInstanceIdentifier path) {
292                 return tx.getTransaction().exists(store, path);
293             }
294
295             @Override
296             public Object getIdentifier() {
297                 return tx.getTransaction().getIdentifier();
298             }
299
300             @Override
301             public void close() {
302                 readyTransaction(tx);
303             }
304         };
305     }
306
307     @Override
308     public DOMDataReadWriteTransaction newReadWriteTransaction() {
309         final PingPongTransaction tx = allocateTransaction();
310         final DOMDataReadWriteTransaction ret = new ForwardingDOMDataReadWriteTransaction() {
311             @Override
312             protected DOMDataReadWriteTransaction delegate() {
313                 return tx.getTransaction();
314             }
315
316             @Override
317             public CheckedFuture<Void, TransactionCommitFailedException> submit() {
318                 readyTransaction(tx);
319                 return tx.getSubmitFuture();
320             }
321
322             @Override
323             public boolean cancel() {
324                 throw new UnsupportedOperationException("Transaction cancellation is not supported");
325             }
326         };
327
328         tx.recordFrontendTransaction(ret);
329         return ret;
330     }
331
332     @Override
333     public DOMDataWriteTransaction newWriteOnlyTransaction() {
334         return newReadWriteTransaction();
335     }
336 }