2 * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
3 * This program and the accompanying materials are made available under the
4 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
5 * and is available at http://www.eclipse.org/legal/epl-v10.html
7 package org.opendaylight.controller.md.sal.dom.broker.impl;
10 import java.util.concurrent.Callable;
11 import java.util.concurrent.ExecutionException;
13 import javax.annotation.concurrent.GuardedBy;
15 import org.opendaylight.controller.md.sal.common.api.TransactionStatus;
16 import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
17 import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
18 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
19 import org.opendaylight.yangtools.yang.common.RpcResult;
20 import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
21 import org.slf4j.Logger;
22 import org.slf4j.LoggerFactory;
24 import com.google.common.base.Function;
25 import com.google.common.base.Optional;
26 import com.google.common.base.Preconditions;
27 import com.google.common.base.Throwables;
28 import com.google.common.collect.ImmutableList;
29 import com.google.common.collect.ImmutableList.Builder;
30 import com.google.common.util.concurrent.CheckedFuture;
31 import com.google.common.util.concurrent.Futures;
32 import com.google.common.util.concurrent.ListenableFuture;
33 import com.google.common.util.concurrent.ListeningExecutorService;
37 * Implementation of blocking three phase commit coordinator, which
38 * supports coordination on multiple {@link DOMStoreThreePhaseCommitCohort}.
40 * This implementation does not support cancellation of commit.
42 * In order to advance to the next phase of the three phase commit, all subtasks of
43 * the previous step must be finished.
45 * This executor does not have an upper bound on subtask timeout.
49 public class DOMDataCommitCoordinatorImpl implements DOMDataCommitExecutor {
51 private static final Logger LOG = LoggerFactory.getLogger(DOMDataCommitCoordinatorImpl.class);
54 * Runs AND binary operation between all booleans in supplied iteration of booleans.
56 * This method will stop evaluating iterables if first found is false.
58 private static final Function<Iterable<Boolean>, Boolean> AND_FUNCTION = new Function<Iterable<Boolean>, Boolean>() {
61 public Boolean apply(final Iterable<Boolean> input) {
62 for(boolean value : input) {
71 private final ListeningExecutorService executor;
/**
 * Creates a coordinator which runs every commit coordination task on the
 * supplied executor.
 *
 * @param executor
 *            executor used to process commit coordinations, must not be null
 * @throws NullPointerException
 *             if {@code executor} is null
 */
public DOMDataCommitCoordinatorImpl(final ListeningExecutorService executor) {
    Preconditions.checkNotNull(executor, "executor must not be null.");
    this.executor = executor;
}
85 public ListenableFuture<RpcResult<TransactionStatus>> submit(final DOMDataWriteTransaction transaction,
86 final Iterable<DOMStoreThreePhaseCommitCohort> cohorts, final Optional<DOMDataCommitErrorListener> listener) {
87 Preconditions.checkArgument(transaction != null, "Transaction must not be null.");
88 Preconditions.checkArgument(cohorts != null, "Cohorts must not be null.");
89 Preconditions.checkArgument(listener != null, "Listener must not be null");
90 LOG.debug("Tx: {} is submitted for execution.", transaction.getIdentifier());
91 ListenableFuture<RpcResult<TransactionStatus>> commitFuture = executor.submit(new CommitCoordinationTask(
92 transaction, cohorts, listener));
93 if (listener.isPresent()) {
94 Futures.addCallback(commitFuture, new DOMDataCommitErrorInvoker(transaction, listener.get()));
101 * Phase of 3PC commit
103 * Represents phase of 3PC Commit
107 private static enum CommitPhase {
110 * Commit Coordination Task is submitted for executing
115 * Commit Coordination Task is in can commit phase of 3PC
120 * Commit Coordination Task is in pre-commit phase of 3PC
125 * Commit Coordination Task is in commit phase of 3PC
130 * Commit Coordination Task is in abort phase of 3PC
138 * Implementation of blocking three-phase commit-coordination tasks without
139 * support of cancelation.
142 private static class CommitCoordinationTask implements Callable<RpcResult<TransactionStatus>> {
144 private final DOMDataWriteTransaction tx;
145 private final Iterable<DOMStoreThreePhaseCommitCohort> cohorts;
148 private CommitPhase currentPhase;
/**
 * Creates a coordination task in the SUBMITTED phase.
 *
 * @param transaction
 *            transaction to be committed, must not be null
 * @param cohorts
 *            cohorts participating in the commit, must not be null
 * @param listener
 *            accepted by the signature but not stored by this constructor
 * @throws NullPointerException
 *             if {@code transaction} or {@code cohorts} is null
 */
public CommitCoordinationTask(final DOMDataWriteTransaction transaction,
        final Iterable<DOMStoreThreePhaseCommitCohort> cohorts,
        final Optional<DOMDataCommitErrorListener> listener) {
    Preconditions.checkNotNull(transaction, "transaction must not be null");
    Preconditions.checkNotNull(cohorts, "cohorts must not be null");
    this.tx = transaction;
    this.cohorts = cohorts;
    this.currentPhase = CommitPhase.SUBMITTED;
}
159 public RpcResult<TransactionStatus> call() throws TransactionCommitFailedException {
164 return commitBlocking();
165 } catch (TransactionCommitFailedException e) {
166 LOG.warn("Tx: {} Error during phase {}, starting Abort", tx.getIdentifier(), currentPhase, e);
174 * Invokes canCommit on underlying cohorts and blocks till
175 * all results are returned.
177 * Valid state transition is from SUBMITTED to CAN_COMMIT,
178 * if currentPhase is not SUBMITTED throws IllegalStateException.
180 * @throws TransactionCommitFailedException
181 * If one of cohorts failed can Commit
184 private void canCommitBlocking() throws TransactionCommitFailedException {
185 final Boolean canCommitResult = canCommitAll().checkedGet();
186 if (!canCommitResult) {
187 throw new TransactionCommitFailedException("Can Commit failed, no detailed cause available.");
193 * Invokes preCommit on underlying cohorts and blocks till
194 * all results are returned.
196 * Valid state transition is from CAN_COMMIT to PRE_COMMIT, if current
197 * state is not CAN_COMMIT
198 * throws IllegalStateException.
200 * @throws TransactionCommitFailedException
201 * If one of cohorts failed preCommit
204 private void preCommitBlocking() throws TransactionCommitFailedException {
205 preCommitAll().checkedGet();
210 * Invokes commit on underlying cohorts and blocks till
211 * all results are returned.
213 * Valid state transition is from PRE_COMMIT to COMMIT, if not throws
214 * IllegalStateException.
216 * @throws TransactionCommitFailedException
217 * If one of cohorts failed preCommit
220 private RpcResult<TransactionStatus> commitBlocking() throws TransactionCommitFailedException {
221 commitAll().checkedGet();
222 return RpcResultBuilder.<TransactionStatus>success(TransactionStatus.COMMITED).build();
226 * Aborts transaction.
228 * Invokes {@link DOMStoreThreePhaseCommitCohort#abort()} on all
230 * for all results. If any of the abort failed throws
231 * IllegalStateException,
232 * which will contains originalCause as suppressed Exception.
234 * If aborts we're successful throws supplied exception
236 * @param originalCause
237 * Exception which should be used to fail transaction for
238 * consumers of transaction
239 * future and listeners of transaction failure.
240 * @throws TransactionCommitFailedException
241 * on invocation of this method.
243 * @throws IllegalStateException
246 private void abortBlocking(final TransactionCommitFailedException originalCause)
247 throws TransactionCommitFailedException {
248 LOG.warn("Tx: {} Error during phase {}, starting Abort", tx.getIdentifier(), currentPhase, originalCause);
249 Exception cause = originalCause;
251 abortAsyncAll().get();
252 } catch (InterruptedException | ExecutionException e) {
253 LOG.error("Tx: {} Error during Abort.", tx.getIdentifier(), e);
254 cause = new IllegalStateException("Abort failed.", e);
255 cause.addSuppressed(e);
257 Throwables.propagateIfPossible(cause, TransactionCommitFailedException.class);
262 * Invokes preCommit on underlying cohorts and returns future
263 * which will complete once all preCommit on cohorts completed or
267 * Valid state transition is from CAN_COMMIT to PRE_COMMIT, if current
268 * state is not CAN_COMMIT
269 * throws IllegalStateException.
271 * @return Future which will complete once all cohorts completed
273 * Future throws TransactionCommitFailedException
274 * If any of cohorts failed preCommit
277 private CheckedFuture<Void, TransactionCommitFailedException> preCommitAll() {
278 changeStateFrom(CommitPhase.CAN_COMMIT, CommitPhase.PRE_COMMIT);
279 Builder<ListenableFuture<Void>> ops = ImmutableList.builder();
280 for (DOMStoreThreePhaseCommitCohort cohort : cohorts) {
281 ops.add(cohort.preCommit());
284 * We are returing all futures as list, not only succeeded ones in
285 * order to fail composite future if any of them failed.
286 * See Futures.allAsList for this description.
288 @SuppressWarnings({ "unchecked", "rawtypes" })
289 ListenableFuture<Void> compositeResult = (ListenableFuture) Futures.allAsList(ops.build());
290 return Futures.makeChecked(compositeResult, TransactionCommitFailedExceptionMapper.PRE_COMMIT_MAPPER);
295 * Invokes commit on underlying cohorts and returns future which
297 * once all commits on cohorts are completed.
299 * Valid state transition is from PRE_COMMIT to COMMIT, if not throws
300 * IllegalStateException
302 * @return Future which will complete once all cohorts completed
304 * Future throws TransactionCommitFailedException
305 * If any of cohorts failed preCommit
308 private CheckedFuture<Void, TransactionCommitFailedException> commitAll() {
309 changeStateFrom(CommitPhase.PRE_COMMIT, CommitPhase.COMMIT);
310 Builder<ListenableFuture<Void>> ops = ImmutableList.builder();
311 for (DOMStoreThreePhaseCommitCohort cohort : cohorts) {
312 ops.add(cohort.commit());
315 * We are returing all futures as list, not only succeeded ones in
316 * order to fail composite future if any of them failed.
317 * See Futures.allAsList for this description.
319 @SuppressWarnings({ "unchecked", "rawtypes" })
320 ListenableFuture<Void> compositeResult = (ListenableFuture) Futures.allAsList(ops.build());
321 return Futures.makeChecked(compositeResult, TransactionCommitFailedExceptionMapper.COMMIT_ERROR_MAPPER);
326 * Invokes canCommit on underlying cohorts and returns composite future
327 * which will contains {@link Boolean#TRUE} only and only if
328 * all cohorts returned true.
330 * Valid state transition is from SUBMITTED to CAN_COMMIT,
331 * if currentPhase is not SUBMITTED throws IllegalStateException.
333 * @return Future which will complete once all cohorts completed
335 * Future throws TransactionCommitFailedException
336 * If any of cohorts failed preCommit
339 private CheckedFuture<Boolean, TransactionCommitFailedException> canCommitAll() {
340 changeStateFrom(CommitPhase.SUBMITTED, CommitPhase.CAN_COMMIT);
341 Builder<ListenableFuture<Boolean>> canCommitOperations = ImmutableList.builder();
342 for (DOMStoreThreePhaseCommitCohort cohort : cohorts) {
343 canCommitOperations.add(cohort.canCommit());
345 ListenableFuture<List<Boolean>> allCanCommits = Futures.allAsList(canCommitOperations.build());
346 ListenableFuture<Boolean> allSuccessFuture = Futures.transform(allCanCommits, AND_FUNCTION);
348 .makeChecked(allSuccessFuture, TransactionCommitFailedExceptionMapper.CAN_COMMIT_ERROR_MAPPER);
354 * Invokes abort on underlying cohorts and returns future which
356 * once all abort on cohorts are completed.
358 * @return Future which will complete once all cohorts completed
362 private ListenableFuture<Void> abortAsyncAll() {
363 changeStateFrom(currentPhase, CommitPhase.ABORT);
364 Builder<ListenableFuture<Void>> ops = ImmutableList.builder();
365 for (DOMStoreThreePhaseCommitCohort cohort : cohorts) {
366 ops.add(cohort.abort());
369 * We are returing all futures as list, not only succeeded ones in
370 * order to fail composite future if any of them failed.
371 * See Futures.allAsList for this description.
373 @SuppressWarnings({ "unchecked", "rawtypes" })
374 ListenableFuture<Void> compositeResult = (ListenableFuture) Futures.allAsList(ops.build());
375 return compositeResult;
379 * Change phase / state of transaction from expected value to new value
381 * This method checks the state and updates it to the new state
382 * of this task if the current state equals the expected state.
383 * If expected state and current state are different raises
384 * IllegalStateException
385 * which means there is probably bug in implementation of commit
388 * If transition is successful, it logs transition on DEBUG level.
390 * @param currentExpected
391 * Required phase for change of state
393 * New Phase which will be entered by transaction.
394 * @throws IllegalStateException
395 * If currentState of task does not match expected state
397 private synchronized void changeStateFrom(final CommitPhase currentExpected, final CommitPhase newState) {
398 Preconditions.checkState(currentPhase.equals(currentExpected),
399 "Invalid state transition: Tx: %s current state: %s new state: %s", tx.getIdentifier(),
400 currentPhase, newState);
401 LOG.debug("Transaction {}: Phase {} Started ", tx.getIdentifier(), newState);
402 currentPhase = newState;