2 * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
3 * This program and the accompanying materials are made available under the
4 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
5 * and is available at http://www.eclipse.org/legal/epl-v10.html
7 package org.opendaylight.controller.md.sal.dom.broker.impl;
10 import java.util.concurrent.Callable;
11 import java.util.concurrent.ExecutionException;
12 import java.util.concurrent.RejectedExecutionException;
14 import javax.annotation.concurrent.GuardedBy;
16 import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
17 import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
18 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
19 import org.opendaylight.yangtools.util.concurrent.MappingCheckedFuture;
20 import org.slf4j.Logger;
21 import org.slf4j.LoggerFactory;
23 import com.google.common.base.Function;
24 import com.google.common.base.Optional;
25 import com.google.common.base.Preconditions;
26 import com.google.common.base.Throwables;
27 import com.google.common.collect.ImmutableList;
28 import com.google.common.collect.ImmutableList.Builder;
29 import com.google.common.util.concurrent.CheckedFuture;
30 import com.google.common.util.concurrent.Futures;
31 import com.google.common.util.concurrent.ListenableFuture;
32 import com.google.common.util.concurrent.ListeningExecutorService;
36 * Implementation of blocking three phase commit coordinator, which which
37 * supports coordination on multiple {@link DOMStoreThreePhaseCommitCohort}.
39 * This implementation does not support cancelation of commit,
41 * In order to advance to next phase of three phase commit all subtasks of
42 * previous step must be finish.
44 * This executor does not have an upper bound on subtask timeout.
48 public class DOMDataCommitCoordinatorImpl implements DOMDataCommitExecutor {
50 private static final Logger LOG = LoggerFactory.getLogger(DOMDataCommitCoordinatorImpl.class);
53 * Runs AND binary operation between all booleans in supplied iteration of booleans.
55 * This method will stop evaluating iterables if first found is false.
57 private static final Function<Iterable<Boolean>, Boolean> AND_FUNCTION = new Function<Iterable<Boolean>, Boolean>() {
60 public Boolean apply(final Iterable<Boolean> input) {
61 for(boolean value : input) {
70 private final ListeningExecutorService executor;
74 * Construct DOMDataCommitCoordinator which uses supplied executor to
75 * process commit coordinations.
79 public DOMDataCommitCoordinatorImpl(final ListeningExecutorService executor) {
80 this.executor = Preconditions.checkNotNull(executor, "executor must not be null.");
84 public CheckedFuture<Void,TransactionCommitFailedException> submit(final DOMDataWriteTransaction transaction,
85 final Iterable<DOMStoreThreePhaseCommitCohort> cohorts, final Optional<DOMDataCommitErrorListener> listener) {
86 Preconditions.checkArgument(transaction != null, "Transaction must not be null.");
87 Preconditions.checkArgument(cohorts != null, "Cohorts must not be null.");
88 Preconditions.checkArgument(listener != null, "Listener must not be null");
89 LOG.debug("Tx: {} is submitted for execution.", transaction.getIdentifier());
91 ListenableFuture<Void> commitFuture = null;
93 commitFuture = executor.submit(new CommitCoordinationTask(transaction, cohorts, listener));
94 } catch(RejectedExecutionException e) {
95 LOG.error("The commit executor's queue is full - submit task was rejected. \n" +
97 return Futures.immediateFailedCheckedFuture(
98 new TransactionCommitFailedException(
99 "Could not submit the commit task - the commit queue capacity has been exceeded.", e));
102 if (listener.isPresent()) {
103 Futures.addCallback(commitFuture, new DOMDataCommitErrorInvoker(transaction, listener.get()));
106 return MappingCheckedFuture.create(commitFuture,
107 TransactionCommitFailedExceptionMapper.COMMIT_ERROR_MAPPER);
112 * Phase of 3PC commit
114 * Represents phase of 3PC Commit
118 private static enum CommitPhase {
121 * Commit Coordination Task is submitted for executing
126 * Commit Coordination Task is in can commit phase of 3PC
131 * Commit Coordination Task is in pre-commit phase of 3PC
136 * Commit Coordination Task is in commit phase of 3PC
141 * Commit Coordination Task is in abort phase of 3PC
149 * Implementation of blocking three-phase commit-coordination tasks without
150 * support of cancelation.
153 private static class CommitCoordinationTask implements Callable<Void> {
155 private final DOMDataWriteTransaction tx;
156 private final Iterable<DOMStoreThreePhaseCommitCohort> cohorts;
159 private CommitPhase currentPhase;
161 public CommitCoordinationTask(final DOMDataWriteTransaction transaction,
162 final Iterable<DOMStoreThreePhaseCommitCohort> cohorts,
163 final Optional<DOMDataCommitErrorListener> listener) {
164 this.tx = Preconditions.checkNotNull(transaction, "transaction must not be null");
165 this.cohorts = Preconditions.checkNotNull(cohorts, "cohorts must not be null");
166 this.currentPhase = CommitPhase.SUBMITTED;
170 public Void call() throws TransactionCommitFailedException {
177 } catch (TransactionCommitFailedException e) {
178 LOG.warn("Tx: {} Error during phase {}, starting Abort", tx.getIdentifier(), currentPhase, e);
186 * Invokes canCommit on underlying cohorts and blocks till
187 * all results are returned.
189 * Valid state transition is from SUBMITTED to CAN_COMMIT,
190 * if currentPhase is not SUBMITTED throws IllegalStateException.
192 * @throws TransactionCommitFailedException
193 * If one of cohorts failed can Commit
196 private void canCommitBlocking() throws TransactionCommitFailedException {
197 final Boolean canCommitResult = canCommitAll().checkedGet();
198 if (!canCommitResult) {
199 throw new TransactionCommitFailedException("Can Commit failed, no detailed cause available.");
205 * Invokes preCommit on underlying cohorts and blocks till
206 * all results are returned.
208 * Valid state transition is from CAN_COMMIT to PRE_COMMIT, if current
209 * state is not CAN_COMMIT
210 * throws IllegalStateException.
212 * @throws TransactionCommitFailedException
213 * If one of cohorts failed preCommit
216 private void preCommitBlocking() throws TransactionCommitFailedException {
217 preCommitAll().checkedGet();
222 * Invokes commit on underlying cohorts and blocks till
223 * all results are returned.
225 * Valid state transition is from PRE_COMMIT to COMMIT, if not throws
226 * IllegalStateException.
228 * @throws TransactionCommitFailedException
229 * If one of cohorts failed preCommit
232 private void commitBlocking() throws TransactionCommitFailedException {
233 commitAll().checkedGet();
237 * Aborts transaction.
239 * Invokes {@link DOMStoreThreePhaseCommitCohort#abort()} on all
241 * for all results. If any of the abort failed throws
242 * IllegalStateException,
243 * which will contains originalCause as suppressed Exception.
245 * If aborts we're successful throws supplied exception
247 * @param originalCause
248 * Exception which should be used to fail transaction for
249 * consumers of transaction
250 * future and listeners of transaction failure.
251 * @throws TransactionCommitFailedException
252 * on invocation of this method.
254 * @throws IllegalStateException
257 private void abortBlocking(final TransactionCommitFailedException originalCause)
258 throws TransactionCommitFailedException {
259 LOG.warn("Tx: {} Error during phase {}, starting Abort", tx.getIdentifier(), currentPhase, originalCause);
260 Exception cause = originalCause;
262 abortAsyncAll().get();
263 } catch (InterruptedException | ExecutionException e) {
264 LOG.error("Tx: {} Error during Abort.", tx.getIdentifier(), e);
265 cause = new IllegalStateException("Abort failed.", e);
266 cause.addSuppressed(e);
268 Throwables.propagateIfPossible(cause, TransactionCommitFailedException.class);
273 * Invokes preCommit on underlying cohorts and returns future
274 * which will complete once all preCommit on cohorts completed or
278 * Valid state transition is from CAN_COMMIT to PRE_COMMIT, if current
279 * state is not CAN_COMMIT
280 * throws IllegalStateException.
282 * @return Future which will complete once all cohorts completed
284 * Future throws TransactionCommitFailedException
285 * If any of cohorts failed preCommit
288 private CheckedFuture<Void, TransactionCommitFailedException> preCommitAll() {
289 changeStateFrom(CommitPhase.CAN_COMMIT, CommitPhase.PRE_COMMIT);
290 Builder<ListenableFuture<Void>> ops = ImmutableList.builder();
291 for (DOMStoreThreePhaseCommitCohort cohort : cohorts) {
292 ops.add(cohort.preCommit());
295 * We are returing all futures as list, not only succeeded ones in
296 * order to fail composite future if any of them failed.
297 * See Futures.allAsList for this description.
299 @SuppressWarnings({ "unchecked", "rawtypes" })
300 ListenableFuture<Void> compositeResult = (ListenableFuture) Futures.allAsList(ops.build());
301 return MappingCheckedFuture.create(compositeResult,
302 TransactionCommitFailedExceptionMapper.PRE_COMMIT_MAPPER);
307 * Invokes commit on underlying cohorts and returns future which
309 * once all commits on cohorts are completed.
311 * Valid state transition is from PRE_COMMIT to COMMIT, if not throws
312 * IllegalStateException
314 * @return Future which will complete once all cohorts completed
316 * Future throws TransactionCommitFailedException
317 * If any of cohorts failed preCommit
320 private CheckedFuture<Void, TransactionCommitFailedException> commitAll() {
321 changeStateFrom(CommitPhase.PRE_COMMIT, CommitPhase.COMMIT);
322 Builder<ListenableFuture<Void>> ops = ImmutableList.builder();
323 for (DOMStoreThreePhaseCommitCohort cohort : cohorts) {
324 ops.add(cohort.commit());
327 * We are returing all futures as list, not only succeeded ones in
328 * order to fail composite future if any of them failed.
329 * See Futures.allAsList for this description.
331 @SuppressWarnings({ "unchecked", "rawtypes" })
332 ListenableFuture<Void> compositeResult = (ListenableFuture) Futures.allAsList(ops.build());
333 return MappingCheckedFuture.create(compositeResult,
334 TransactionCommitFailedExceptionMapper.COMMIT_ERROR_MAPPER);
339 * Invokes canCommit on underlying cohorts and returns composite future
340 * which will contains {@link Boolean#TRUE} only and only if
341 * all cohorts returned true.
343 * Valid state transition is from SUBMITTED to CAN_COMMIT,
344 * if currentPhase is not SUBMITTED throws IllegalStateException.
346 * @return Future which will complete once all cohorts completed
348 * Future throws TransactionCommitFailedException
349 * If any of cohorts failed preCommit
352 private CheckedFuture<Boolean, TransactionCommitFailedException> canCommitAll() {
353 changeStateFrom(CommitPhase.SUBMITTED, CommitPhase.CAN_COMMIT);
354 Builder<ListenableFuture<Boolean>> canCommitOperations = ImmutableList.builder();
355 for (DOMStoreThreePhaseCommitCohort cohort : cohorts) {
356 canCommitOperations.add(cohort.canCommit());
358 ListenableFuture<List<Boolean>> allCanCommits = Futures.allAsList(canCommitOperations.build());
359 ListenableFuture<Boolean> allSuccessFuture = Futures.transform(allCanCommits, AND_FUNCTION);
360 return MappingCheckedFuture.create(allSuccessFuture,
361 TransactionCommitFailedExceptionMapper.CAN_COMMIT_ERROR_MAPPER);
367 * Invokes abort on underlying cohorts and returns future which
369 * once all abort on cohorts are completed.
371 * @return Future which will complete once all cohorts completed
375 private ListenableFuture<Void> abortAsyncAll() {
376 changeStateFrom(currentPhase, CommitPhase.ABORT);
377 Builder<ListenableFuture<Void>> ops = ImmutableList.builder();
378 for (DOMStoreThreePhaseCommitCohort cohort : cohorts) {
379 ops.add(cohort.abort());
382 * We are returing all futures as list, not only succeeded ones in
383 * order to fail composite future if any of them failed.
384 * See Futures.allAsList for this description.
386 @SuppressWarnings({ "unchecked", "rawtypes" })
387 ListenableFuture<Void> compositeResult = (ListenableFuture) Futures.allAsList(ops.build());
388 return compositeResult;
392 * Change phase / state of transaction from expected value to new value
394 * This method checks state and updates state to new state of
395 * of this task if current state equals expected state.
396 * If expected state and current state are different raises
397 * IllegalStateException
398 * which means there is probably bug in implementation of commit
401 * If transition is successful, it logs transition on DEBUG level.
403 * @param currentExpected
404 * Required phase for change of state
406 * New Phase which will be entered by transaction.
407 * @throws IllegalStateException
408 * If currentState of task does not match expected state
410 private synchronized void changeStateFrom(final CommitPhase currentExpected, final CommitPhase newState) {
411 Preconditions.checkState(currentPhase.equals(currentExpected),
412 "Invalid state transition: Tx: %s current state: %s new state: %s", tx.getIdentifier(),
413 currentPhase, newState);
414 LOG.debug("Transaction {}: Phase {} Started ", tx.getIdentifier(), newState);
415 currentPhase = newState;