26b5685094f42e2fc484867a8ac44c8ecab9f5d7
[controller.git] / opendaylight / md-sal / sal-dom-broker / src / main / java / org / opendaylight / controller / md / sal / dom / broker / impl / PingPongTransactionChain.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.md.sal.dom.broker.impl;
9
10 import com.google.common.base.Optional;
11 import com.google.common.base.Preconditions;
12 import com.google.common.util.concurrent.CheckedFuture;
13 import com.google.common.util.concurrent.FutureCallback;
14 import com.google.common.util.concurrent.Futures;
15 import com.google.common.util.concurrent.ListenableFuture;
16 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
17 import javax.annotation.Nonnull;
18 import javax.annotation.concurrent.GuardedBy;
19 import org.opendaylight.controller.md.sal.common.api.TransactionStatus;
20 import org.opendaylight.controller.md.sal.common.api.data.AsyncTransaction;
21 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
22 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
23 import org.opendaylight.controller.md.sal.common.api.data.TransactionChain;
24 import org.opendaylight.controller.md.sal.common.api.data.TransactionChainListener;
25 import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
26 import org.opendaylight.controller.md.sal.dom.api.DOMDataBroker;
27 import org.opendaylight.controller.md.sal.dom.api.DOMDataReadOnlyTransaction;
28 import org.opendaylight.controller.md.sal.dom.api.DOMDataReadWriteTransaction;
29 import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
30 import org.opendaylight.controller.md.sal.dom.api.DOMTransactionChain;
31 import org.opendaylight.controller.md.sal.dom.spi.ForwardingDOMDataReadWriteTransaction;
32 import org.opendaylight.yangtools.yang.common.RpcResult;
33 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
34 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
35 import org.slf4j.Logger;
36 import org.slf4j.LoggerFactory;
37
38 /**
39  * An implementation of {@link DOMTransactionChain}, which has a very specific
40  * behavior, which some users may find surprising. If keeps the general
41  * intent of the contract, but it makes sure there are never more than two
42  * transactions allocated at any given time: one of them is being committed,
43  * and while that is happening, the other one acts as the scratch pad. Once
44  * the committing transaction completes successfully, the scratch transaction
45  * is enqueued as soon as it is ready.
46  *
47  * This mode of operation means that there is no inherent isolation between
48  * the front-end transactions and transactions cannot be reasonably cancelled.
49  *
50  * It furthermore means that the transactions returned by {@link #newReadOnlyTransaction()}
51  * counts as an outstanding transaction and the user may not allocate multiple
52  * read-only transactions at the same time.
53  */
54 public final class PingPongTransactionChain implements DOMTransactionChain {
55     private static final Logger LOG = LoggerFactory.getLogger(PingPongTransactionChain.class);
56     private final DOMTransactionChain delegate;
57
58     @GuardedBy("this")
59     private boolean failed;
60     @GuardedBy("this")
61     private PingPongTransaction shutdownTx;
62
63     /**
64      * This updater is used to manipulate the "ready" transaction. We perform only atomic
65      * get-and-set on it.
66      */
67     private static final AtomicReferenceFieldUpdater<PingPongTransactionChain, PingPongTransaction> READY_UPDATER =
68             AtomicReferenceFieldUpdater.newUpdater(PingPongTransactionChain.class, PingPongTransaction.class, "readyTx");
69     private volatile PingPongTransaction readyTx;
70
71     /**
72      * This updater is used to manipulate the "locked" transaction. A locked transaction
73      * means we know that the user still holds a transaction and should at some point call
74      * us. We perform on compare-and-swap to ensure we properly detect when a user is
75      * attempting to allocated multiple transactions concurrently.
76      */
77     private static final AtomicReferenceFieldUpdater<PingPongTransactionChain, PingPongTransaction> LOCKED_UPDATER =
78             AtomicReferenceFieldUpdater.newUpdater(PingPongTransactionChain.class, PingPongTransaction.class, "lockedTx");
79     private volatile PingPongTransaction lockedTx;
80
81     /**
82      * This updater is used to manipulate the "inflight" transaction. There can be at most
83      * one of these at any given time. We perform only compare-and-swap on these.
84      */
85     private static final AtomicReferenceFieldUpdater<PingPongTransactionChain, PingPongTransaction> INFLIGHT_UPDATER =
86             AtomicReferenceFieldUpdater.newUpdater(PingPongTransactionChain.class, PingPongTransaction.class, "inflightTx");
87     private volatile PingPongTransaction inflightTx;
88
89     PingPongTransactionChain(final DOMDataBroker broker, final TransactionChainListener listener) {
90         this.delegate = broker.createTransactionChain(new TransactionChainListener() {
91             @Override
92             public void onTransactionChainFailed(final TransactionChain<?, ?> chain, final AsyncTransaction<?, ?> transaction, final Throwable cause) {
93                 LOG.debug("Delegate chain {} reported failure in {}", chain, transaction, cause);
94
95                 final DOMDataReadWriteTransaction frontend;
96                 final PingPongTransaction tx = inflightTx;
97                 if (tx == null) {
98                     LOG.warn("Transaction chain {} failed with no pending transactions", chain);
99                     frontend = null;
100                 } else {
101                     frontend = tx.getFrontendTransaction();
102                 }
103
104                 listener.onTransactionChainFailed(PingPongTransactionChain.this, frontend, cause);
105                 delegateFailed();
106             }
107
108             @Override
109             public void onTransactionChainSuccessful(final TransactionChain<?, ?> chain) {
110                 listener.onTransactionChainSuccessful(PingPongTransactionChain.this);
111             }
112         });
113     }
114
115     private synchronized void delegateFailed() {
116         failed = true;
117
118         /*
119          * If we do not have a locked transaction, we need to ensure that
120          * the backend transaction is cancelled. Otherwise we can defer
121          * until the user calls us.
122          */
123         if (lockedTx == null) {
124             processIfReady();
125         }
126     }
127
128     private synchronized PingPongTransaction slowAllocateTransaction() {
129         Preconditions.checkState(shutdownTx == null, "Transaction chain %s has been shut down", this);
130
131         final DOMDataReadWriteTransaction delegateTx = delegate.newReadWriteTransaction();
132         final PingPongTransaction newTx = new PingPongTransaction(delegateTx);
133
134         if (!LOCKED_UPDATER.compareAndSet(this, null, newTx)) {
135             delegateTx.cancel();
136             throw new IllegalStateException(String.format("New transaction %s raced with transacion %s", newTx, lockedTx));
137         }
138
139         return newTx;
140     }
141
142     private PingPongTransaction allocateTransaction() {
143         // Step 1: acquire current state
144         final PingPongTransaction oldTx = READY_UPDATER.getAndSet(this, null);
145
146         // Slow path: allocate a delegate transaction
147         if (oldTx == null) {
148             return slowAllocateTransaction();
149         }
150
151         // Fast path: reuse current transaction. We will check
152         //            failures and similar on submit().
153         if (!LOCKED_UPDATER.compareAndSet(this, null, oldTx)) {
154             // Ouch. Delegate chain has not detected a duplicate
155             // transaction allocation. This is the best we can do.
156             oldTx.getTransaction().cancel();
157             throw new IllegalStateException(String.format("Reusable transaction %s raced with transaction %s", oldTx, lockedTx));
158         }
159
160         return oldTx;
161     }
162
163     /*
164      * This forces allocateTransaction() on a slow path, which has to happen after
165      * this method has completed executing. Also inflightTx may be updated outside
166      * the lock, hence we need to re-check.
167      */
168     @GuardedBy("this")
169     private void processIfReady() {
170         if (inflightTx == null) {
171             final PingPongTransaction tx = READY_UPDATER.getAndSet(this, null);
172             if (tx != null) {
173                 processTransaction(tx);
174             }
175         }
176     }
177
178     /**
179      * Process a ready transaction. The caller needs to ensure that
180      * each transaction is seen only once by this method.
181      *
182      * @param tx Transaction which needs processing.
183      */
184     @GuardedBy("this")
185     private void processTransaction(@Nonnull final PingPongTransaction tx) {
186         if (failed) {
187             LOG.debug("Cancelling transaction {}", tx);
188             tx.getTransaction().cancel();
189             return;
190         }
191
192         LOG.debug("Submitting transaction {}", tx);
193         if (!INFLIGHT_UPDATER.compareAndSet(this, null, tx)) {
194             LOG.warn("Submitting transaction {} while {} is still running", tx, inflightTx);
195         }
196
197         Futures.addCallback(tx.getTransaction().submit(), new FutureCallback<Void>() {
198             @Override
199             public void onSuccess(final Void result) {
200                 transactionSuccessful(tx, result);
201             }
202
203             @Override
204             public void onFailure(final Throwable t) {
205                 transactionFailed(tx, t);
206             }
207         });
208     }
209
210     private void processNextTransaction(final PingPongTransaction tx) {
211         final boolean success = INFLIGHT_UPDATER.compareAndSet(this, tx, null);
212         Preconditions.checkState(success, "Completed transaction %s while %s was submitted", tx, inflightTx);
213
214         synchronized (this) {
215             final PingPongTransaction nextTx = READY_UPDATER.getAndSet(this, null);
216             if (nextTx != null) {
217                 processTransaction(nextTx);
218             } else if (shutdownTx != null) {
219                 processTransaction(shutdownTx);
220                 delegate.close();
221                 shutdownTx = null;
222             }
223         }
224     }
225
226     private void transactionSuccessful(final PingPongTransaction tx, final Void result) {
227         LOG.debug("Transaction {} completed successfully", tx);
228
229         tx.onSuccess(result);
230         processNextTransaction(tx);
231     }
232
233     private void transactionFailed(final PingPongTransaction tx, final Throwable t) {
234         LOG.debug("Transaction {} failed", tx, t);
235
236         tx.onFailure(t);
237         processNextTransaction(tx);
238     }
239
240     private void readyTransaction(@Nonnull final PingPongTransaction tx) {
241         // First mark the transaction as not locked.
242         final boolean lockedMatch = LOCKED_UPDATER.compareAndSet(this, tx, null);
243         Preconditions.checkState(lockedMatch, "Attempted to submit transaction %s while we have %s", tx, lockedTx);
244         LOG.debug("Transaction {} unlocked", tx);
245
246         /*
247          * The transaction is ready. It will then be picked up by either next allocation,
248          * or a background transaction completion callback.
249          */
250         final boolean success = READY_UPDATER.compareAndSet(this, null, tx);
251         Preconditions.checkState(success, "Transaction %s collided on ready state", tx, readyTx);
252         LOG.debug("Transaction {} readied", tx);
253
254         /*
255          * We do not see a transaction being in-flight, so we need to take care of dispatching
256          * the transaction to the backend. We are in the ready case, we cannot short-cut
257          * the checking of readyTx, as an in-flight transaction may have completed between us
258          * setting the field above and us checking.
259          */
260         if (inflightTx == null) {
261             synchronized (this) {
262                 processIfReady();
263             }
264         }
265     }
266
267     @Override
268     public synchronized void close() {
269         final PingPongTransaction notLocked = lockedTx;
270         Preconditions.checkState(notLocked == null, "Attempted to close chain with outstanding transaction %s", notLocked);
271
272         // This is not reliable, but if we observe it to be null and the process has already completed,
273         // the backend transaction chain will throw the appropriate error.
274         Preconditions.checkState(shutdownTx == null, "Attempted to close an already-closed chain");
275
276         // Force allocations on slow path, picking up a potentially-outstanding transaction
277         final PingPongTransaction tx = READY_UPDATER.getAndSet(this, null);
278
279         if (tx != null) {
280             // We have one more transaction, which needs to be processed somewhere. If we do not
281             // a transaction in-flight, we need to push it down ourselves.
282             // If there is an in-flight transaction we will schedule this last one into a dedicated
283             // slot. Allocation slow path will check its presence and fail, the in-flight path will
284             // pick it up, submit and immediately close the chain.
285             if (inflightTx == null) {
286                 processTransaction(tx);
287                 delegate.close();
288             } else {
289                 shutdownTx = tx;
290             }
291         } else {
292             // Nothing outstanding, we can safely shutdown
293             delegate.close();
294         }
295     }
296
297     @Override
298     public DOMDataReadOnlyTransaction newReadOnlyTransaction() {
299         final PingPongTransaction tx = allocateTransaction();
300
301         return new DOMDataReadOnlyTransaction() {
302             @Override
303             public CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> read(final LogicalDatastoreType store,
304                     final YangInstanceIdentifier path) {
305                 return tx.getTransaction().read(store, path);
306             }
307
308             @Override
309             public CheckedFuture<Boolean, ReadFailedException> exists(final LogicalDatastoreType store,
310                     final YangInstanceIdentifier path) {
311                 return tx.getTransaction().exists(store, path);
312             }
313
314             @Override
315             public Object getIdentifier() {
316                 return tx.getTransaction().getIdentifier();
317             }
318
319             @Override
320             public void close() {
321                 readyTransaction(tx);
322             }
323         };
324     }
325
326     @Override
327     public DOMDataReadWriteTransaction newReadWriteTransaction() {
328         final PingPongTransaction tx = allocateTransaction();
329         final DOMDataReadWriteTransaction ret = new ForwardingDOMDataReadWriteTransaction() {
330             @Override
331             protected DOMDataReadWriteTransaction delegate() {
332                 return tx.getTransaction();
333             }
334
335             @Override
336             public CheckedFuture<Void, TransactionCommitFailedException> submit() {
337                 readyTransaction(tx);
338                 return tx.getSubmitFuture();
339             }
340
341             @Override
342             public ListenableFuture<RpcResult<TransactionStatus>> commit() {
343                 readyTransaction(tx);
344                 return tx.getCommitFuture();
345             }
346
347             @Override
348             public boolean cancel() {
349                 throw new UnsupportedOperationException("Transaction cancellation is not supported");
350             }
351         };
352
353         tx.recordFrontendTransaction(ret);
354         return ret;
355     }
356
357     @Override
358     public DOMDataWriteTransaction newWriteOnlyTransaction() {
359         return newReadWriteTransaction();
360     }
361 }