Remove lock out of the congested/reuse case
[controller.git] / opendaylight / md-sal / sal-dom-broker / src / main / java / org / opendaylight / controller / md / sal / dom / broker / impl / PingPongTransactionChain.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.md.sal.dom.broker.impl;
9
10 import com.google.common.base.Optional;
11 import com.google.common.base.Preconditions;
12 import com.google.common.util.concurrent.CheckedFuture;
13 import com.google.common.util.concurrent.FutureCallback;
14 import com.google.common.util.concurrent.Futures;
15 import com.google.common.util.concurrent.ListenableFuture;
16 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
17 import javax.annotation.Nonnull;
18 import javax.annotation.concurrent.GuardedBy;
19 import org.opendaylight.controller.md.sal.common.api.TransactionStatus;
20 import org.opendaylight.controller.md.sal.common.api.data.AsyncTransaction;
21 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
22 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
23 import org.opendaylight.controller.md.sal.common.api.data.TransactionChain;
24 import org.opendaylight.controller.md.sal.common.api.data.TransactionChainListener;
25 import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
26 import org.opendaylight.controller.md.sal.dom.api.DOMDataBroker;
27 import org.opendaylight.controller.md.sal.dom.api.DOMDataReadOnlyTransaction;
28 import org.opendaylight.controller.md.sal.dom.api.DOMDataReadWriteTransaction;
29 import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
30 import org.opendaylight.controller.md.sal.dom.api.DOMTransactionChain;
31 import org.opendaylight.controller.md.sal.dom.spi.ForwardingDOMDataReadWriteTransaction;
32 import org.opendaylight.yangtools.yang.common.RpcResult;
33 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
34 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
35 import org.slf4j.Logger;
36 import org.slf4j.LoggerFactory;
37
38 /**
39  * An implementation of {@link DOMTransactionChain}, which has a very specific
40  * behavior, which some users may find surprising. If keeps the general
41  * intent of the contract, but it makes sure there are never more than two
42  * transactions allocated at any given time: one of them is being committed,
43  * and while that is happening, the other one acts as the scratch pad. Once
44  * the committing transaction completes successfully, the scratch transaction
45  * is enqueued as soon as it is ready.
46  *
47  * This mode of operation means that there is no inherent isolation between
48  * the front-end transactions and transactions cannot be reasonably cancelled.
49  *
50  * It furthermore means that the transactions returned by {@link #newReadOnlyTransaction()}
51  * counts as an outstanding transaction and the user may not allocate multiple
52  * read-only transactions at the same time.
53  */
54 public final class PingPongTransactionChain implements DOMTransactionChain {
55     private static final Logger LOG = LoggerFactory.getLogger(PingPongTransactionChain.class);
56     private final DOMTransactionChain delegate;
57
58     @GuardedBy("this")
59     private PingPongTransaction inflightTransaction;
60     @GuardedBy("this")
61     private boolean failed;
62
63     /**
64      * This updater is used to manipulate the "ready" transaction. We perform only atomic
65      * get-and-set on it.
66      */
67     private static final AtomicReferenceFieldUpdater<PingPongTransactionChain, PingPongTransaction> READY_UPDATER =
68             AtomicReferenceFieldUpdater.newUpdater(PingPongTransactionChain.class, PingPongTransaction.class, "readyTx");
69     @SuppressWarnings("unused") // Accessed via READY_UPDATER
70     private volatile PingPongTransaction readyTx;
71
72     /**
73      * This updater is used to manipulate the "locked" transaction. A locked transaction
74      * means we know that the user still holds a transaction and should at some point call
75      * us. We perform on compare-and-swap to ensure we properly detect when a user is
76      * attempting to allocated multiple transactions concurrently.
77      */
78     private static final AtomicReferenceFieldUpdater<PingPongTransactionChain, PingPongTransaction> LOCKED_UPDATER =
79             AtomicReferenceFieldUpdater.newUpdater(PingPongTransactionChain.class, PingPongTransaction.class, "lockedTx");
80     private volatile PingPongTransaction lockedTx;
81
82     PingPongTransactionChain(final DOMDataBroker broker, final TransactionChainListener listener) {
83         this.delegate = broker.createTransactionChain(new TransactionChainListener() {
84             @Override
85             public void onTransactionChainFailed(final TransactionChain<?, ?> chain, final AsyncTransaction<?, ?> transaction, final Throwable cause) {
86                 LOG.debug("Delegate chain {} reported failure in {}", chain, transaction, cause);
87
88                 final DOMDataReadWriteTransaction frontend;
89                 if (inflightTransaction == null) {
90                     LOG.warn("Transaction chain {} failed with no pending transactions", chain);
91                     frontend = null;
92                 } else {
93                     frontend = inflightTransaction.getFrontendTransaction();
94                 }
95
96                 listener.onTransactionChainFailed(PingPongTransactionChain.this, frontend , cause);
97                 delegateFailed();
98             }
99
100             @Override
101             public void onTransactionChainSuccessful(final TransactionChain<?, ?> chain) {
102                 listener.onTransactionChainSuccessful(PingPongTransactionChain.this);
103             }
104         });
105     }
106
107     private synchronized void delegateFailed() {
108         failed = true;
109
110         /*
111          * If we do not have a locked transaction, we need to ensure that
112          * the backend transaction is cancelled. Otherwise we can defer
113          * until the user calls us.
114          */
115         if (lockedTx == null) {
116             processIfReady();
117         }
118     }
119
120     private synchronized PingPongTransaction slowAllocateTransaction() {
121         final DOMDataReadWriteTransaction delegateTx = delegate.newReadWriteTransaction();
122         final PingPongTransaction newTx = new PingPongTransaction(delegateTx);
123
124         if (!LOCKED_UPDATER.compareAndSet(this, null, newTx)) {
125             delegateTx.cancel();
126             throw new IllegalStateException(String.format("New transaction %s raced with transacion %s", newTx, lockedTx));
127         }
128
129         return newTx;
130     }
131
132     private PingPongTransaction allocateTransaction() {
133         // Step 1: acquire current state
134         final PingPongTransaction oldTx = READY_UPDATER.getAndSet(this, null);
135
136         // Slow path: allocate a delegate transaction
137         if (oldTx == null) {
138             return slowAllocateTransaction();
139         }
140
141         // Fast path: reuse current transaction. We will check
142         //            failures and similar on submit().
143         if (!LOCKED_UPDATER.compareAndSet(this, null, oldTx)) {
144             // Ouch. Delegate chain has not detected a duplicate
145             // transaction allocation. This is the best we can do.
146             oldTx.getTransaction().cancel();
147             throw new IllegalStateException(String.format("Reusable transaction %s raced with transaction %s", oldTx, lockedTx));
148         }
149
150         return oldTx;
151     }
152
153     // This forces allocateTransaction() on a slow path
154     @GuardedBy("this")
155     private void processIfReady() {
156         final PingPongTransaction tx = READY_UPDATER.getAndSet(this, null);
157         if (tx != null) {
158             processTransaction(tx);
159         }
160     }
161
162     /**
163      * Process a ready transaction. The caller needs to ensure that
164      * each transaction is seen only once by this method.
165      *
166      * @param tx Transaction which needs processing.
167      */
168     @GuardedBy("this")
169     private void processTransaction(final @Nonnull PingPongTransaction tx) {
170         if (failed) {
171             LOG.debug("Cancelling transaction {}", tx);
172             tx.getTransaction().cancel();
173             return;
174         }
175
176         LOG.debug("Submitting transaction {}", tx);
177         final CheckedFuture<Void, ?> f = tx.getTransaction().submit();
178         inflightTransaction = tx;
179
180         Futures.addCallback(f, new FutureCallback<Void>() {
181             @Override
182             public void onSuccess(final Void result) {
183                 transactionSuccessful(tx, result);
184             }
185
186             @Override
187             public void onFailure(final Throwable t) {
188                 transactionFailed(tx, t);
189             }
190         });
191     }
192
193     private void transactionSuccessful(final PingPongTransaction tx, final Void result) {
194         LOG.debug("Transaction {} completed successfully", tx);
195
196         synchronized (this) {
197             Preconditions.checkState(inflightTransaction == tx, "Successful transaction %s while %s was submitted", tx, inflightTransaction);
198
199             inflightTransaction = null;
200             processIfReady();
201         }
202
203         // Can run unsynchronized
204         tx.onSuccess(result);
205     }
206
207     private void transactionFailed(final PingPongTransaction tx, final Throwable t) {
208         LOG.debug("Transaction {} failed", tx, t);
209
210         synchronized (this) {
211             Preconditions.checkState(inflightTransaction == tx, "Failed transaction %s while %s was submitted", tx, inflightTransaction);
212             inflightTransaction = null;
213         }
214
215         tx.onFailure(t);
216     }
217
218     private void readyTransaction(final @Nonnull PingPongTransaction tx) {
219         final boolean lockedMatch = LOCKED_UPDATER.compareAndSet(this, tx, null);
220         Preconditions.checkState(lockedMatch, "Attempted to submit transaction %s while we have %s", tx, lockedTx);
221
222         LOG.debug("Transaction {} unlocked", tx);
223
224         synchronized (this) {
225             if (inflightTransaction == null) {
226                 processTransaction(tx);
227             }
228         }
229     }
230
231     @Override
232     public void close() {
233         final PingPongTransaction notLocked = lockedTx;
234         Preconditions.checkState(notLocked == null, "Attempted to close chain with outstanding transaction %s", notLocked);
235
236         synchronized (this) {
237             processIfReady();
238             delegate.close();
239         }
240     }
241
242     @Override
243     public DOMDataReadOnlyTransaction newReadOnlyTransaction() {
244         final PingPongTransaction tx = allocateTransaction();
245
246         return new DOMDataReadOnlyTransaction() {
247             @Override
248             public CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> read(final LogicalDatastoreType store,
249                     final YangInstanceIdentifier path) {
250                 return tx.getTransaction().read(store, path);
251             }
252
253             @Override
254             public CheckedFuture<Boolean, ReadFailedException> exists(final LogicalDatastoreType store,
255                     final YangInstanceIdentifier path) {
256                 return tx.getTransaction().exists(store, path);
257             }
258
259             @Override
260             public Object getIdentifier() {
261                 return tx.getTransaction().getIdentifier();
262             }
263
264             @Override
265             public void close() {
266                 readyTransaction(tx);
267             }
268         };
269     }
270
271     @Override
272     public DOMDataReadWriteTransaction newReadWriteTransaction() {
273         final PingPongTransaction tx = allocateTransaction();
274         final DOMDataReadWriteTransaction ret = new ForwardingDOMDataReadWriteTransaction() {
275             @Override
276             protected DOMDataReadWriteTransaction delegate() {
277                 return tx.getTransaction();
278             }
279
280             @Override
281             public CheckedFuture<Void, TransactionCommitFailedException> submit() {
282                 readyTransaction(tx);
283                 return tx.getSubmitFuture();
284             }
285
286             @Override
287             public ListenableFuture<RpcResult<TransactionStatus>> commit() {
288                 readyTransaction(tx);
289                 return tx.getCommitFuture();
290             }
291
292             @Override
293             public boolean cancel() {
294                 throw new UnsupportedOperationException("Transaction cancellation is not supported");
295             }
296         };
297
298         tx.recordFrontendTransaction(ret);
299         return ret;
300     }
301
302     @Override
303     public DOMDataWriteTransaction newWriteOnlyTransaction() {
304         return newReadWriteTransaction();
305     }
306 }