2 * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
3 * This program and the accompanying materials are made available under the
4 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
5 * and is available at http://www.eclipse.org/legal/epl-v10.html
7 package org.opendaylight.controller.md.sal.dom.broker.impl;
9 import com.google.common.base.Preconditions;
10 import com.google.common.base.Throwables;
11 import com.google.common.collect.Iterables;
12 import com.google.common.util.concurrent.CheckedFuture;
13 import com.google.common.util.concurrent.Futures;
14 import com.google.common.util.concurrent.ListenableFuture;
15 import com.google.common.util.concurrent.ListeningExecutorService;
16 import java.util.concurrent.Callable;
17 import java.util.concurrent.ExecutionException;
18 import java.util.concurrent.RejectedExecutionException;
19 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
20 import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
21 import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
22 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
23 import org.opendaylight.yangtools.util.DurationStatisticsTracker;
24 import org.opendaylight.yangtools.util.concurrent.MappingCheckedFuture;
25 import org.slf4j.Logger;
26 import org.slf4j.LoggerFactory;
30 * Implementation of blocking three phase commit coordinator, which
31 * supports coordination on multiple {@link DOMStoreThreePhaseCommitCohort}.
33 * This implementation does not support cancellation of commit.
35 * In order to advance to the next phase of the three phase commit, all subtasks of
36 * the previous step must be finished.
38 * This executor does not have an upper bound on subtask timeout.
42 public class DOMDataCommitCoordinatorImpl implements DOMDataCommitExecutor {
44 private static final Logger LOG = LoggerFactory.getLogger(DOMDataCommitCoordinatorImpl.class);
45 private final DurationStatisticsTracker commitStatsTracker = DurationStatisticsTracker.createConcurrent();
46 private final ListeningExecutorService executor;
/**
 * Creates a commit coordinator which uses the supplied executor to process
 * commit coordinations.
 *
 * @param executor executor the commit coordination tasks are submitted to,
 *                 must not be null
 * @throws NullPointerException if {@code executor} is null
 */
public DOMDataCommitCoordinatorImpl(final ListeningExecutorService executor) {
    // Validate first, then store; checkNotNull hands back its argument unchanged.
    Preconditions.checkNotNull(executor, "executor must not be null.");
    this.executor = executor;
}
59 public DurationStatisticsTracker getCommitStatsTracker() {
60 return commitStatsTracker;
64 public CheckedFuture<Void,TransactionCommitFailedException> submit(final DOMDataWriteTransaction transaction,
65 final Iterable<DOMStoreThreePhaseCommitCohort> cohorts) {
66 Preconditions.checkArgument(transaction != null, "Transaction must not be null.");
67 Preconditions.checkArgument(cohorts != null, "Cohorts must not be null.");
68 LOG.debug("Tx: {} is submitted for execution.", transaction.getIdentifier());
70 ListenableFuture<Void> commitFuture = null;
72 commitFuture = executor.submit(new CommitCoordinationTask(transaction, cohorts,
74 } catch(RejectedExecutionException e) {
75 LOG.error("The commit executor's queue is full - submit task was rejected. \n" +
77 return Futures.immediateFailedCheckedFuture(
78 new TransactionCommitFailedException(
79 "Could not submit the commit task - the commit queue capacity has been exceeded.", e));
82 return MappingCheckedFuture.create(commitFuture,
83 TransactionCommitFailedExceptionMapper.COMMIT_ERROR_MAPPER);
90 * Represents phase of 3PC Commit
94 private static enum CommitPhase {
97 * Commit Coordination Task is submitted for executing
102 * Commit Coordination Task is in can commit phase of 3PC
107 * Commit Coordination Task is in pre-commit phase of 3PC
112 * Commit Coordination Task is in commit phase of 3PC
117 * Commit Coordination Task is in abort phase of 3PC
124 * Implementation of blocking three-phase commit-coordination tasks without
125 * support of cancellation.
127 private static final class CommitCoordinationTask implements Callable<Void> {
128 private static final AtomicReferenceFieldUpdater<CommitCoordinationTask, CommitPhase> PHASE_UPDATER =
129 AtomicReferenceFieldUpdater.newUpdater(CommitCoordinationTask.class, CommitPhase.class, "currentPhase");
130 private final DOMDataWriteTransaction tx;
131 private final Iterable<DOMStoreThreePhaseCommitCohort> cohorts;
132 private final DurationStatisticsTracker commitStatTracker;
133 private final int cohortSize;
134 private volatile CommitPhase currentPhase = CommitPhase.SUBMITTED;
/**
 * Creates a task coordinating the commit of a single transaction.
 *
 * @param transaction transaction being committed, must not be null
 * @param cohorts cohorts taking part in the commit, must not be null
 * @param commitStatsTracker tracker receiving the commit duration; may be
 *                           null, in which case no duration is recorded
 * @throws NullPointerException if {@code transaction} or {@code cohorts} is null
 */
public CommitCoordinationTask(final DOMDataWriteTransaction transaction,
        final Iterable<DOMStoreThreePhaseCommitCohort> cohorts,
        final DurationStatisticsTracker commitStatsTracker) {
    // Same check order as before: transaction first, then cohorts.
    Preconditions.checkNotNull(transaction, "transaction must not be null");
    Preconditions.checkNotNull(cohorts, "cohorts must not be null");

    this.tx = transaction;
    this.cohorts = cohorts;
    this.commitStatTracker = commitStatsTracker;
    // Counted once here; every phase allocates an array of exactly this size.
    this.cohortSize = Iterables.size(cohorts);
}
146 public Void call() throws TransactionCommitFailedException {
147 final long startTime = commitStatTracker != null ? System.nanoTime() : 0;
154 } catch (TransactionCommitFailedException e) {
155 final CommitPhase phase = currentPhase;
156 LOG.warn("Tx: {} Error during phase {}, starting Abort", tx.getIdentifier(), phase, e);
157 abortBlocking(e, phase);
160 if (commitStatTracker != null) {
161 commitStatTracker.addDuration(System.nanoTime() - startTime);
168 * Invokes canCommit on underlying cohorts and blocks till
169 * all results are returned.
171 * Valid state transition is from SUBMITTED to CAN_COMMIT,
172 * if currentPhase is not SUBMITTED throws IllegalStateException.
174 * @throws TransactionCommitFailedException
175 * If one of cohorts failed can Commit
178 private void canCommitBlocking() throws TransactionCommitFailedException {
179 for (ListenableFuture<?> canCommit : canCommitAll()) {
181 final Boolean result = (Boolean)canCommit.get();
182 if (result == null || !result) {
183 throw new TransactionCommitFailedException("Can Commit failed, no detailed cause available.");
185 } catch (InterruptedException | ExecutionException e) {
186 throw TransactionCommitFailedExceptionMapper.CAN_COMMIT_ERROR_MAPPER.apply(e);
193 * Invokes canCommit on underlying cohorts and returns composite future
194 * which will contains {@link Boolean#TRUE} only and only if
195 * all cohorts returned true.
197 * Valid state transition is from SUBMITTED to CAN_COMMIT,
198 * if currentPhase is not SUBMITTED throws IllegalStateException.
200 * @return List of all cohorts futures from can commit phase.
203 private ListenableFuture<?>[] canCommitAll() {
204 changeStateFrom(CommitPhase.SUBMITTED, CommitPhase.CAN_COMMIT);
206 final ListenableFuture<?>[] ops = new ListenableFuture<?>[cohortSize];
208 for (DOMStoreThreePhaseCommitCohort cohort : cohorts) {
209 ops[i++] = cohort.canCommit();
216 * Invokes preCommit on underlying cohorts and blocks till
217 * all results are returned.
219 * Valid state transition is from CAN_COMMIT to PRE_COMMIT, if current
220 * state is not CAN_COMMIT
221 * throws IllegalStateException.
223 * @throws TransactionCommitFailedException
224 * If one of cohorts failed preCommit
227 private void preCommitBlocking() throws TransactionCommitFailedException {
228 final ListenableFuture<?>[] preCommitFutures = preCommitAll();
230 for(ListenableFuture<?> future : preCommitFutures) {
233 } catch (InterruptedException | ExecutionException e) {
234 throw TransactionCommitFailedExceptionMapper.PRE_COMMIT_MAPPER.apply(e);
240 * Invokes preCommit on underlying cohorts and returns future
241 * which will complete once all preCommit on cohorts completed or
245 * Valid state transition is from CAN_COMMIT to PRE_COMMIT, if current
246 * state is not CAN_COMMIT
247 * throws IllegalStateException.
249 * @return List of all cohorts futures from can commit phase.
252 private ListenableFuture<?>[] preCommitAll() {
253 changeStateFrom(CommitPhase.CAN_COMMIT, CommitPhase.PRE_COMMIT);
255 final ListenableFuture<?>[] ops = new ListenableFuture<?>[cohortSize];
257 for (DOMStoreThreePhaseCommitCohort cohort : cohorts) {
258 ops[i++] = cohort.preCommit();
265 * Invokes commit on underlying cohorts and blocks till
266 * all results are returned.
268 * Valid state transition is from PRE_COMMIT to COMMIT, if not throws
269 * IllegalStateException.
271 * @throws TransactionCommitFailedException
272 * If one of cohorts failed preCommit
275 private void commitBlocking() throws TransactionCommitFailedException {
276 final ListenableFuture<?>[] commitFutures = commitAll();
278 for(ListenableFuture<?> future : commitFutures) {
281 } catch (InterruptedException | ExecutionException e) {
282 throw TransactionCommitFailedExceptionMapper.COMMIT_ERROR_MAPPER.apply(e);
288 * Invokes commit on underlying cohorts and returns future which
290 * once all commits on cohorts are completed.
292 * Valid state transition is from PRE_COMMIT to COMMIT, if not throws
293 * IllegalStateException
295 * @return List of all cohorts futures from can commit phase.
298 private ListenableFuture<?>[] commitAll() {
299 changeStateFrom(CommitPhase.PRE_COMMIT, CommitPhase.COMMIT);
301 final ListenableFuture<?>[] ops = new ListenableFuture<?>[cohortSize];
303 for (DOMStoreThreePhaseCommitCohort cohort : cohorts) {
304 ops[i++] = cohort.commit();
310 * Aborts transaction.
312 * Invokes {@link DOMStoreThreePhaseCommitCohort#abort()} on all
314 * for all results. If any of the abort failed throws
315 * IllegalStateException,
316 * which will contains originalCause as suppressed Exception.
318 * If aborts we're successful throws supplied exception
320 * @param originalCause
321 * Exception which should be used to fail transaction for
322 * consumers of transaction
323 * future and listeners of transaction failure.
324 * @param phase phase in which the problem ensued
325 * @throws TransactionCommitFailedException
326 * on invocation of this method.
328 * @throws IllegalStateException
331 private void abortBlocking(final TransactionCommitFailedException originalCause, final CommitPhase phase)
332 throws TransactionCommitFailedException {
333 LOG.warn("Tx: {} Error during phase {}, starting Abort", tx.getIdentifier(), phase, originalCause);
334 Exception cause = originalCause;
336 abortAsyncAll(phase).get();
337 } catch (InterruptedException | ExecutionException e) {
338 LOG.error("Tx: {} Error during Abort.", tx.getIdentifier(), e);
339 cause = new IllegalStateException("Abort failed.", e);
340 cause.addSuppressed(e);
342 Throwables.propagateIfPossible(cause, TransactionCommitFailedException.class);
346 * Invokes abort on underlying cohorts and returns future which
347 * completes once all abort on cohorts are completed.
349 * @param phase phase in which the problem ensued
350 * @return Future which will complete once all cohorts completed
353 private ListenableFuture<Void> abortAsyncAll(final CommitPhase phase) {
354 changeStateFrom(phase, CommitPhase.ABORT);
356 final ListenableFuture<?>[] ops = new ListenableFuture<?>[cohortSize];
358 for (DOMStoreThreePhaseCommitCohort cohort : cohorts) {
359 ops[i++] = cohort.abort();
363 * We are returning all futures as list, not only succeeded ones in
364 * order to fail composite future if any of them failed.
365 * See Futures.allAsList for this description.
367 @SuppressWarnings({ "unchecked", "rawtypes" })
368 ListenableFuture<Void> compositeResult = (ListenableFuture) Futures.allAsList(ops);
369 return compositeResult;
373 * Change phase / state of transaction from expected value to new value
375 * This method checks state and updates state to new state of
376 * of this task if current state equals expected state.
377 * If expected state and current state are different raises
378 * IllegalStateException
379 * which means there is probably bug in implementation of commit
382 * If transition is successful, it logs transition on DEBUG level.
384 * @param currentExpected
385 * Required phase for change of state
387 * New Phase which will be entered by transaction.
388 * @throws IllegalStateException
389 * If currentState of task does not match expected state
391 private void changeStateFrom(final CommitPhase currentExpected, final CommitPhase newState) {
392 final boolean success = PHASE_UPDATER.compareAndSet(this, currentExpected, newState);
393 Preconditions.checkState(success, "Invalid state transition: Tx: %s expected: %s current: %s target: %s",
394 tx.getIdentifier(), currentExpected, currentPhase, newState);
396 LOG.debug("Transaction {}: Phase {} Started", tx.getIdentifier(), newState);