2 * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
3 * This program and the accompanying materials are made available under the
4 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
5 * and is available at http://www.eclipse.org/legal/epl-v10.html
7 package org.opendaylight.controller.md.sal.dom.broker.impl;
9 import com.google.common.base.Optional;
10 import com.google.common.base.Preconditions;
11 import com.google.common.base.Throwables;
12 import com.google.common.collect.Iterables;
13 import com.google.common.util.concurrent.CheckedFuture;
14 import com.google.common.util.concurrent.Futures;
15 import com.google.common.util.concurrent.ListenableFuture;
16 import com.google.common.util.concurrent.ListeningExecutorService;
17 import java.util.concurrent.Callable;
18 import java.util.concurrent.ExecutionException;
19 import java.util.concurrent.RejectedExecutionException;
20 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
21 import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
22 import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
23 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
24 import org.opendaylight.yangtools.util.DurationStatsTracker;
25 import org.opendaylight.yangtools.util.concurrent.MappingCheckedFuture;
26 import org.slf4j.Logger;
27 import org.slf4j.LoggerFactory;
31 * Implementation of blocking three phase commit coordinator, which
32 * supports coordination on multiple {@link DOMStoreThreePhaseCommitCohort}.
34 * This implementation does not support cancellation of commit.
36 * In order to advance to next phase of three phase commit all subtasks of
37 * previous step must be finished.
39 * This executor does not have an upper bound on subtask timeout.
43 public class DOMDataCommitCoordinatorImpl implements DOMDataCommitExecutor {
45 private static final Logger LOG = LoggerFactory.getLogger(DOMDataCommitCoordinatorImpl.class);
46 private final DurationStatsTracker commitStatsTracker = new DurationStatsTracker();
47 private final ListeningExecutorService executor;
51 * Construct DOMDataCommitCoordinator which uses supplied executor to
52 * process commit coordinations.
56 public DOMDataCommitCoordinatorImpl(final ListeningExecutorService executor) {
57 this.executor = Preconditions.checkNotNull(executor, "executor must not be null.");
60 public DurationStatsTracker getCommitStatsTracker() {
61 return commitStatsTracker;
65 public CheckedFuture<Void,TransactionCommitFailedException> submit(final DOMDataWriteTransaction transaction,
66 final Iterable<DOMStoreThreePhaseCommitCohort> cohorts, final Optional<DOMDataCommitErrorListener> listener) {
67 Preconditions.checkArgument(transaction != null, "Transaction must not be null.");
68 Preconditions.checkArgument(cohorts != null, "Cohorts must not be null.");
69 Preconditions.checkArgument(listener != null, "Listener must not be null");
70 LOG.debug("Tx: {} is submitted for execution.", transaction.getIdentifier());
72 ListenableFuture<Void> commitFuture = null;
74 commitFuture = executor.submit(new CommitCoordinationTask(transaction, cohorts,
75 listener, commitStatsTracker));
76 } catch(RejectedExecutionException e) {
77 LOG.error("The commit executor's queue is full - submit task was rejected. \n" +
79 return Futures.immediateFailedCheckedFuture(
80 new TransactionCommitFailedException(
81 "Could not submit the commit task - the commit queue capacity has been exceeded.", e));
84 if (listener.isPresent()) {
85 Futures.addCallback(commitFuture, new DOMDataCommitErrorInvoker(transaction, listener.get()));
88 return MappingCheckedFuture.create(commitFuture,
89 TransactionCommitFailedExceptionMapper.COMMIT_ERROR_MAPPER);
96 * Represents phase of 3PC Commit
100 private static enum CommitPhase {
103 * Commit Coordination Task is submitted for executing
108 * Commit Coordination Task is in can commit phase of 3PC
113 * Commit Coordination Task is in pre-commit phase of 3PC
118 * Commit Coordination Task is in commit phase of 3PC
123 * Commit Coordination Task is in abort phase of 3PC
130 * Implementation of blocking three-phase commit-coordination tasks without
131 * support of cancellation.
133 private static final class CommitCoordinationTask implements Callable<Void> {
134 private static final AtomicReferenceFieldUpdater<CommitCoordinationTask, CommitPhase> PHASE_UPDATER =
135 AtomicReferenceFieldUpdater.newUpdater(CommitCoordinationTask.class, CommitPhase.class, "currentPhase");
136 private final DOMDataWriteTransaction tx;
137 private final Iterable<DOMStoreThreePhaseCommitCohort> cohorts;
138 private final DurationStatsTracker commitStatTracker;
139 private final int cohortSize;
140 private volatile CommitPhase currentPhase = CommitPhase.SUBMITTED;
142 public CommitCoordinationTask(final DOMDataWriteTransaction transaction,
143 final Iterable<DOMStoreThreePhaseCommitCohort> cohorts,
144 final Optional<DOMDataCommitErrorListener> listener,
145 final DurationStatsTracker commitStatTracker) {
146 this.tx = Preconditions.checkNotNull(transaction, "transaction must not be null");
147 this.cohorts = Preconditions.checkNotNull(cohorts, "cohorts must not be null");
148 this.commitStatTracker = commitStatTracker;
149 this.cohortSize = Iterables.size(cohorts);
153 public Void call() throws TransactionCommitFailedException {
154 final long startTime = commitStatTracker != null ? System.nanoTime() : 0;
161 } catch (TransactionCommitFailedException e) {
162 final CommitPhase phase = currentPhase;
163 LOG.warn("Tx: {} Error during phase {}, starting Abort", tx.getIdentifier(), phase, e);
164 abortBlocking(e, phase);
167 if (commitStatTracker != null) {
168 commitStatTracker.addDuration(System.nanoTime() - startTime);
175 * Invokes canCommit on underlying cohorts and blocks till
176 * all results are returned.
178 * Valid state transition is from SUBMITTED to CAN_COMMIT,
179 * if currentPhase is not SUBMITTED throws IllegalStateException.
181 * @throws TransactionCommitFailedException
182 * If one of cohorts failed can Commit
185 private void canCommitBlocking() throws TransactionCommitFailedException {
186 for (ListenableFuture<?> canCommit : canCommitAll()) {
188 final Boolean result = (Boolean)canCommit.get();
189 if (result == null || !result) {
190 throw new TransactionCommitFailedException("Can Commit failed, no detailed cause available.");
192 } catch (InterruptedException | ExecutionException e) {
193 throw TransactionCommitFailedExceptionMapper.CAN_COMMIT_ERROR_MAPPER.apply(e);
200 * Invokes canCommit on underlying cohorts and returns the futures of
201 * the individual invocations, one per cohort; the caller is
202 * responsible for checking that every cohort returned true.
204 * Valid state transition is from SUBMITTED to CAN_COMMIT,
205 * if currentPhase is not SUBMITTED throws IllegalStateException.
207 * @return List of all cohorts futures from can commit phase.
210 private ListenableFuture<?>[] canCommitAll() {
211 changeStateFrom(CommitPhase.SUBMITTED, CommitPhase.CAN_COMMIT);
213 final ListenableFuture<?>[] ops = new ListenableFuture<?>[cohortSize];
215 for (DOMStoreThreePhaseCommitCohort cohort : cohorts) {
216 ops[i++] = cohort.canCommit();
223 * Invokes preCommit on underlying cohorts and blocks till
224 * all results are returned.
226 * Valid state transition is from CAN_COMMIT to PRE_COMMIT, if current
227 * state is not CAN_COMMIT
228 * throws IllegalStateException.
230 * @throws TransactionCommitFailedException
231 * If one of cohorts failed preCommit
234 private void preCommitBlocking() throws TransactionCommitFailedException {
235 final ListenableFuture<?>[] preCommitFutures = preCommitAll();
237 for(ListenableFuture<?> future : preCommitFutures) {
240 } catch (InterruptedException | ExecutionException e) {
241 throw TransactionCommitFailedExceptionMapper.PRE_COMMIT_MAPPER.apply(e);
247 * Invokes preCommit on underlying cohorts and returns future
248 * which will complete once all preCommit on cohorts completed or
252 * Valid state transition is from CAN_COMMIT to PRE_COMMIT, if current
253 * state is not CAN_COMMIT
254 * throws IllegalStateException.
256 * @return List of all cohorts futures from pre-commit phase.
259 private ListenableFuture<?>[] preCommitAll() {
260 changeStateFrom(CommitPhase.CAN_COMMIT, CommitPhase.PRE_COMMIT);
262 final ListenableFuture<?>[] ops = new ListenableFuture<?>[cohortSize];
264 for (DOMStoreThreePhaseCommitCohort cohort : cohorts) {
265 ops[i++] = cohort.preCommit();
272 * Invokes commit on underlying cohorts and blocks till
273 * all results are returned.
275 * Valid state transition is from PRE_COMMIT to COMMIT, if not throws
276 * IllegalStateException.
278 * @throws TransactionCommitFailedException
279 * If one of cohorts failed commit
282 private void commitBlocking() throws TransactionCommitFailedException {
283 final ListenableFuture<?>[] commitFutures = commitAll();
285 for(ListenableFuture<?> future : commitFutures) {
288 } catch (InterruptedException | ExecutionException e) {
289 throw TransactionCommitFailedExceptionMapper.COMMIT_ERROR_MAPPER.apply(e);
295 * Invokes commit on underlying cohorts and returns future which
297 * once all commits on cohorts are completed.
299 * Valid state transition is from PRE_COMMIT to COMMIT, if not throws
300 * IllegalStateException
302 * @return List of all cohorts futures from commit phase.
305 private ListenableFuture<?>[] commitAll() {
306 changeStateFrom(CommitPhase.PRE_COMMIT, CommitPhase.COMMIT);
308 final ListenableFuture<?>[] ops = new ListenableFuture<?>[cohortSize];
310 for (DOMStoreThreePhaseCommitCohort cohort : cohorts) {
311 ops[i++] = cohort.commit();
317 * Aborts transaction.
319 * Invokes {@link DOMStoreThreePhaseCommitCohort#abort()} on all
321 * for all results. If any of the aborts failed, throws
322 * IllegalStateException,
323 * which will contain originalCause as suppressed Exception.
325 * If aborts were successful throws supplied exception
327 * @param originalCause
328 * Exception which should be used to fail transaction for
329 * consumers of transaction
330 * future and listeners of transaction failure.
331 * @param phase phase in which the problem ensued
332 * @throws TransactionCommitFailedException
333 * on invocation of this method.
335 * @throws IllegalStateException
338 private void abortBlocking(final TransactionCommitFailedException originalCause, final CommitPhase phase)
339 throws TransactionCommitFailedException {
340 LOG.warn("Tx: {} Error during phase {}, starting Abort", tx.getIdentifier(), phase, originalCause);
341 Exception cause = originalCause;
343 abortAsyncAll(phase).get();
344 } catch (InterruptedException | ExecutionException e) {
345 LOG.error("Tx: {} Error during Abort.", tx.getIdentifier(), e);
346 cause = new IllegalStateException("Abort failed.", e);
347 cause.addSuppressed(e);
349 Throwables.propagateIfPossible(cause, TransactionCommitFailedException.class);
353 * Invokes abort on underlying cohorts and returns future which
354 * completes once all abort on cohorts are completed.
356 * @param phase phase in which the problem ensued
357 * @return Future which will complete once all cohorts completed
360 private ListenableFuture<Void> abortAsyncAll(final CommitPhase phase) {
361 changeStateFrom(phase, CommitPhase.ABORT);
363 final ListenableFuture<?>[] ops = new ListenableFuture<?>[cohortSize];
365 for (DOMStoreThreePhaseCommitCohort cohort : cohorts) {
366 ops[i++] = cohort.abort();
370 * We are returning all futures as list, not only succeeded ones in
371 * order to fail composite future if any of them failed.
372 * See Futures.allAsList for this description.
374 @SuppressWarnings({ "unchecked", "rawtypes" })
375 ListenableFuture<Void> compositeResult = (ListenableFuture) Futures.allAsList(ops);
376 return compositeResult;
380 * Change phase / state of transaction from expected value to new value
382 * This method checks state and updates state to new state of
383 * this task if current state equals expected state.
384 * If expected state and current state are different raises
385 * IllegalStateException
386 * which means there is probably bug in implementation of commit
389 * If transition is successful, it logs transition on DEBUG level.
391 * @param currentExpected
392 * Required phase for change of state
394 * New Phase which will be entered by transaction.
395 * @throws IllegalStateException
396 * If currentState of task does not match expected state
398 private void changeStateFrom(final CommitPhase currentExpected, final CommitPhase newState) {
399 final boolean success = PHASE_UPDATER.compareAndSet(this, currentExpected, newState);
400 Preconditions.checkState(success, "Invalid state transition: Tx: %s expected: %s current: %s target: %s",
401 tx.getIdentifier(), currentExpected, currentPhase, newState);
403 LOG.debug("Transaction {}: Phase {} Started", tx.getIdentifier(), newState);