/*
 * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
 * and is available at http://www.eclipse.org/legal/epl-v10.html
 */
7 package org.opendaylight.controller.md.sal.dom.broker.impl;
9 import java.util.Collections;
10 import java.util.List;
11 import java.util.concurrent.Callable;
12 import java.util.concurrent.ExecutionException;
14 import javax.annotation.concurrent.GuardedBy;
16 import org.opendaylight.controller.md.sal.common.api.TransactionStatus;
17 import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
18 import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
19 import org.opendaylight.controller.sal.common.util.Rpcs;
20 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
21 import org.opendaylight.yangtools.yang.common.RpcError;
22 import org.opendaylight.yangtools.yang.common.RpcResult;
23 import org.slf4j.Logger;
24 import org.slf4j.LoggerFactory;
26 import com.google.common.base.Function;
27 import com.google.common.base.Optional;
28 import com.google.common.base.Preconditions;
29 import com.google.common.base.Throwables;
30 import com.google.common.collect.ImmutableList;
31 import com.google.common.collect.ImmutableList.Builder;
32 import com.google.common.util.concurrent.CheckedFuture;
33 import com.google.common.util.concurrent.Futures;
34 import com.google.common.util.concurrent.ListenableFuture;
35 import com.google.common.util.concurrent.ListeningExecutorService;
/**
 * Implementation of a blocking three-phase commit coordinator, which
 * supports coordination across multiple {@link DOMStoreThreePhaseCommitCohort} instances.
 *
 * This implementation does not support cancellation of a commit.
 *
 * In order to advance to the next phase of the three-phase commit, all subtasks of
 * the previous step must have finished.
 *
 * This executor does not have an upper bound on subtask timeout.
 */
51 public class DOMDataCommitCoordinatorImpl implements DOMDataCommitExecutor {
53 private static final Logger LOG = LoggerFactory.getLogger(DOMDataCommitCoordinatorImpl.class);
56 * Runs AND binary operation between all booleans in supplied iteration of booleans.
58 * This method will stop evaluating iterables if first found is false.
60 private static final Function<Iterable<Boolean>, Boolean> AND_FUNCTION = new Function<Iterable<Boolean>, Boolean>() {
63 public Boolean apply(final Iterable<Boolean> input) {
64 for(boolean value : input) {
/** Executor on which every commit coordination task is run. */
private final ListeningExecutorService executor;

/**
 * Creates a commit coordinator that processes commit coordinations on the
 * supplied executor.
 *
 * @param executor
 *            executor to which commit coordination tasks are submitted
 * @throws NullPointerException
 *             if {@code executor} is null
 */
public DOMDataCommitCoordinatorImpl(final ListeningExecutorService executor) {
    Preconditions.checkNotNull(executor, "executor must not be null.");
    this.executor = executor;
}
87 public ListenableFuture<RpcResult<TransactionStatus>> submit(final DOMDataWriteTransaction transaction,
88 final Iterable<DOMStoreThreePhaseCommitCohort> cohorts, final Optional<DOMDataCommitErrorListener> listener) {
89 Preconditions.checkArgument(transaction != null, "Transaction must not be null.");
90 Preconditions.checkArgument(cohorts != null, "Cohorts must not be null.");
91 Preconditions.checkArgument(listener != null, "Listener must not be null");
92 LOG.debug("Tx: {} is submitted for execution.", transaction.getIdentifier());
93 ListenableFuture<RpcResult<TransactionStatus>> commitFuture = executor.submit(new CommitCoordinationTask(
94 transaction, cohorts, listener));
95 if (listener.isPresent()) {
96 Futures.addCallback(commitFuture, new DOMDataCommitErrorInvoker(transaction, listener.get()));
103 * Phase of 3PC commit
105 * Represents phase of 3PC Commit
109 private static enum CommitPhase {
112 * Commit Coordination Task is submitted for executing
117 * Commit Coordination Task is in can commit phase of 3PC
122 * Commit Coordination Task is in pre-commit phase of 3PC
127 * Commit Coordination Task is in commit phase of 3PC
132 * Commit Coordination Task is in abort phase of 3PC
/**
 * Implementation of a blocking three-phase commit coordination task without
 * support for cancellation.
 */
144 private static class CommitCoordinationTask implements Callable<RpcResult<TransactionStatus>> {
/** Transaction whose commit this task coordinates. */
private final DOMDataWriteTransaction tx;

/** Cohorts that are driven through every commit phase. */
private final Iterable<DOMStoreThreePhaseCommitCohort> cohorts;

/** Phase the task is in; updated only through the synchronized changeStateFrom. */
private CommitPhase currentPhase;

/**
 * Creates a coordination task in the {@code SUBMITTED} phase.
 *
 * @param transaction
 *            transaction being committed, must not be null
 * @param cohorts
 *            cohorts taking part in the commit, must not be null
 * @param listener
 *            accepted for the caller's convenience; not used by this constructor
 * @throws NullPointerException
 *             if {@code transaction} or {@code cohorts} is null
 */
public CommitCoordinationTask(final DOMDataWriteTransaction transaction,
        final Iterable<DOMStoreThreePhaseCommitCohort> cohorts,
        final Optional<DOMDataCommitErrorListener> listener) {
    Preconditions.checkNotNull(transaction, "transaction must not be null");
    Preconditions.checkNotNull(cohorts, "cohorts must not be null");
    this.tx = transaction;
    this.cohorts = cohorts;
    this.currentPhase = CommitPhase.SUBMITTED;
}
161 public RpcResult<TransactionStatus> call() throws TransactionCommitFailedException {
166 return commitBlocking();
167 } catch (TransactionCommitFailedException e) {
168 LOG.warn("Tx: {} Error during phase {}, starting Abort", tx.getIdentifier(), currentPhase, e);
176 * Invokes canCommit on underlying cohorts and blocks till
177 * all results are returned.
179 * Valid state transition is from SUBMITTED to CAN_COMMIT,
180 * if currentPhase is not SUBMITTED throws IllegalStateException.
182 * @throws TransactionCommitFailedException
183 * If one of cohorts failed can Commit
186 private void canCommitBlocking() throws TransactionCommitFailedException {
187 final Boolean canCommitResult = canCommitAll().checkedGet();
188 if (!canCommitResult) {
189 throw new TransactionCommitFailedException("Can Commit failed, no detailed cause available.");
195 * Invokes preCommit on underlying cohorts and blocks till
196 * all results are returned.
198 * Valid state transition is from CAN_COMMIT to PRE_COMMIT, if current
199 * state is not CAN_COMMIT
200 * throws IllegalStateException.
202 * @throws TransactionCommitFailedException
203 * If one of cohorts failed preCommit
206 private void preCommitBlocking() throws TransactionCommitFailedException {
207 preCommitAll().checkedGet();
212 * Invokes commit on underlying cohorts and blocks till
213 * all results are returned.
215 * Valid state transition is from PRE_COMMIT to COMMIT, if not throws
216 * IllegalStateException.
218 * @throws TransactionCommitFailedException
219 * If one of cohorts failed preCommit
222 private RpcResult<TransactionStatus> commitBlocking() throws TransactionCommitFailedException {
223 commitAll().checkedGet();
224 return Rpcs.getRpcResult(true, TransactionStatus.COMMITED, Collections.<RpcError> emptySet());
228 * Aborts transaction.
230 * Invokes {@link DOMStoreThreePhaseCommitCohort#abort()} on all
232 * for all results. If any of the abort failed throws
233 * IllegalStateException,
234 * which will contains originalCause as suppressed Exception.
236 * If aborts we're successful throws supplied exception
238 * @param originalCause
239 * Exception which should be used to fail transaction for
240 * consumers of transaction
241 * future and listeners of transaction failure.
242 * @throws TransactionCommitFailedException
243 * on invocation of this method.
245 * @throws IllegalStateException
248 private void abortBlocking(final TransactionCommitFailedException originalCause)
249 throws TransactionCommitFailedException {
250 LOG.warn("Tx: {} Error during phase {}, starting Abort", tx.getIdentifier(), currentPhase, originalCause);
251 Exception cause = originalCause;
253 abortAsyncAll().get();
254 } catch (InterruptedException | ExecutionException e) {
255 LOG.error("Tx: {} Error during Abort.", tx.getIdentifier(), e);
256 cause = new IllegalStateException("Abort failed.", e);
257 cause.addSuppressed(e);
259 Throwables.propagateIfPossible(cause, TransactionCommitFailedException.class);
264 * Invokes preCommit on underlying cohorts and returns future
265 * which will complete once all preCommit on cohorts completed or
269 * Valid state transition is from CAN_COMMIT to PRE_COMMIT, if current
270 * state is not CAN_COMMIT
271 * throws IllegalStateException.
273 * @return Future which will complete once all cohorts completed
275 * Future throws TransactionCommitFailedException
276 * If any of cohorts failed preCommit
279 private CheckedFuture<Void, TransactionCommitFailedException> preCommitAll() {
280 changeStateFrom(CommitPhase.CAN_COMMIT, CommitPhase.PRE_COMMIT);
281 Builder<ListenableFuture<Void>> ops = ImmutableList.builder();
282 for (DOMStoreThreePhaseCommitCohort cohort : cohorts) {
283 ops.add(cohort.preCommit());
286 * We are returing all futures as list, not only succeeded ones in
287 * order to fail composite future if any of them failed.
288 * See Futures.allAsList for this description.
290 @SuppressWarnings({ "unchecked", "rawtypes" })
291 ListenableFuture<Void> compositeResult = (ListenableFuture) Futures.allAsList(ops.build());
292 return Futures.makeChecked(compositeResult, TransactionCommitFailedExceptionMapper.PRE_COMMIT_MAPPER);
297 * Invokes commit on underlying cohorts and returns future which
299 * once all commits on cohorts are completed.
301 * Valid state transition is from PRE_COMMIT to COMMIT, if not throws
302 * IllegalStateException
304 * @return Future which will complete once all cohorts completed
306 * Future throws TransactionCommitFailedException
307 * If any of cohorts failed preCommit
310 private CheckedFuture<Void, TransactionCommitFailedException> commitAll() {
311 changeStateFrom(CommitPhase.PRE_COMMIT, CommitPhase.COMMIT);
312 Builder<ListenableFuture<Void>> ops = ImmutableList.builder();
313 for (DOMStoreThreePhaseCommitCohort cohort : cohorts) {
314 ops.add(cohort.commit());
317 * We are returing all futures as list, not only succeeded ones in
318 * order to fail composite future if any of them failed.
319 * See Futures.allAsList for this description.
321 @SuppressWarnings({ "unchecked", "rawtypes" })
322 ListenableFuture<Void> compositeResult = (ListenableFuture) Futures.allAsList(ops.build());
323 return Futures.makeChecked(compositeResult, TransactionCommitFailedExceptionMapper.COMMIT_ERROR_MAPPER);
328 * Invokes canCommit on underlying cohorts and returns composite future
329 * which will contains {@link Boolean#TRUE} only and only if
330 * all cohorts returned true.
332 * Valid state transition is from SUBMITTED to CAN_COMMIT,
333 * if currentPhase is not SUBMITTED throws IllegalStateException.
335 * @return Future which will complete once all cohorts completed
337 * Future throws TransactionCommitFailedException
338 * If any of cohorts failed preCommit
341 private CheckedFuture<Boolean, TransactionCommitFailedException> canCommitAll() {
342 changeStateFrom(CommitPhase.SUBMITTED, CommitPhase.CAN_COMMIT);
343 Builder<ListenableFuture<Boolean>> canCommitOperations = ImmutableList.builder();
344 for (DOMStoreThreePhaseCommitCohort cohort : cohorts) {
345 canCommitOperations.add(cohort.canCommit());
347 ListenableFuture<List<Boolean>> allCanCommits = Futures.allAsList(canCommitOperations.build());
348 ListenableFuture<Boolean> allSuccessFuture = Futures.transform(allCanCommits, AND_FUNCTION);
350 .makeChecked(allSuccessFuture, TransactionCommitFailedExceptionMapper.CAN_COMMIT_ERROR_MAPPER);
356 * Invokes abort on underlying cohorts and returns future which
358 * once all abort on cohorts are completed.
360 * @return Future which will complete once all cohorts completed
364 private ListenableFuture<Void> abortAsyncAll() {
365 changeStateFrom(currentPhase, CommitPhase.ABORT);
366 Builder<ListenableFuture<Void>> ops = ImmutableList.builder();
367 for (DOMStoreThreePhaseCommitCohort cohort : cohorts) {
368 ops.add(cohort.abort());
371 * We are returing all futures as list, not only succeeded ones in
372 * order to fail composite future if any of them failed.
373 * See Futures.allAsList for this description.
375 @SuppressWarnings({ "unchecked", "rawtypes" })
376 ListenableFuture<Void> compositeResult = (ListenableFuture) Futures.allAsList(ops.build());
377 return compositeResult;
381 * Change phase / state of transaction from expected value to new value
383 * This method checks state and updates state to new state of
384 * of this task if current state equals expected state.
385 * If expected state and current state are different raises
386 * IllegalStateException
387 * which means there is probably bug in implementation of commit
390 * If transition is successful, it logs transition on DEBUG level.
392 * @param currentExpected
393 * Required phase for change of state
395 * New Phase which will be entered by transaction.
396 * @throws IllegalStateException
397 * If currentState of task does not match expected state
399 private synchronized void changeStateFrom(final CommitPhase currentExpected, final CommitPhase newState) {
400 Preconditions.checkState(currentPhase.equals(currentExpected),
401 "Invalid state transition: Tx: %s current state: %s new state: %s", tx.getIdentifier(),
402 currentPhase, newState);
403 LOG.debug("Transaction {}: Phase {} Started ", tx.getIdentifier(), newState);
404 currentPhase = newState;