Remove (DOM)TransactionChainListener
[mdsal.git] / dom / mdsal-dom-spi / src / main / java / org / opendaylight / mdsal / dom / spi / AbstractPingPongTransactionChain.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.mdsal.dom.spi;
9
10 import static com.google.common.base.Preconditions.checkState;
11 import static com.google.common.base.Verify.verify;
12 import static java.util.Objects.requireNonNull;
13
14 import com.google.common.util.concurrent.FluentFuture;
15 import com.google.common.util.concurrent.FutureCallback;
16 import com.google.common.util.concurrent.ListenableFuture;
17 import com.google.common.util.concurrent.MoreExecutors;
18 import com.google.common.util.concurrent.SettableFuture;
19 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
20 import java.lang.invoke.MethodHandles;
21 import java.lang.invoke.VarHandle;
22 import java.util.Map;
23 import java.util.Map.Entry;
24 import java.util.Optional;
25 import java.util.concurrent.CancellationException;
26 import org.checkerframework.checker.lock.qual.GuardedBy;
27 import org.checkerframework.checker.lock.qual.Holding;
28 import org.eclipse.jdt.annotation.NonNull;
29 import org.eclipse.jdt.annotation.Nullable;
30 import org.opendaylight.mdsal.common.api.CommitInfo;
31 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
32 import org.opendaylight.mdsal.dom.api.DOMDataTreeReadTransaction;
33 import org.opendaylight.mdsal.dom.api.DOMDataTreeReadWriteTransaction;
34 import org.opendaylight.mdsal.dom.api.DOMDataTreeWriteTransaction;
35 import org.opendaylight.mdsal.dom.api.DOMTransactionChain;
36 import org.opendaylight.yangtools.yang.common.Empty;
37 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
38 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
39 import org.slf4j.Logger;
40 import org.slf4j.LoggerFactory;
41
42 /**
43  * The actual implementation of {@link PingPongTransactionChain}. Split out to allow deeper testing while keeping the
44  * externally-visible implementation final.
45  */
46 abstract class AbstractPingPongTransactionChain implements DOMTransactionChain {
47     private static final Logger LOG = LoggerFactory.getLogger(AbstractPingPongTransactionChain.class);
48
49     private final @NonNull SettableFuture<Empty> future = SettableFuture.create();
50     private final @NonNull DOMTransactionChain delegate;
51
52     @GuardedBy("this")
53     private boolean closed;
54     @GuardedBy("this")
55     private boolean failed;
56     @GuardedBy("this")
57     private PingPongTransaction shutdownTx;
58     @GuardedBy("this")
59     private Entry<PingPongTransaction, Throwable> deadTx;
60
61     //  This VarHandle is used to manipulate the "ready" transaction. We perform only atomic get-and-set on it.
62     private static final VarHandle READY_TX;
63     @SuppressWarnings("unused")
64     @SuppressFBWarnings(value = "UUF_UNUSED_FIELD", justification = "https://github.com/spotbugs/spotbugs/issues/2749")
65     private volatile PingPongTransaction readyTx;
66
67     /*
68      * This VarHandle is used to manipulate the "locked" transaction. A locked transaction means we know that the user
69      * still holds a transaction and should at some point call us. We perform on compare-and-swap to ensure we properly
70      * detect when a user is attempting to allocated multiple transactions concurrently.
71      */
72     private static final VarHandle LOCKED_TX;
73     @SuppressFBWarnings(value = "UWF_UNWRITTEN_FIELD",
74         justification = "https://github.com/spotbugs/spotbugs/issues/2749")
75     private volatile PingPongTransaction lockedTx;
76
77     /*
78      * This updater is used to manipulate the "inflight" transaction. There can be at most one of these at any given
79      * time. We perform only compare-and-swap on these.
80      */
81     private static final VarHandle INFLIGHT_TX;
82     @SuppressFBWarnings(value = "UWF_UNWRITTEN_FIELD",
83         justification = "https://github.com/spotbugs/spotbugs/issues/2749")
84     private volatile PingPongTransaction inflightTx;
85
86     static {
87         final var lookup = MethodHandles.lookup();
88         try {
89             INFLIGHT_TX = lookup.findVarHandle(AbstractPingPongTransactionChain.class, "inflightTx",
90                 PingPongTransaction.class);
91             LOCKED_TX = lookup.findVarHandle(AbstractPingPongTransactionChain.class, "lockedTx",
92                 PingPongTransaction.class);
93             READY_TX = lookup.findVarHandle(AbstractPingPongTransactionChain.class, "readyTx",
94                 PingPongTransaction.class);
95         } catch (NoSuchFieldException | IllegalAccessException e) {
96             throw new ExceptionInInitializerError(e);
97         }
98     }
99
100     AbstractPingPongTransactionChain(final DOMTransactionChain delegate) {
101         this.delegate = requireNonNull(delegate);
102         delegate.addCallback(new FutureCallback<>() {
103             @Override
104             public void onSuccess(final Empty result) {
105                 delegateSuccessful();
106             }
107
108             @Override
109             public void onFailure(final Throwable cause) {
110                 delegateFailed(cause);
111             }
112         });
113     }
114
115     @Override
116     public final ListenableFuture<Empty> future() {
117         return future;
118     }
119
120     private void delegateSuccessful() {
121         final Entry<PingPongTransaction, Throwable> canceled;
122         synchronized (this) {
123             // This looks weird, but we need not hold the lock while invoking callbacks
124             canceled = deadTx;
125         }
126
127         if (canceled == null) {
128             future.set(Empty.value());
129             return;
130         }
131
132         // Backend shutdown successful, but we have a batch of transactions we have to report as dead due to the
133         // user calling cancel().
134         final var tx = canceled.getKey();
135         final var cause = canceled.getValue();
136         LOG.debug("Transaction chain {} successful, failing cancelled transaction {}", delegate, tx, cause);
137
138         future.setException(cause);
139         tx.onFailure(cause);
140     }
141
142     private void delegateFailed(final Throwable cause) {
143         LOG.debug("Transaction chain {} reported failure", delegate, cause);
144
145         final var tx = inflightTx;
146         if (tx == null) {
147             LOG.warn("Transaction chain {} failed with no pending transactions", delegate);
148         }
149         future.setException(cause);
150
151         synchronized (this) {
152             failed = true;
153
154             /*
155              * If we do not have a locked transaction, we need to ensure that the backend transaction is cancelled.
156              * Otherwise we can defer until the user calls us.
157              */
158             if (lockedTx == null) {
159                 processIfReady();
160             }
161         }
162     }
163
164     private synchronized @NonNull PingPongTransaction slowAllocateTransaction() {
165         checkState(shutdownTx == null, "Transaction chain %s has been shut down", this);
166
167         if (deadTx != null) {
168             throw new IllegalStateException(String.format(
169                 "Transaction chain %s has failed due to transaction %s being canceled", this, deadTx.getKey()),
170                 deadTx.getValue());
171         }
172
173         final DOMDataTreeReadWriteTransaction delegateTx = delegate.newReadWriteTransaction();
174         final PingPongTransaction newTx = new PingPongTransaction(delegateTx);
175
176         final Object witness = LOCKED_TX.compareAndExchange(this, null, newTx);
177         if (witness != null) {
178             delegateTx.cancel();
179             throw new IllegalStateException(
180                     String.format("New transaction %s raced with transaction %s", newTx, witness));
181         }
182
183         return newTx;
184     }
185
186     private @Nullable PingPongTransaction acquireReadyTx() {
187         return (PingPongTransaction) READY_TX.getAndSet(this, null);
188     }
189
190     private @NonNull PingPongTransaction allocateTransaction() {
191         // Step 1: acquire current state
192         final PingPongTransaction oldTx = acquireReadyTx();
193
194         // Slow path: allocate a delegate transaction
195         if (oldTx == null) {
196             return slowAllocateTransaction();
197         }
198
199         // Fast path: reuse current transaction. We will check failures and similar on commit().
200         final Object witness = LOCKED_TX.compareAndExchange(this, null, oldTx);
201         if (witness != null) {
202             // Ouch. Delegate chain has not detected a duplicate transaction allocation. This is the best we can do.
203             oldTx.getTransaction().cancel();
204             throw new IllegalStateException(String.format("Reusable transaction %s raced with transaction %s", oldTx,
205                 witness));
206         }
207
208         return oldTx;
209     }
210
211     /**
212      * This forces allocateTransaction() on a slow path, which has to happen after this method has completed executing.
213      * Also inflightTx may be updated outside the lock, hence we need to re-check.
214      */
215     @Holding("this")
216     private void processIfReady() {
217         if (inflightTx == null) {
218             final PingPongTransaction tx = acquireReadyTx();
219             if (tx != null) {
220                 processTransaction(tx);
221             }
222         }
223     }
224
225     /**
226      * Process a ready transaction. The caller needs to ensure that each transaction is seen only once by this method.
227      *
228      * @param tx Transaction which needs processing.
229      */
230     @Holding("this")
231     private void processTransaction(final @NonNull PingPongTransaction tx) {
232         if (failed) {
233             LOG.debug("Cancelling transaction {}", tx);
234             tx.getTransaction().cancel();
235             return;
236         }
237
238         LOG.debug("Submitting transaction {}", tx);
239         final Object witness = INFLIGHT_TX.compareAndExchange(this, null, tx);
240         if (witness != null) {
241             LOG.warn("Submitting transaction {} while {} is still running", tx, witness);
242         }
243
244         tx.getTransaction().commit().addCallback(new FutureCallback<CommitInfo>() {
245             @Override
246             public void onSuccess(final CommitInfo result) {
247                 transactionSuccessful(tx, result);
248             }
249
250             @Override
251             public void onFailure(final Throwable throwable) {
252                 transactionFailed(tx, throwable);
253             }
254         }, MoreExecutors.directExecutor());
255     }
256
257     /*
258      * We got invoked from the data store thread. We need to do two things:
259      * 1) release the in-flight transaction
260      * 2) process the potential next transaction
261      *
262      * We have to perform 2) under lock. We could perform 1) without locking, but that means the CAS result may
263      * not be accurate, as a user thread may submit the ready transaction before we acquire the lock -- and checking
264      * for next transaction is not enough, as that may have also be allocated (as a result of a quick
265      * submit/allocate/submit between 1) and 2)). Hence we'd end up doing the following:
266      * 1) CAS of inflightTx
267      * 2) take lock
268      * 3) volatile read of inflightTx
269      *
270      * Rather than doing that, we keep this method synchronized, hence performing only:
271      * 1) take lock
272      * 2) CAS of inflightTx
273      *
274      * Since the user thread is barred from submitting the transaction (in processIfReady), we can then proceed with
275      * the knowledge that inflightTx is null -- processTransaction() will still do a CAS, but that is only for
276      * correctness.
277      */
278     private synchronized void processNextTransaction(final PingPongTransaction tx) {
279         final Object witness = INFLIGHT_TX.compareAndExchange(this, tx, null);
280         checkState(witness == tx, "Completed transaction %s while %s was submitted", tx, witness);
281
282         final PingPongTransaction nextTx = acquireReadyTx();
283         if (nextTx == null) {
284             final PingPongTransaction local = shutdownTx;
285             if (local != null) {
286                 processTransaction(local);
287                 delegate.close();
288                 shutdownTx = null;
289             }
290         } else {
291             processTransaction(nextTx);
292         }
293     }
294
295     private void transactionSuccessful(final PingPongTransaction tx, final CommitInfo result) {
296         LOG.debug("Transaction {} completed successfully", tx);
297
298         tx.onSuccess(result);
299         processNextTransaction(tx);
300     }
301
302     private void transactionFailed(final PingPongTransaction tx, final Throwable throwable) {
303         LOG.debug("Transaction {} failed", tx, throwable);
304
305         tx.onFailure(throwable);
306         processNextTransaction(tx);
307     }
308
309     private void readyTransaction(final @NonNull PingPongTransaction tx) {
310         // First mark the transaction as not locked.
311         final Object lockedWitness = LOCKED_TX.compareAndExchange(this, tx, null);
312         checkState(lockedWitness == tx, "Attempted to submit transaction %s while we have %s", tx, lockedWitness);
313         LOG.debug("Transaction {} unlocked", tx);
314
315         /*
316          * The transaction is ready. It will then be picked up by either next allocation,
317          * or a background transaction completion callback.
318          */
319         final Object readyWitness = READY_TX.compareAndExchange(this, null, tx);
320         checkState(readyWitness == null, "Transaction %s collided on ready state with %s", tx, readyWitness);
321         LOG.debug("Transaction {} readied", tx);
322
323         /*
324          * We do not see a transaction being in-flight, so we need to take care of dispatching
325          * the transaction to the backend. We are in the ready case, we cannot short-cut
326          * the checking of readyTx, as an in-flight transaction may have completed between us
327          * setting the field above and us checking.
328          */
329         if (inflightTx == null) {
330             synchronized (this) {
331                 processIfReady();
332             }
333         }
334     }
335
336     /**
337      * Transaction cancellation is a heavyweight operation. We only support cancelation of a locked transaction
338      * and return false for everything else. Cancelling such a transaction will result in all transactions in the
339      * batch to be cancelled.
340      *
341      * @param tx Backend shared transaction
342      * @param frontendTx transaction
343      * @return {@code true} if the transaction was cancelled successfully
344      */
345     private synchronized boolean cancelTransaction(final PingPongTransaction tx,
346             final DOMDataTreeReadWriteTransaction frontendTx) {
347         // Attempt to unlock the operation.
348         final Object witness = LOCKED_TX.compareAndExchange(this, tx, null);
349         verify(witness == tx, "Cancelling transaction %s collided with locked transaction %s", tx, witness);
350
351         // Cancel the backend transaction, so we do not end up leaking it.
352         final boolean backendCancelled = tx.getTransaction().cancel();
353
354         if (failed) {
355             // The transaction has failed, this is probably the user just clearing up the transaction they had. We have
356             // already cancelled the transaction anyway,
357             return true;
358         }
359
360         // We have dealt with cancelling the backend transaction and have unlocked the transaction. Since we are still
361         // inside the synchronized block, any allocations are blocking on the slow path. Now we have to decide the fate
362         // of this transaction chain.
363         //
364         // If there are no other frontend transactions in this batch we are aligned with backend state and we can
365         // continue processing.
366         if (frontendTx.equals(tx.getFrontendTransaction())) {
367             if (backendCancelled) {
368                 LOG.debug("Cancelled transaction {} was head of the batch, resuming processing", tx);
369                 return true;
370             }
371
372             // Backend refused to cancel the transaction. Reinstate it to locked state.
373             final Object reinstateWitness = LOCKED_TX.compareAndExchange(this, null, tx);
374             verify(reinstateWitness == null, "Reinstating transaction %s collided with locked transaction %s", tx,
375                 reinstateWitness);
376             return false;
377         }
378
379         if (!backendCancelled) {
380             LOG.warn("Backend transaction cannot be cancelled during cancellation of {}, attempting to continue", tx);
381         }
382
383         // There are multiple frontend transactions in this batch. We have to report them as failed, which dooms this
384         // transaction chain, too. Since we just came off of a locked transaction, we do not have a ready transaction
385         // at the moment, but there may be some transaction in-flight. So we proceed to shutdown the backend chain
386         // and mark the fact that we should be turning its completion into a failure.
387         deadTx = Map.entry(tx, new CancellationException("Transaction " + frontendTx + " canceled").fillInStackTrace());
388         delegate.close();
389         return true;
390     }
391
392     @Override
393     public final synchronized void close() {
394         if (closed) {
395             LOG.debug("Attempted to close an already-closed chain");
396             return;
397         }
398
399         // Note: we do not derive from AbstractRegistration due to ordering of this check
400         final var notLocked = lockedTx;
401         if (notLocked != null) {
402             throw new IllegalStateException("Attempted to close chain with outstanding transaction " + notLocked);
403         }
404         closed = true;
405
406         // This may be a reaction to our failure callback, in that case the backend is already shutdown
407         if (deadTx != null) {
408             LOG.debug("Delegate {} is already closed due to failure {}", delegate, deadTx);
409             return;
410         }
411
412         // Force allocations on slow path, picking up a potentially-outstanding transaction
413         final var tx = acquireReadyTx();
414         if (tx != null) {
415             // We have one more transaction, which needs to be processed somewhere. If we do not
416             // a transaction in-flight, we need to push it down ourselves.
417             // If there is an in-flight transaction we will schedule this last one into a dedicated
418             // slot. Allocation slow path will check its presence and fail, the in-flight path will
419             // pick it up, submit and immediately close the chain.
420             if (inflightTx == null) {
421                 processTransaction(tx);
422                 delegate.close();
423             } else {
424                 shutdownTx = tx;
425             }
426         } else {
427             // Nothing outstanding, we can safely shutdown
428             delegate.close();
429         }
430     }
431
432     @Override
433     public final DOMDataTreeReadTransaction newReadOnlyTransaction() {
434         return new PingPongReadTransaction(allocateTransaction());
435     }
436
437     @Override
438     public final DOMDataTreeReadWriteTransaction newReadWriteTransaction() {
439         final PingPongTransaction tx = allocateTransaction();
440         final DOMDataTreeReadWriteTransaction ret = new PingPongReadWriteTransaction(tx);
441         tx.recordFrontendTransaction(ret);
442         return ret;
443     }
444
445     @Override
446     public final DOMDataTreeWriteTransaction newWriteOnlyTransaction() {
447         return newReadWriteTransaction();
448     }
449
450     private final class PingPongReadTransaction implements DOMDataTreeReadTransaction {
451         private final @NonNull PingPongTransaction tx;
452
453         PingPongReadTransaction(final PingPongTransaction tx) {
454             this.tx = requireNonNull(tx);
455         }
456
457         @Override
458         public FluentFuture<Optional<NormalizedNode>> read(final LogicalDatastoreType store,
459                 final YangInstanceIdentifier path) {
460             return tx.getTransaction().read(store, path);
461         }
462
463         @Override
464         public FluentFuture<Boolean> exists(final LogicalDatastoreType store, final YangInstanceIdentifier path) {
465             return tx.getTransaction().exists(store, path);
466         }
467
468         @Override
469         public Object getIdentifier() {
470             return tx.getTransaction().getIdentifier();
471         }
472
473         @Override
474         public void close() {
475             readyTransaction(tx);
476         }
477     }
478
479     private final class PingPongReadWriteTransaction extends ForwardingDOMDataReadWriteTransaction {
480         private final @NonNull PingPongTransaction tx;
481
482         private boolean isOpen = true;
483
484         PingPongReadWriteTransaction(final PingPongTransaction tx) {
485             this.tx = requireNonNull(tx);
486         }
487
488         @Override
489         public FluentFuture<? extends CommitInfo> commit() {
490             readyTransaction(tx);
491             isOpen = false;
492             return tx.getCommitFuture().transform(ignored -> CommitInfo.empty(), MoreExecutors.directExecutor());
493         }
494
495         @Override
496         public boolean cancel() {
497             if (isOpen && cancelTransaction(tx, this)) {
498                 isOpen = false;
499                 return true;
500             }
501             return false;
502         }
503
504         @Override
505         protected DOMDataTreeReadWriteTransaction delegate() {
506             return tx.getTransaction();
507         }
508     }
509 }