2 * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
3 * This program and the accompanying materials are made available under the
4 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
5 * and is available at http://www.eclipse.org/legal/epl-v10.html
7 package org.opendaylight.controller.md.sal.dom.broker.impl;
10 import java.util.concurrent.Callable;
11 import java.util.concurrent.ExecutionException;
12 import java.util.concurrent.RejectedExecutionException;
14 import javax.annotation.concurrent.GuardedBy;
16 import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
17 import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
18 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
19 import org.opendaylight.yangtools.util.DurationStatsTracker;
20 import org.opendaylight.yangtools.util.concurrent.MappingCheckedFuture;
21 import org.slf4j.Logger;
22 import org.slf4j.LoggerFactory;
24 import com.google.common.base.Function;
25 import com.google.common.base.Optional;
26 import com.google.common.base.Preconditions;
27 import com.google.common.base.Throwables;
28 import com.google.common.collect.ImmutableList;
29 import com.google.common.collect.ImmutableList.Builder;
30 import com.google.common.util.concurrent.CheckedFuture;
31 import com.google.common.util.concurrent.Futures;
32 import com.google.common.util.concurrent.ListenableFuture;
33 import com.google.common.util.concurrent.ListeningExecutorService;
37 * Implementation of blocking three phase commit coordinator, which
38 * supports coordination on multiple {@link DOMStoreThreePhaseCommitCohort}.
40 * This implementation does not support cancelation of commit,
42 * In order to advance to the next phase of the three phase commit, all subtasks of
43 * the previous step must have finished.
45 * This executor does not have an upper bound on subtask timeout.
49 public class DOMDataCommitCoordinatorImpl implements DOMDataCommitExecutor {
51 private static final Logger LOG = LoggerFactory.getLogger(DOMDataCommitCoordinatorImpl.class);
54 * Runs AND binary operation between all booleans in supplied iteration of booleans.
56 * This method stops evaluating the iterable as soon as the first false value is found.
58 private static final Function<Iterable<Boolean>, Boolean> AND_FUNCTION = new Function<Iterable<Boolean>, Boolean>() {
61 public Boolean apply(final Iterable<Boolean> input) {
62 for(boolean value : input) {
71 private final ListeningExecutorService executor;
73 private final DurationStatsTracker commitStatsTracker = new DurationStatsTracker();
/**
 * Creates a commit coordinator which processes its commit coordinations
 * on the supplied executor.
 *
 * @param executor
 *            executor used to run commit coordination tasks; must not be null
 * @throws NullPointerException
 *             if {@code executor} is null
 */
public DOMDataCommitCoordinatorImpl(final ListeningExecutorService executor) {
    // Validate first, then store; a null executor never reaches the field.
    Preconditions.checkNotNull(executor, "executor must not be null.");
    this.executor = executor;
}
86 public DurationStatsTracker getCommitStatsTracker() {
87 return commitStatsTracker;
91 public CheckedFuture<Void,TransactionCommitFailedException> submit(final DOMDataWriteTransaction transaction,
92 final Iterable<DOMStoreThreePhaseCommitCohort> cohorts, final Optional<DOMDataCommitErrorListener> listener) {
93 Preconditions.checkArgument(transaction != null, "Transaction must not be null.");
94 Preconditions.checkArgument(cohorts != null, "Cohorts must not be null.");
95 Preconditions.checkArgument(listener != null, "Listener must not be null");
96 LOG.debug("Tx: {} is submitted for execution.", transaction.getIdentifier());
98 ListenableFuture<Void> commitFuture = null;
100 commitFuture = executor.submit(new CommitCoordinationTask(transaction, cohorts,
101 listener, commitStatsTracker));
102 } catch(RejectedExecutionException e) {
103 LOG.error("The commit executor's queue is full - submit task was rejected. \n" +
105 return Futures.immediateFailedCheckedFuture(
106 new TransactionCommitFailedException(
107 "Could not submit the commit task - the commit queue capacity has been exceeded.", e));
110 if (listener.isPresent()) {
111 Futures.addCallback(commitFuture, new DOMDataCommitErrorInvoker(transaction, listener.get()));
114 return MappingCheckedFuture.create(commitFuture,
115 TransactionCommitFailedExceptionMapper.COMMIT_ERROR_MAPPER);
120 * Phase of 3PC commit
122 * Represents phase of 3PC Commit
126 private static enum CommitPhase {
129 * Commit Coordination Task is submitted for executing
134 * Commit Coordination Task is in can commit phase of 3PC
139 * Commit Coordination Task is in pre-commit phase of 3PC
144 * Commit Coordination Task is in commit phase of 3PC
149 * Commit Coordination Task is in abort phase of 3PC
157 * Implementation of blocking three-phase commit-coordination tasks without
158 * support of cancelation.
161 private static class CommitCoordinationTask implements Callable<Void> {
163 private final DOMDataWriteTransaction tx;
164 private final Iterable<DOMStoreThreePhaseCommitCohort> cohorts;
165 private final DurationStatsTracker commitStatTracker;
168 private CommitPhase currentPhase;
/**
 * Creates a coordination task for one transaction. The task starts in the
 * {@link CommitPhase#SUBMITTED} phase.
 *
 * @param transaction
 *            transaction being committed; must not be null
 * @param cohorts
 *            cohorts taking part in the commit; must not be null
 * @param listener
 *            accepted for signature compatibility; not read by this constructor
 * @param commitStatTracker
 *            tracker receiving the commit duration; stored as given, without
 *            a null check
 * @throws NullPointerException
 *             if {@code transaction} or {@code cohorts} is null
 */
public CommitCoordinationTask(final DOMDataWriteTransaction transaction,
        final Iterable<DOMStoreThreePhaseCommitCohort> cohorts,
        final Optional<DOMDataCommitErrorListener> listener,
        final DurationStatsTracker commitStatTracker) {
    // Argument checks keep their original order: transaction, then cohorts.
    Preconditions.checkNotNull(transaction, "transaction must not be null");
    Preconditions.checkNotNull(cohorts, "cohorts must not be null");

    this.tx = transaction;
    this.cohorts = cohorts;
    this.commitStatTracker = commitStatTracker;
    this.currentPhase = CommitPhase.SUBMITTED;
}
181 public Void call() throws TransactionCommitFailedException {
183 long startTime = System.nanoTime();
189 } catch (TransactionCommitFailedException e) {
190 LOG.warn("Tx: {} Error during phase {}, starting Abort", tx.getIdentifier(), currentPhase, e);
194 if(commitStatTracker != null) {
195 commitStatTracker.addDuration(System.nanoTime() - startTime);
202 * Invokes canCommit on underlying cohorts and blocks till
203 * all results are returned.
205 * Valid state transition is from SUBMITTED to CAN_COMMIT,
206 * if currentPhase is not SUBMITTED throws IllegalStateException.
208 * @throws TransactionCommitFailedException
209 * If one of cohorts failed can Commit
212 private void canCommitBlocking() throws TransactionCommitFailedException {
213 final Boolean canCommitResult = canCommitAll().checkedGet();
214 if (!canCommitResult) {
215 throw new TransactionCommitFailedException("Can Commit failed, no detailed cause available.");
221 * Invokes preCommit on underlying cohorts and blocks till
222 * all results are returned.
224 * Valid state transition is from CAN_COMMIT to PRE_COMMIT, if current
225 * state is not CAN_COMMIT
226 * throws IllegalStateException.
228 * @throws TransactionCommitFailedException
229 * If one of cohorts failed preCommit
232 private void preCommitBlocking() throws TransactionCommitFailedException {
233 preCommitAll().checkedGet();
238 * Invokes commit on underlying cohorts and blocks till
239 * all results are returned.
241 * Valid state transition is from PRE_COMMIT to COMMIT, if not throws
242 * IllegalStateException.
244 * @throws TransactionCommitFailedException
245 * If one of cohorts failed preCommit
248 private void commitBlocking() throws TransactionCommitFailedException {
249 commitAll().checkedGet();
253 * Aborts transaction.
255 * Invokes {@link DOMStoreThreePhaseCommitCohort#abort()} on all
257 * for all results. If any of the abort failed throws
258 * IllegalStateException,
259 * which will contains originalCause as suppressed Exception.
261 * If aborts we're successful throws supplied exception
263 * @param originalCause
264 * Exception which should be used to fail transaction for
265 * consumers of transaction
266 * future and listeners of transaction failure.
267 * @throws TransactionCommitFailedException
268 * on invocation of this method.
270 * @throws IllegalStateException
273 private void abortBlocking(final TransactionCommitFailedException originalCause)
274 throws TransactionCommitFailedException {
275 LOG.warn("Tx: {} Error during phase {}, starting Abort", tx.getIdentifier(), currentPhase, originalCause);
276 Exception cause = originalCause;
278 abortAsyncAll().get();
279 } catch (InterruptedException | ExecutionException e) {
280 LOG.error("Tx: {} Error during Abort.", tx.getIdentifier(), e);
281 cause = new IllegalStateException("Abort failed.", e);
282 cause.addSuppressed(e);
284 Throwables.propagateIfPossible(cause, TransactionCommitFailedException.class);
289 * Invokes preCommit on underlying cohorts and returns future
290 * which will complete once all preCommit on cohorts completed or
294 * Valid state transition is from CAN_COMMIT to PRE_COMMIT, if current
295 * state is not CAN_COMMIT
296 * throws IllegalStateException.
298 * @return Future which will complete once all cohorts completed
300 * Future throws TransactionCommitFailedException
301 * If any of cohorts failed preCommit
304 private CheckedFuture<Void, TransactionCommitFailedException> preCommitAll() {
305 changeStateFrom(CommitPhase.CAN_COMMIT, CommitPhase.PRE_COMMIT);
306 Builder<ListenableFuture<Void>> ops = ImmutableList.builder();
307 for (DOMStoreThreePhaseCommitCohort cohort : cohorts) {
308 ops.add(cohort.preCommit());
311 * We are returing all futures as list, not only succeeded ones in
312 * order to fail composite future if any of them failed.
313 * See Futures.allAsList for this description.
315 @SuppressWarnings({ "unchecked", "rawtypes" })
316 ListenableFuture<Void> compositeResult = (ListenableFuture) Futures.allAsList(ops.build());
317 return MappingCheckedFuture.create(compositeResult,
318 TransactionCommitFailedExceptionMapper.PRE_COMMIT_MAPPER);
323 * Invokes commit on underlying cohorts and returns future which
325 * once all commits on cohorts are completed.
327 * Valid state transition is from PRE_COMMIT to COMMIT, if not throws
328 * IllegalStateException
330 * @return Future which will complete once all cohorts completed
332 * Future throws TransactionCommitFailedException
333 * If any of cohorts failed preCommit
336 private CheckedFuture<Void, TransactionCommitFailedException> commitAll() {
337 changeStateFrom(CommitPhase.PRE_COMMIT, CommitPhase.COMMIT);
338 Builder<ListenableFuture<Void>> ops = ImmutableList.builder();
339 for (DOMStoreThreePhaseCommitCohort cohort : cohorts) {
340 ops.add(cohort.commit());
343 * We are returing all futures as list, not only succeeded ones in
344 * order to fail composite future if any of them failed.
345 * See Futures.allAsList for this description.
347 @SuppressWarnings({ "unchecked", "rawtypes" })
348 ListenableFuture<Void> compositeResult = (ListenableFuture) Futures.allAsList(ops.build());
349 return MappingCheckedFuture.create(compositeResult,
350 TransactionCommitFailedExceptionMapper.COMMIT_ERROR_MAPPER);
355 * Invokes canCommit on underlying cohorts and returns composite future
356 * which will contains {@link Boolean#TRUE} only and only if
357 * all cohorts returned true.
359 * Valid state transition is from SUBMITTED to CAN_COMMIT,
360 * if currentPhase is not SUBMITTED throws IllegalStateException.
362 * @return Future which will complete once all cohorts completed
364 * Future throws TransactionCommitFailedException
365 * If any of cohorts failed preCommit
368 private CheckedFuture<Boolean, TransactionCommitFailedException> canCommitAll() {
369 changeStateFrom(CommitPhase.SUBMITTED, CommitPhase.CAN_COMMIT);
370 Builder<ListenableFuture<Boolean>> canCommitOperations = ImmutableList.builder();
371 for (DOMStoreThreePhaseCommitCohort cohort : cohorts) {
372 canCommitOperations.add(cohort.canCommit());
374 ListenableFuture<List<Boolean>> allCanCommits = Futures.allAsList(canCommitOperations.build());
375 ListenableFuture<Boolean> allSuccessFuture = Futures.transform(allCanCommits, AND_FUNCTION);
376 return MappingCheckedFuture.create(allSuccessFuture,
377 TransactionCommitFailedExceptionMapper.CAN_COMMIT_ERROR_MAPPER);
383 * Invokes abort on underlying cohorts and returns future which
385 * once all abort on cohorts are completed.
387 * @return Future which will complete once all cohorts completed
391 private ListenableFuture<Void> abortAsyncAll() {
392 changeStateFrom(currentPhase, CommitPhase.ABORT);
393 Builder<ListenableFuture<Void>> ops = ImmutableList.builder();
394 for (DOMStoreThreePhaseCommitCohort cohort : cohorts) {
395 ops.add(cohort.abort());
398 * We are returing all futures as list, not only succeeded ones in
399 * order to fail composite future if any of them failed.
400 * See Futures.allAsList for this description.
402 @SuppressWarnings({ "unchecked", "rawtypes" })
403 ListenableFuture<Void> compositeResult = (ListenableFuture) Futures.allAsList(ops.build());
404 return compositeResult;
408 * Change phase / state of transaction from expected value to new value
410 * This method checks state and updates state to new state of
411 * of this task if current state equals expected state.
412 * If expected state and current state are different raises
413 * IllegalStateException
414 * which means there is probably bug in implementation of commit
417 * If transition is successful, it logs transition on DEBUG level.
419 * @param currentExpected
420 * Required phase for change of state
422 * New Phase which will be entered by transaction.
423 * @throws IllegalStateException
424 * If currentState of task does not match expected state
426 private synchronized void changeStateFrom(final CommitPhase currentExpected, final CommitPhase newState) {
427 Preconditions.checkState(currentPhase.equals(currentExpected),
428 "Invalid state transition: Tx: %s current state: %s new state: %s", tx.getIdentifier(),
429 currentPhase, newState);
430 LOG.debug("Transaction {}: Phase {} Started ", tx.getIdentifier(), newState);
431 currentPhase = newState;