CDS: Implement front-end support for local transactions
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / ConcurrentDOMDataBroker.java
1 /*
2  * Copyright (c) 2014 Brocade Communications Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.datastore;
9
10 import com.google.common.base.Preconditions;
11 import com.google.common.util.concurrent.AbstractFuture;
12 import com.google.common.util.concurrent.CheckedFuture;
13 import com.google.common.util.concurrent.FutureCallback;
14 import com.google.common.util.concurrent.Futures;
15 import com.google.common.util.concurrent.ListenableFuture;
16 import com.google.common.util.concurrent.MoreExecutors;
17 import java.util.Collection;
18 import java.util.Iterator;
19 import java.util.List;
20 import java.util.Map;
21 import java.util.concurrent.Executor;
22 import org.opendaylight.controller.cluster.databroker.AbstractDOMBroker;
23 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
24 import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
25 import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
26 import org.opendaylight.controller.md.sal.dom.broker.impl.TransactionCommitFailedExceptionMapper;
27 import org.opendaylight.controller.sal.core.spi.data.DOMStore;
28 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
29 import org.opendaylight.yangtools.util.DurationStatisticsTracker;
30 import org.opendaylight.yangtools.util.concurrent.MappingCheckedFuture;
31 import org.slf4j.Logger;
32 import org.slf4j.LoggerFactory;
33
34 /**
35  * ConcurrentDOMDataBroker commits transactions concurrently. The 3
36  * commit phases (canCommit, preCommit, and commit) are performed serially and non-blocking
37  * (ie async) per transaction but multiple transaction commits can run concurrent.
38  *
39  * @author Thomas Pantelis
40  */
41 public class ConcurrentDOMDataBroker extends AbstractDOMBroker {
42     private static final Logger LOG = LoggerFactory.getLogger(ConcurrentDOMDataBroker.class);
43     private static final String CAN_COMMIT = "CAN_COMMIT";
44     private static final String PRE_COMMIT = "PRE_COMMIT";
45     private static final String COMMIT = "COMMIT";
46
47     private final DurationStatisticsTracker commitStatsTracker = DurationStatisticsTracker.createConcurrent();
48
49     /**
50      * This executor is used to execute Future listener callback Runnables async.
51      */
52     private final Executor clientFutureCallbackExecutor;
53
54     public ConcurrentDOMDataBroker(final Map<LogicalDatastoreType, DOMStore> datastores, Executor listenableFutureExecutor) {
55         super(datastores);
56         this.clientFutureCallbackExecutor = Preconditions.checkNotNull(listenableFutureExecutor);
57     }
58
59     public DurationStatisticsTracker getCommitStatsTracker() {
60         return commitStatsTracker;
61     }
62
63     @Override
64     protected CheckedFuture<Void, TransactionCommitFailedException> submit(DOMDataWriteTransaction transaction,
65             Collection<DOMStoreThreePhaseCommitCohort> cohorts) {
66
67         Preconditions.checkArgument(transaction != null, "Transaction must not be null.");
68         Preconditions.checkArgument(cohorts != null, "Cohorts must not be null.");
69         LOG.debug("Tx: {} is submitted for execution.", transaction.getIdentifier());
70
71         if(cohorts.isEmpty()){
72             return Futures.immediateCheckedFuture(null);
73         }
74
75         final AsyncNotifyingSettableFuture clientSubmitFuture =
76                 new AsyncNotifyingSettableFuture(clientFutureCallbackExecutor);
77
78         doCanCommit(clientSubmitFuture, transaction, cohorts);
79
80         return MappingCheckedFuture.create(clientSubmitFuture,
81                 TransactionCommitFailedExceptionMapper.COMMIT_ERROR_MAPPER);
82     }
83
84     private void doCanCommit(final AsyncNotifyingSettableFuture clientSubmitFuture,
85             final DOMDataWriteTransaction transaction,
86             final Collection<DOMStoreThreePhaseCommitCohort> cohorts) {
87
88         final long startTime = System.nanoTime();
89
90         final Iterator<DOMStoreThreePhaseCommitCohort> cohortIterator = cohorts.iterator();
91
92         // Not using Futures.allAsList here to avoid its internal overhead.
93         FutureCallback<Boolean> futureCallback = new FutureCallback<Boolean>() {
94             @Override
95             public void onSuccess(Boolean result) {
96                 if (result == null || !result) {
97                     handleException(clientSubmitFuture, transaction, cohorts,
98                             CAN_COMMIT, TransactionCommitFailedExceptionMapper.CAN_COMMIT_ERROR_MAPPER,
99                             new TransactionCommitFailedException(
100                                             "Can Commit failed, no detailed cause available."));
101                 } else {
102                     if(!cohortIterator.hasNext()) {
103                         // All cohorts completed successfully - we can move on to the preCommit phase
104                         doPreCommit(startTime, clientSubmitFuture, transaction, cohorts);
105                     } else {
106                         ListenableFuture<Boolean> canCommitFuture = cohortIterator.next().canCommit();
107                         Futures.addCallback(canCommitFuture, this, MoreExecutors.directExecutor());
108                     }
109                 }
110             }
111
112             @Override
113             public void onFailure(Throwable t) {
114                 handleException(clientSubmitFuture, transaction, cohorts, CAN_COMMIT,
115                         TransactionCommitFailedExceptionMapper.CAN_COMMIT_ERROR_MAPPER, t);
116             }
117         };
118
119         ListenableFuture<Boolean> canCommitFuture = cohortIterator.next().canCommit();
120         Futures.addCallback(canCommitFuture, futureCallback, MoreExecutors.directExecutor());
121     }
122
123     private void doPreCommit(final long startTime, final AsyncNotifyingSettableFuture clientSubmitFuture,
124             final DOMDataWriteTransaction transaction,
125             final Collection<DOMStoreThreePhaseCommitCohort> cohorts) {
126
127         final Iterator<DOMStoreThreePhaseCommitCohort> cohortIterator = cohorts.iterator();
128
129         // Not using Futures.allAsList here to avoid its internal overhead.
130         FutureCallback<Void> futureCallback = new FutureCallback<Void>() {
131             @Override
132             public void onSuccess(Void notUsed) {
133                 if(!cohortIterator.hasNext()) {
134                     // All cohorts completed successfully - we can move on to the commit phase
135                     doCommit(startTime, clientSubmitFuture, transaction, cohorts);
136                 } else {
137                     ListenableFuture<Void> preCommitFuture = cohortIterator.next().preCommit();
138                     Futures.addCallback(preCommitFuture, this, MoreExecutors.directExecutor());
139                 }
140             }
141
142             @Override
143             public void onFailure(Throwable t) {
144                 handleException(clientSubmitFuture, transaction, cohorts, PRE_COMMIT,
145                         TransactionCommitFailedExceptionMapper.PRE_COMMIT_MAPPER, t);
146             }
147         };
148
149         ListenableFuture<Void> preCommitFuture = cohortIterator.next().preCommit();
150         Futures.addCallback(preCommitFuture, futureCallback, MoreExecutors.directExecutor());
151     }
152
153     private void doCommit(final long startTime, final AsyncNotifyingSettableFuture clientSubmitFuture,
154             final DOMDataWriteTransaction transaction,
155             final Collection<DOMStoreThreePhaseCommitCohort> cohorts) {
156
157         final Iterator<DOMStoreThreePhaseCommitCohort> cohortIterator = cohorts.iterator();
158
159         // Not using Futures.allAsList here to avoid its internal overhead.
160         FutureCallback<Void> futureCallback = new FutureCallback<Void>() {
161             @Override
162             public void onSuccess(Void notUsed) {
163                 if(!cohortIterator.hasNext()) {
164                     // All cohorts completed successfully - we're done.
165                     commitStatsTracker.addDuration(System.nanoTime() - startTime);
166
167                     clientSubmitFuture.set();
168                 } else {
169                     ListenableFuture<Void> commitFuture = cohortIterator.next().commit();
170                     Futures.addCallback(commitFuture, this, MoreExecutors.directExecutor());
171                 }
172             }
173
174             @Override
175             public void onFailure(Throwable t) {
176                 handleException(clientSubmitFuture, transaction, cohorts, COMMIT,
177                         TransactionCommitFailedExceptionMapper.COMMIT_ERROR_MAPPER, t);
178             }
179         };
180
181         ListenableFuture<Void> commitFuture = cohortIterator.next().commit();
182         Futures.addCallback(commitFuture, futureCallback, MoreExecutors.directExecutor());
183     }
184
185     private static void handleException(final AsyncNotifyingSettableFuture clientSubmitFuture,
186             final DOMDataWriteTransaction transaction,
187             final Collection<DOMStoreThreePhaseCommitCohort> cohorts,
188             final String phase, final TransactionCommitFailedExceptionMapper exMapper,
189             final Throwable t) {
190
191         if (clientSubmitFuture.isDone()) {
192             // We must have had failures from multiple cohorts.
193             return;
194         }
195
196         LOG.warn("Tx: {} Error during phase {}, starting Abort", transaction.getIdentifier(), phase, t);
197         final Exception e;
198         if (t instanceof Exception) {
199             e = (Exception)t;
200         } else {
201             e = new RuntimeException("Unexpected error occurred", t);
202         }
203
204         final TransactionCommitFailedException clientException = exMapper.apply(e);
205
206         // Transaction failed - tell all cohorts to abort.
207
208         @SuppressWarnings("unchecked")
209         ListenableFuture<Void>[] canCommitFutures = new ListenableFuture[cohorts.size()];
210         int i = 0;
211         for (DOMStoreThreePhaseCommitCohort cohort : cohorts) {
212             canCommitFutures[i++] = cohort.abort();
213         }
214
215         ListenableFuture<List<Void>> combinedFuture = Futures.allAsList(canCommitFutures);
216         Futures.addCallback(combinedFuture, new FutureCallback<List<Void>>() {
217             @Override
218             public void onSuccess(List<Void> notUsed) {
219                 // Propagate the original exception to the client.
220                 clientSubmitFuture.setException(clientException);
221             }
222
223             @Override
224             public void onFailure(Throwable t) {
225                 LOG.error("Tx: {} Error during Abort.", transaction.getIdentifier(), t);
226
227                 // Propagate the original exception as that is what caused the Tx to fail and is
228                 // what's interesting to the client.
229                 clientSubmitFuture.setException(clientException);
230             }
231         }, MoreExecutors.directExecutor());
232     }
233
234     /**
235      * A settable future that uses an {@link Executor} to execute listener callback Runnables,
236      * registered via {@link #addListener}, asynchronously when this future completes. This is
237      * done to guarantee listener executions are off-loaded onto another thread to avoid blocking
238      * the thread that completed this future, as a common use case is to pass an executor that runs
239      * tasks in the same thread as the caller (ie MoreExecutors#sameThreadExecutor)
240      * to {@link #addListener}.
241      *
242      * FIXME: This class should probably be moved to yangtools common utils for re-usability and
243      * unified with AsyncNotifyingListenableFutureTask.
244      */
245     private static class AsyncNotifyingSettableFuture extends AbstractFuture<Void> {
246
247         /**
248          * ThreadLocal used to detect if the task completion thread is running the future listener Runnables.
249          */
250         private static final ThreadLocal<Boolean> ON_TASK_COMPLETION_THREAD_TL = new ThreadLocal<Boolean>();
251
252         private final Executor listenerExecutor;
253
254         AsyncNotifyingSettableFuture(Executor listenerExecutor) {
255             this.listenerExecutor = Preconditions.checkNotNull(listenerExecutor);
256         }
257
258         @Override
259         public void addListener(final Runnable listener, final Executor executor) {
260             // Wrap the listener Runnable in a DelegatingRunnable. If the specified executor is one
261             // that runs tasks in the same thread as the caller submitting the task
262             // (e.g. {@link com.google.common.util.concurrent.MoreExecutors#sameThreadExecutor}) and
263             // the listener is executed from the #set methods, then the DelegatingRunnable will detect
264             // this via the ThreadLocal and submit the listener Runnable to the listenerExecutor.
265             //
266             // On the other hand, if this task is already complete, the call to ExecutionList#add in
267             // superclass will execute the listener Runnable immediately and, since the ThreadLocal
268             // won't be set, the DelegatingRunnable will run the listener Runnable inline.
269             super.addListener(new DelegatingRunnable(listener, listenerExecutor), executor);
270         }
271
272         boolean set() {
273             ON_TASK_COMPLETION_THREAD_TL.set(Boolean.TRUE);
274             try {
275                 return super.set(null);
276             } finally {
277                 ON_TASK_COMPLETION_THREAD_TL.set(null);
278             }
279         }
280
281         @Override
282         protected boolean setException(Throwable throwable) {
283             ON_TASK_COMPLETION_THREAD_TL.set(Boolean.TRUE);
284             try {
285                 return super.setException(throwable);
286             } finally {
287                 ON_TASK_COMPLETION_THREAD_TL.set(null);
288             }
289         }
290
291         private static final class DelegatingRunnable implements Runnable {
292             private final Runnable delegate;
293             private final Executor executor;
294
295             DelegatingRunnable(final Runnable delegate, final Executor executor) {
296                 this.delegate = Preconditions.checkNotNull(delegate);
297                 this.executor = Preconditions.checkNotNull(executor);
298             }
299
300             @Override
301             public void run() {
302                 if (ON_TASK_COMPLETION_THREAD_TL.get() != null) {
303                     // We're running on the task completion thread so off-load to the executor.
304                     LOG.trace("Submitting ListenenableFuture Runnable from thread {} to executor {}",
305                             Thread.currentThread().getName(), executor);
306                     executor.execute(delegate);
307                 } else {
308                     // We're not running on the task completion thread so run the delegate inline.
309                     LOG.trace("Executing ListenenableFuture Runnable on this thread: {}",
310                             Thread.currentThread().getName());
311                     delegate.run();
312                 }
313             }
314         }
315     }
316 }

©2013 OpenDaylight, A Linux Foundation Collaborative Project. All Rights Reserved.
OpenDaylight is a registered trademark of The OpenDaylight Project, Inc.
Linux Foundation and OpenDaylight are registered trademarks of the Linux Foundation.
Linux is a registered trademark of Linus Torvalds.