2 * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
3 * This program and the accompanying materials are made available under the
4 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
5 * and is available at http://www.eclipse.org/legal/epl-v10.html
7 package org.opendaylight.controller.md.sal.dom.broker.impl;
10 import java.util.concurrent.Callable;
11 import java.util.concurrent.ExecutionException;
13 import javax.annotation.concurrent.GuardedBy;
15 import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
16 import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
17 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
18 import org.slf4j.Logger;
19 import org.slf4j.LoggerFactory;
21 import com.google.common.base.Function;
22 import com.google.common.base.Optional;
23 import com.google.common.base.Preconditions;
24 import com.google.common.base.Throwables;
25 import com.google.common.collect.ImmutableList;
26 import com.google.common.collect.ImmutableList.Builder;
27 import com.google.common.util.concurrent.CheckedFuture;
28 import com.google.common.util.concurrent.Futures;
29 import com.google.common.util.concurrent.ListenableFuture;
30 import com.google.common.util.concurrent.ListeningExecutorService;
34 * Implementation of blocking three phase commit coordinator, which which
35 * supports coordination on multiple {@link DOMStoreThreePhaseCommitCohort}.
37 * This implementation does not support cancelation of commit,
39 * In order to advance to next phase of three phase commit all subtasks of
40 * previous step must be finish.
42 * This executor does not have an upper bound on subtask timeout.
46 public class DOMDataCommitCoordinatorImpl implements DOMDataCommitExecutor {
48 private static final Logger LOG = LoggerFactory.getLogger(DOMDataCommitCoordinatorImpl.class);
51 * Runs AND binary operation between all booleans in supplied iteration of booleans.
53 * This method will stop evaluating iterables if first found is false.
55 private static final Function<Iterable<Boolean>, Boolean> AND_FUNCTION = new Function<Iterable<Boolean>, Boolean>() {
58 public Boolean apply(final Iterable<Boolean> input) {
59 for(boolean value : input) {
68 private final ListeningExecutorService executor;
72 * Construct DOMDataCommitCoordinator which uses supplied executor to
73 * process commit coordinations.
77 public DOMDataCommitCoordinatorImpl(final ListeningExecutorService executor) {
78 this.executor = Preconditions.checkNotNull(executor, "executor must not be null.");
82 public CheckedFuture<Void,TransactionCommitFailedException> submit(final DOMDataWriteTransaction transaction,
83 final Iterable<DOMStoreThreePhaseCommitCohort> cohorts, final Optional<DOMDataCommitErrorListener> listener) {
84 Preconditions.checkArgument(transaction != null, "Transaction must not be null.");
85 Preconditions.checkArgument(cohorts != null, "Cohorts must not be null.");
86 Preconditions.checkArgument(listener != null, "Listener must not be null");
87 LOG.debug("Tx: {} is submitted for execution.", transaction.getIdentifier());
88 ListenableFuture<Void> commitFuture = executor.submit(new CommitCoordinationTask(
89 transaction, cohorts, listener));
90 if (listener.isPresent()) {
91 Futures.addCallback(commitFuture, new DOMDataCommitErrorInvoker(transaction, listener.get()));
94 return Futures.makeChecked(commitFuture, TransactionCommitFailedExceptionMapper.COMMIT_ERROR_MAPPER);
101 * Represents phase of 3PC Commit
105 private static enum CommitPhase {
108 * Commit Coordination Task is submitted for executing
113 * Commit Coordination Task is in can commit phase of 3PC
118 * Commit Coordination Task is in pre-commit phase of 3PC
123 * Commit Coordination Task is in commit phase of 3PC
128 * Commit Coordination Task is in abort phase of 3PC
136 * Implementation of blocking three-phase commit-coordination tasks without
137 * support of cancelation.
140 private static class CommitCoordinationTask implements Callable<Void> {
142 private final DOMDataWriteTransaction tx;
143 private final Iterable<DOMStoreThreePhaseCommitCohort> cohorts;
146 private CommitPhase currentPhase;
148 public CommitCoordinationTask(final DOMDataWriteTransaction transaction,
149 final Iterable<DOMStoreThreePhaseCommitCohort> cohorts,
150 final Optional<DOMDataCommitErrorListener> listener) {
151 this.tx = Preconditions.checkNotNull(transaction, "transaction must not be null");
152 this.cohorts = Preconditions.checkNotNull(cohorts, "cohorts must not be null");
153 this.currentPhase = CommitPhase.SUBMITTED;
157 public Void call() throws TransactionCommitFailedException {
164 } catch (TransactionCommitFailedException e) {
165 LOG.warn("Tx: {} Error during phase {}, starting Abort", tx.getIdentifier(), currentPhase, e);
173 * Invokes canCommit on underlying cohorts and blocks till
174 * all results are returned.
176 * Valid state transition is from SUBMITTED to CAN_COMMIT,
177 * if currentPhase is not SUBMITTED throws IllegalStateException.
179 * @throws TransactionCommitFailedException
180 * If one of cohorts failed can Commit
183 private void canCommitBlocking() throws TransactionCommitFailedException {
184 final Boolean canCommitResult = canCommitAll().checkedGet();
185 if (!canCommitResult) {
186 throw new TransactionCommitFailedException("Can Commit failed, no detailed cause available.");
192 * Invokes preCommit on underlying cohorts and blocks till
193 * all results are returned.
195 * Valid state transition is from CAN_COMMIT to PRE_COMMIT, if current
196 * state is not CAN_COMMIT
197 * throws IllegalStateException.
199 * @throws TransactionCommitFailedException
200 * If one of cohorts failed preCommit
203 private void preCommitBlocking() throws TransactionCommitFailedException {
204 preCommitAll().checkedGet();
209 * Invokes commit on underlying cohorts and blocks till
210 * all results are returned.
212 * Valid state transition is from PRE_COMMIT to COMMIT, if not throws
213 * IllegalStateException.
215 * @throws TransactionCommitFailedException
216 * If one of cohorts failed preCommit
219 private void commitBlocking() throws TransactionCommitFailedException {
220 commitAll().checkedGet();
224 * Aborts transaction.
226 * Invokes {@link DOMStoreThreePhaseCommitCohort#abort()} on all
228 * for all results. If any of the abort failed throws
229 * IllegalStateException,
230 * which will contains originalCause as suppressed Exception.
232 * If aborts we're successful throws supplied exception
234 * @param originalCause
235 * Exception which should be used to fail transaction for
236 * consumers of transaction
237 * future and listeners of transaction failure.
238 * @throws TransactionCommitFailedException
239 * on invocation of this method.
241 * @throws IllegalStateException
244 private void abortBlocking(final TransactionCommitFailedException originalCause)
245 throws TransactionCommitFailedException {
246 LOG.warn("Tx: {} Error during phase {}, starting Abort", tx.getIdentifier(), currentPhase, originalCause);
247 Exception cause = originalCause;
249 abortAsyncAll().get();
250 } catch (InterruptedException | ExecutionException e) {
251 LOG.error("Tx: {} Error during Abort.", tx.getIdentifier(), e);
252 cause = new IllegalStateException("Abort failed.", e);
253 cause.addSuppressed(e);
255 Throwables.propagateIfPossible(cause, TransactionCommitFailedException.class);
260 * Invokes preCommit on underlying cohorts and returns future
261 * which will complete once all preCommit on cohorts completed or
265 * Valid state transition is from CAN_COMMIT to PRE_COMMIT, if current
266 * state is not CAN_COMMIT
267 * throws IllegalStateException.
269 * @return Future which will complete once all cohorts completed
271 * Future throws TransactionCommitFailedException
272 * If any of cohorts failed preCommit
275 private CheckedFuture<Void, TransactionCommitFailedException> preCommitAll() {
276 changeStateFrom(CommitPhase.CAN_COMMIT, CommitPhase.PRE_COMMIT);
277 Builder<ListenableFuture<Void>> ops = ImmutableList.builder();
278 for (DOMStoreThreePhaseCommitCohort cohort : cohorts) {
279 ops.add(cohort.preCommit());
282 * We are returing all futures as list, not only succeeded ones in
283 * order to fail composite future if any of them failed.
284 * See Futures.allAsList for this description.
286 @SuppressWarnings({ "unchecked", "rawtypes" })
287 ListenableFuture<Void> compositeResult = (ListenableFuture) Futures.allAsList(ops.build());
288 return Futures.makeChecked(compositeResult, TransactionCommitFailedExceptionMapper.PRE_COMMIT_MAPPER);
293 * Invokes commit on underlying cohorts and returns future which
295 * once all commits on cohorts are completed.
297 * Valid state transition is from PRE_COMMIT to COMMIT, if not throws
298 * IllegalStateException
300 * @return Future which will complete once all cohorts completed
302 * Future throws TransactionCommitFailedException
303 * If any of cohorts failed preCommit
306 private CheckedFuture<Void, TransactionCommitFailedException> commitAll() {
307 changeStateFrom(CommitPhase.PRE_COMMIT, CommitPhase.COMMIT);
308 Builder<ListenableFuture<Void>> ops = ImmutableList.builder();
309 for (DOMStoreThreePhaseCommitCohort cohort : cohorts) {
310 ops.add(cohort.commit());
313 * We are returing all futures as list, not only succeeded ones in
314 * order to fail composite future if any of them failed.
315 * See Futures.allAsList for this description.
317 @SuppressWarnings({ "unchecked", "rawtypes" })
318 ListenableFuture<Void> compositeResult = (ListenableFuture) Futures.allAsList(ops.build());
319 return Futures.makeChecked(compositeResult, TransactionCommitFailedExceptionMapper.COMMIT_ERROR_MAPPER);
324 * Invokes canCommit on underlying cohorts and returns composite future
325 * which will contains {@link Boolean#TRUE} only and only if
326 * all cohorts returned true.
328 * Valid state transition is from SUBMITTED to CAN_COMMIT,
329 * if currentPhase is not SUBMITTED throws IllegalStateException.
331 * @return Future which will complete once all cohorts completed
333 * Future throws TransactionCommitFailedException
334 * If any of cohorts failed preCommit
337 private CheckedFuture<Boolean, TransactionCommitFailedException> canCommitAll() {
338 changeStateFrom(CommitPhase.SUBMITTED, CommitPhase.CAN_COMMIT);
339 Builder<ListenableFuture<Boolean>> canCommitOperations = ImmutableList.builder();
340 for (DOMStoreThreePhaseCommitCohort cohort : cohorts) {
341 canCommitOperations.add(cohort.canCommit());
343 ListenableFuture<List<Boolean>> allCanCommits = Futures.allAsList(canCommitOperations.build());
344 ListenableFuture<Boolean> allSuccessFuture = Futures.transform(allCanCommits, AND_FUNCTION);
346 .makeChecked(allSuccessFuture, TransactionCommitFailedExceptionMapper.CAN_COMMIT_ERROR_MAPPER);
352 * Invokes abort on underlying cohorts and returns future which
354 * once all abort on cohorts are completed.
356 * @return Future which will complete once all cohorts completed
360 private ListenableFuture<Void> abortAsyncAll() {
361 changeStateFrom(currentPhase, CommitPhase.ABORT);
362 Builder<ListenableFuture<Void>> ops = ImmutableList.builder();
363 for (DOMStoreThreePhaseCommitCohort cohort : cohorts) {
364 ops.add(cohort.abort());
367 * We are returing all futures as list, not only succeeded ones in
368 * order to fail composite future if any of them failed.
369 * See Futures.allAsList for this description.
371 @SuppressWarnings({ "unchecked", "rawtypes" })
372 ListenableFuture<Void> compositeResult = (ListenableFuture) Futures.allAsList(ops.build());
373 return compositeResult;
377 * Change phase / state of transaction from expected value to new value
379 * This method checks state and updates state to new state of
380 * of this task if current state equals expected state.
381 * If expected state and current state are different raises
382 * IllegalStateException
383 * which means there is probably bug in implementation of commit
386 * If transition is successful, it logs transition on DEBUG level.
388 * @param currentExpected
389 * Required phase for change of state
391 * New Phase which will be entered by transaction.
392 * @throws IllegalStateException
393 * If currentState of task does not match expected state
395 private synchronized void changeStateFrom(final CommitPhase currentExpected, final CommitPhase newState) {
396 Preconditions.checkState(currentPhase.equals(currentExpected),
397 "Invalid state transition: Tx: %s current state: %s new state: %s", tx.getIdentifier(),
398 currentPhase, newState);
399 LOG.debug("Transaction {}: Phase {} Started ", tx.getIdentifier(), newState);
400 currentPhase = newState;