d8a7a0084c747e9f744d7b0ee0a52ff7afa69e8b
[controller.git] / opendaylight / md-sal / sal-dom-broker / src / main / java / org / opendaylight / controller / md / sal / dom / broker / impl / DOMConcurrentDataCommitCoordinator.java
1 /*
2  * Copyright (c) 2014 Brocade Communications Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.md.sal.dom.broker.impl;
9
10 import com.google.common.base.Preconditions;
11 import com.google.common.collect.Iterables;
12 import com.google.common.util.concurrent.AbstractFuture;
13 import com.google.common.util.concurrent.AbstractListeningExecutorService;
14 import com.google.common.util.concurrent.CheckedFuture;
15 import com.google.common.util.concurrent.FutureCallback;
16 import com.google.common.util.concurrent.Futures;
17 import com.google.common.util.concurrent.ListenableFuture;
18 import java.util.List;
19 import java.util.concurrent.Executor;
20 import java.util.concurrent.ExecutorService;
21 import java.util.concurrent.TimeUnit;
22 import java.util.concurrent.atomic.AtomicInteger;
23 import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
24 import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
25 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
26 import org.opendaylight.yangtools.util.DurationStatisticsTracker;
27 import org.opendaylight.yangtools.util.concurrent.MappingCheckedFuture;
28 import org.slf4j.Logger;
29 import org.slf4j.LoggerFactory;
30
31 /**
32  * Implementation of DOMDataCommitExecutor that coordinates transaction commits concurrently. The 3
33  * commit phases (canCommit, preCommit, and commit) are performed serially and non-blocking
34  * (ie async) per transaction but multiple transaction commits can run concurrent.
35  *
36  * @author Thomas Pantelis
37  */
38 public class DOMConcurrentDataCommitCoordinator implements DOMDataCommitExecutor {
39
40     private static final String CAN_COMMIT = "CAN_COMMIT";
41     private static final String PRE_COMMIT = "PRE_COMMIT";
42     private static final String COMMIT = "COMMIT";
43
44     private static final Logger LOG = LoggerFactory.getLogger(DOMConcurrentDataCommitCoordinator.class);
45
46     private final DurationStatisticsTracker commitStatsTracker = DurationStatisticsTracker.createConcurrent();
47
48     /**
49      * This executor is used to execute Future listener callback Runnables async.
50      */
51     private final ExecutorService clientFutureCallbackExecutor;
52
53     /**
54      * This executor is re-used internally in calls to Futures#addCallback to avoid the overhead
55      * of Futures#addCallback creating a MoreExecutors#sameThreadExecutor for each call.
56      */
57     private final ExecutorService internalFutureCallbackExecutor = new SimpleSameThreadExecutor();
58
59     public DOMConcurrentDataCommitCoordinator(ExecutorService listenableFutureExecutor) {
60         this.clientFutureCallbackExecutor = Preconditions.checkNotNull(listenableFutureExecutor);
61     }
62
63     public DurationStatisticsTracker getCommitStatsTracker() {
64         return commitStatsTracker;
65     }
66
67     @Override
68     public CheckedFuture<Void, TransactionCommitFailedException> submit(DOMDataWriteTransaction transaction,
69             Iterable<DOMStoreThreePhaseCommitCohort> cohorts) {
70
71         Preconditions.checkArgument(transaction != null, "Transaction must not be null.");
72         Preconditions.checkArgument(cohorts != null, "Cohorts must not be null.");
73         LOG.debug("Tx: {} is submitted for execution.", transaction.getIdentifier());
74
75         final int cohortSize = Iterables.size(cohorts);
76         final AsyncNotifyingSettableFuture clientSubmitFuture =
77                 new AsyncNotifyingSettableFuture(clientFutureCallbackExecutor);
78
79         doCanCommit(clientSubmitFuture, transaction, cohorts, cohortSize);
80
81         return MappingCheckedFuture.create(clientSubmitFuture,
82                 TransactionCommitFailedExceptionMapper.COMMIT_ERROR_MAPPER);
83     }
84
85     private void doCanCommit(final AsyncNotifyingSettableFuture clientSubmitFuture,
86             final DOMDataWriteTransaction transaction,
87             final Iterable<DOMStoreThreePhaseCommitCohort> cohorts, final int cohortSize) {
88
89         final long startTime = System.nanoTime();
90
91         // Not using Futures.allAsList here to avoid its internal overhead.
92         final AtomicInteger remaining = new AtomicInteger(cohortSize);
93         FutureCallback<Boolean> futureCallback = new FutureCallback<Boolean>() {
94             @Override
95             public void onSuccess(Boolean result) {
96                 if (result == null || !result) {
97                     handleException(clientSubmitFuture, transaction, cohorts, cohortSize,
98                             CAN_COMMIT, TransactionCommitFailedExceptionMapper.CAN_COMMIT_ERROR_MAPPER,
99                             new TransactionCommitFailedException(
100                                             "Can Commit failed, no detailed cause available."));
101                 } else {
102                     if(remaining.decrementAndGet() == 0) {
103                         // All cohorts completed successfully - we can move on to the preCommit phase
104                         doPreCommit(startTime, clientSubmitFuture, transaction, cohorts, cohortSize);
105                     }
106                 }
107             }
108
109             @Override
110             public void onFailure(Throwable t) {
111                 handleException(clientSubmitFuture, transaction, cohorts, cohortSize, CAN_COMMIT,
112                         TransactionCommitFailedExceptionMapper.CAN_COMMIT_ERROR_MAPPER, t);
113             }
114         };
115
116         for(DOMStoreThreePhaseCommitCohort cohort: cohorts) {
117             ListenableFuture<Boolean> canCommitFuture = cohort.canCommit();
118             Futures.addCallback(canCommitFuture, futureCallback, internalFutureCallbackExecutor);
119         }
120     }
121
122     private void doPreCommit(final long startTime, final AsyncNotifyingSettableFuture clientSubmitFuture,
123             final DOMDataWriteTransaction transaction,
124             final Iterable<DOMStoreThreePhaseCommitCohort> cohorts, final int cohortSize) {
125
126         // Not using Futures.allAsList here to avoid its internal overhead.
127         final AtomicInteger remaining = new AtomicInteger(cohortSize);
128         FutureCallback<Void> futureCallback = new FutureCallback<Void>() {
129             @Override
130             public void onSuccess(Void notUsed) {
131                 if(remaining.decrementAndGet() == 0) {
132                     // All cohorts completed successfully - we can move on to the commit phase
133                     doCommit(startTime, clientSubmitFuture, transaction, cohorts, cohortSize);
134                 }
135             }
136
137             @Override
138             public void onFailure(Throwable t) {
139                 handleException(clientSubmitFuture, transaction, cohorts, cohortSize, PRE_COMMIT,
140                         TransactionCommitFailedExceptionMapper.PRE_COMMIT_MAPPER, t);
141             }
142         };
143
144         for(DOMStoreThreePhaseCommitCohort cohort: cohorts) {
145             ListenableFuture<Void> preCommitFuture = cohort.preCommit();
146             Futures.addCallback(preCommitFuture, futureCallback, internalFutureCallbackExecutor);
147         }
148     }
149
150     private void doCommit(final long startTime, final AsyncNotifyingSettableFuture clientSubmitFuture,
151             final DOMDataWriteTransaction transaction,
152             final Iterable<DOMStoreThreePhaseCommitCohort> cohorts, final int cohortSize) {
153
154         // Not using Futures.allAsList here to avoid its internal overhead.
155         final AtomicInteger remaining = new AtomicInteger(cohortSize);
156         FutureCallback<Void> futureCallback = new FutureCallback<Void>() {
157             @Override
158             public void onSuccess(Void notUsed) {
159                 if(remaining.decrementAndGet() == 0) {
160                     // All cohorts completed successfully - we're done.
161                     commitStatsTracker.addDuration(System.nanoTime() - startTime);
162
163                     clientSubmitFuture.set();
164                 }
165             }
166
167             @Override
168             public void onFailure(Throwable t) {
169                 handleException(clientSubmitFuture, transaction, cohorts, cohortSize, COMMIT,
170                         TransactionCommitFailedExceptionMapper.COMMIT_ERROR_MAPPER, t);
171             }
172         };
173
174         for(DOMStoreThreePhaseCommitCohort cohort: cohorts) {
175             ListenableFuture<Void> commitFuture = cohort.commit();
176             Futures.addCallback(commitFuture, futureCallback, internalFutureCallbackExecutor);
177         }
178     }
179
180     private void handleException(final AsyncNotifyingSettableFuture clientSubmitFuture,
181             final DOMDataWriteTransaction transaction,
182             final Iterable<DOMStoreThreePhaseCommitCohort> cohorts, int cohortSize,
183             final String phase, final TransactionCommitFailedExceptionMapper exMapper,
184             final Throwable t) {
185
186         if(clientSubmitFuture.isDone()) {
187             // We must have had failures from multiple cohorts.
188             return;
189         }
190
191         LOG.warn("Tx: {} Error during phase {}, starting Abort", transaction.getIdentifier(), phase, t);
192         Exception e;
193         if(t instanceof Exception) {
194             e = (Exception)t;
195         } else {
196             e = new RuntimeException("Unexpected error occurred", t);
197         }
198
199         final TransactionCommitFailedException clientException = exMapper.apply(e);
200
201         // Transaction failed - tell all cohorts to abort.
202
203         @SuppressWarnings("unchecked")
204         ListenableFuture<Void>[] canCommitFutures = new ListenableFuture[cohortSize];
205         int i = 0;
206         for(DOMStoreThreePhaseCommitCohort cohort: cohorts) {
207             canCommitFutures[i++] = cohort.abort();
208         }
209
210         ListenableFuture<List<Void>> combinedFuture = Futures.allAsList(canCommitFutures);
211         Futures.addCallback(combinedFuture, new FutureCallback<List<Void>>() {
212             @Override
213             public void onSuccess(List<Void> notUsed) {
214                 // Propagate the original exception to the client.
215                 clientSubmitFuture.setException(clientException);
216             }
217
218             @Override
219             public void onFailure(Throwable t) {
220                 LOG.error("Tx: {} Error during Abort.", transaction.getIdentifier(), t);
221
222                 // Propagate the original exception as that is what caused the Tx to fail and is
223                 // what's interesting to the client.
224                 clientSubmitFuture.setException(clientException);
225             }
226         }, internalFutureCallbackExecutor);
227     }
228
229     /**
230      * A settable future that uses an {@link Executor} to execute listener callback Runnables,
231      * registered via {@link #addListener}, asynchronously when this future completes. This is
232      * done to guarantee listener executions are off-loaded onto another thread to avoid blocking
233      * the thread that completed this future, as a common use case is to pass an executor that runs
234      * tasks in the same thread as the caller (ie MoreExecutors#sameThreadExecutor)
235      * to {@link #addListener}.
236      *
237      * FIXME: This class should probably be moved to yangtools common utils for re-usability and
238      * unified with AsyncNotifyingListenableFutureTask.
239      */
240     private static class AsyncNotifyingSettableFuture extends AbstractFuture<Void> {
241
242         /**
243          * ThreadLocal used to detect if the task completion thread is running the future listener Runnables.
244          */
245         private static final ThreadLocal<Boolean> ON_TASK_COMPLETION_THREAD_TL = new ThreadLocal<Boolean>();
246
247         private final ExecutorService listenerExecutor;
248
249         AsyncNotifyingSettableFuture(ExecutorService listenerExecutor) {
250             this.listenerExecutor = listenerExecutor;
251         }
252
253         @Override
254         public void addListener(final Runnable listener, final Executor executor) {
255             // Wrap the listener Runnable in a DelegatingRunnable. If the specified executor is one
256             // that runs tasks in the same thread as the caller submitting the task
257             // (e.g. {@link com.google.common.util.concurrent.MoreExecutors#sameThreadExecutor}) and
258             // the listener is executed from the #set methods, then the DelegatingRunnable will detect
259             // this via the ThreadLocal and submit the listener Runnable to the listenerExecutor.
260             //
261             // On the other hand, if this task is already complete, the call to ExecutionList#add in
262             // superclass will execute the listener Runnable immediately and, since the ThreadLocal
263             // won't be set, the DelegatingRunnable will run the listener Runnable inline.
264             super.addListener(new DelegatingRunnable(listener, listenerExecutor), executor);
265         }
266
267         boolean set() {
268             ON_TASK_COMPLETION_THREAD_TL.set(Boolean.TRUE);
269             try {
270                 return super.set(null);
271             } finally {
272                 ON_TASK_COMPLETION_THREAD_TL.set(null);
273             }
274         }
275
276         @Override
277         protected boolean setException(Throwable throwable) {
278             ON_TASK_COMPLETION_THREAD_TL.set(Boolean.TRUE);
279             try {
280                 return super.setException(throwable);
281             } finally {
282                 ON_TASK_COMPLETION_THREAD_TL.set(null);
283             }
284         }
285
286         private static final class DelegatingRunnable implements Runnable {
287             private final Runnable delegate;
288             private final Executor executor;
289
290             DelegatingRunnable(final Runnable delegate, final Executor executor) {
291                 this.delegate = Preconditions.checkNotNull(delegate);
292                 this.executor = Preconditions.checkNotNull(executor);
293             }
294
295             @Override
296             public void run() {
297                 if (ON_TASK_COMPLETION_THREAD_TL.get() != null) {
298                     // We're running on the task completion thread so off-load to the executor.
299                     LOG.trace("Submitting ListenenableFuture Runnable from thread {} to executor {}",
300                             Thread.currentThread().getName(), executor);
301                     executor.execute(delegate);
302                 } else {
303                     // We're not running on the task completion thread so run the delegate inline.
304                     LOG.trace("Executing ListenenableFuture Runnable on this thread: {}",
305                             Thread.currentThread().getName());
306                     delegate.run();
307                 }
308             }
309         }
310     }
311
312     /**
313      * A simple same-thread executor without the internal locking overhead that
314      * MoreExecutors#sameThreadExecutor has. The #execute method is the only one of concern - we
315      * don't shutdown the executor so the other methods irrelevant.
316      */
317     private static class SimpleSameThreadExecutor extends AbstractListeningExecutorService {
318
319         @Override
320         public void execute(Runnable command) {
321             command.run();
322         }
323
324         @Override
325         public boolean awaitTermination(long arg0, TimeUnit arg1) throws InterruptedException {
326             return true;
327         }
328
329         @Override
330         public boolean isShutdown() {
331             return false;
332         }
333
334         @Override
335         public boolean isTerminated() {
336             return false;
337         }
338
339         @Override
340         public void shutdown() {
341         }
342
343         @Override
344         public List<Runnable> shutdownNow() {
345             return null;
346         }
347     }
348 }