X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-dom-broker%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fmd%2Fsal%2Fdom%2Fbroker%2Fimpl%2FCommitCoordinationTask.java;h=bd16b5dbc98e314cbf775580627ecfab7b2eb830;hp=e0ac702dad59c8d0119460e13b9a87a9535a8cc1;hb=2a6aa1775604906755883f810ee9ea6d5f286135;hpb=f298b5a67b70daf3face69bf65483de544a6da61 diff --git a/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/broker/impl/CommitCoordinationTask.java b/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/broker/impl/CommitCoordinationTask.java index e0ac702dad..bd16b5dbc9 100644 --- a/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/broker/impl/CommitCoordinationTask.java +++ b/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/broker/impl/CommitCoordinationTask.java @@ -1,16 +1,19 @@ /* * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. + * * This program and the accompanying materials are made available under the * terms of the Eclipse Public License v1.0 which accompanies this distribution, * and is available at http://www.eclipse.org/legal/epl-v10.html */ package org.opendaylight.controller.md.sal.dom.broker.impl; -import com.google.common.base.Preconditions; +import static java.util.Objects.requireNonNull; + +import com.google.common.base.Supplier; import com.google.common.base.Throwables; -import com.google.common.collect.Iterables; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; +import java.util.Collection; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException; @@ -24,30 +27,32 @@ import org.slf4j.LoggerFactory; * Implementation of blocking three-phase commit-coordination tasks without * support of cancellation. */ -final class CommitCoordinationTask implements Callable { - private static enum Phase { +@Deprecated(forRemoval = true) +final class CommitCoordinationTask implements Callable { + private enum Phase { canCommit, preCommit, doCommit, - }; + } private static final Logger LOG = LoggerFactory.getLogger(CommitCoordinationTask.class); - private final Iterable cohorts; + private final Collection cohorts; private final DurationStatisticsTracker commitStatTracker; private final DOMDataWriteTransaction tx; - private final int cohortSize; + private final Supplier futureValueSupplier; - public CommitCoordinationTask(final DOMDataWriteTransaction transaction, - final Iterable cohorts, - final DurationStatisticsTracker commitStatTracker) { - this.tx = Preconditions.checkNotNull(transaction, "transaction must not be null"); - this.cohorts = Preconditions.checkNotNull(cohorts, "cohorts must not be null"); + CommitCoordinationTask(final DOMDataWriteTransaction transaction, + final Collection cohorts, + final DurationStatisticsTracker commitStatTracker, + final Supplier futureValueSupplier) { + this.tx = requireNonNull(transaction, "transaction must not be null"); + this.cohorts = requireNonNull(cohorts, "cohorts must not be null"); this.commitStatTracker = commitStatTracker; - this.cohortSize = Iterables.size(cohorts); + this.futureValueSupplier = futureValueSupplier; } @Override - public Void call() throws TransactionCommitFailedException { + public T call() throws TransactionCommitFailedException { final long startTime = commitStatTracker != null ? System.nanoTime() : 0; Phase phase = Phase.canCommit; @@ -65,8 +70,8 @@ final class CommitCoordinationTask implements Callable { commitBlocking(); LOG.debug("Transaction {}: doCommit completed", tx.getIdentifier()); - return null; - } catch (TransactionCommitFailedException e) { + return futureValueSupplier.get(); + } catch (final TransactionCommitFailedException e) { LOG.warn("Tx: {} Error during phase {}, starting Abort", tx.getIdentifier(), phase, e); abortBlocking(e); throw e; @@ -78,10 +83,10 @@ final class CommitCoordinationTask implements Callable { } /** - * * Invokes canCommit on underlying cohorts and blocks till * all results are returned. * + *

* Valid state transition is from SUBMITTED to CAN_COMMIT, * if currentPhase is not SUBMITTED throws IllegalStateException. * @@ -90,7 +95,7 @@ final class CommitCoordinationTask implements Callable { * */ private void canCommitBlocking() throws TransactionCommitFailedException { - for (ListenableFuture canCommit : canCommitAll()) { + for (final ListenableFuture canCommit : canCommitAll()) { try { final Boolean result = (Boolean)canCommit.get(); if (result == null || !result) { @@ -103,11 +108,11 @@ final class CommitCoordinationTask implements Callable { } /** - * * Invokes canCommit on underlying cohorts and returns composite future * which will contains {@link Boolean#TRUE} only and only if * all cohorts returned true. * + *

* Valid state transition is from SUBMITTED to CAN_COMMIT, * if currentPhase is not SUBMITTED throws IllegalStateException. * @@ -115,19 +120,19 @@ final class CommitCoordinationTask implements Callable { * */ private ListenableFuture[] canCommitAll() { - final ListenableFuture[] ops = new ListenableFuture[cohortSize]; - int i = 0; - for (DOMStoreThreePhaseCommitCohort cohort : cohorts) { - ops[i++] = cohort.canCommit(); + final ListenableFuture[] ops = new ListenableFuture[cohorts.size()]; + int index = 0; + for (final DOMStoreThreePhaseCommitCohort cohort : cohorts) { + ops[index++] = cohort.canCommit(); } return ops; } /** - * * Invokes preCommit on underlying cohorts and blocks till * all results are returned. * + *

* Valid state transition is from CAN_COMMIT to PRE_COMMIT, if current * state is not CAN_COMMIT * throws IllegalStateException. @@ -139,7 +144,7 @@ final class CommitCoordinationTask implements Callable { private void preCommitBlocking() throws TransactionCommitFailedException { final ListenableFuture[] preCommitFutures = preCommitAll(); try { - for(ListenableFuture future : preCommitFutures) { + for (final ListenableFuture future : preCommitFutures) { future.get(); } } catch (InterruptedException | ExecutionException e) { @@ -148,12 +153,11 @@ final class CommitCoordinationTask implements Callable { } /** - * * Invokes preCommit on underlying cohorts and returns future * which will complete once all preCommit on cohorts completed or * failed. * - * + *

* Valid state transition is from CAN_COMMIT to PRE_COMMIT, if current * state is not CAN_COMMIT * throws IllegalStateException. @@ -162,19 +166,19 @@ final class CommitCoordinationTask implements Callable { * */ private ListenableFuture[] preCommitAll() { - final ListenableFuture[] ops = new ListenableFuture[cohortSize]; - int i = 0; - for (DOMStoreThreePhaseCommitCohort cohort : cohorts) { - ops[i++] = cohort.preCommit(); + final ListenableFuture[] ops = new ListenableFuture[cohorts.size()]; + int index = 0; + for (final DOMStoreThreePhaseCommitCohort cohort : cohorts) { + ops[index++] = cohort.preCommit(); } return ops; } /** - * * Invokes commit on underlying cohorts and blocks till * all results are returned. * + *

* Valid state transition is from PRE_COMMIT to COMMIT, if not throws * IllegalStateException. * @@ -185,7 +189,7 @@ final class CommitCoordinationTask implements Callable { private void commitBlocking() throws TransactionCommitFailedException { final ListenableFuture[] commitFutures = commitAll(); try { - for(ListenableFuture future : commitFutures) { + for (final ListenableFuture future : commitFutures) { future.get(); } } catch (InterruptedException | ExecutionException e) { @@ -194,21 +198,21 @@ final class CommitCoordinationTask implements Callable { } /** - * * Invokes commit on underlying cohorts and returns future which * completes * once all commits on cohorts are completed. * + *

* Valid state transition is from PRE_COMMIT to COMMIT, if not throws * IllegalStateException * * @return List of all cohorts futures from can commit phase. */ private ListenableFuture[] commitAll() { - final ListenableFuture[] ops = new ListenableFuture[cohortSize]; - int i = 0; - for (DOMStoreThreePhaseCommitCohort cohort : cohorts) { - ops[i++] = cohort.commit(); + final ListenableFuture[] ops = new ListenableFuture[cohorts.size()]; + int index = 0; + for (final DOMStoreThreePhaseCommitCohort cohort : cohorts) { + ops[index++] = cohort.commit(); } return ops; } @@ -216,12 +220,14 @@ final class CommitCoordinationTask implements Callable { /** * Aborts transaction. * + *

* Invokes {@link DOMStoreThreePhaseCommitCohort#abort()} on all * cohorts, blocks * for all results. If any of the abort failed throws * IllegalStateException, * which will contains originalCause as suppressed Exception. * + *

* If aborts we're successful throws supplied exception * * @param originalCause @@ -235,7 +241,8 @@ final class CommitCoordinationTask implements Callable { * @throws IllegalStateException * if abort failed. */ - private void abortBlocking(final TransactionCommitFailedException originalCause) throws TransactionCommitFailedException { + private void abortBlocking( + final TransactionCommitFailedException originalCause) throws TransactionCommitFailedException { Exception cause = originalCause; try { abortAsyncAll().get(); @@ -254,12 +261,13 @@ final class CommitCoordinationTask implements Callable { * @return Future which will complete once all cohorts completed * abort. */ + @SuppressWarnings({"unchecked", "rawtypes"}) private ListenableFuture abortAsyncAll() { - final ListenableFuture[] ops = new ListenableFuture[cohortSize]; - int i = 0; - for (DOMStoreThreePhaseCommitCohort cohort : cohorts) { - ops[i++] = cohort.abort(); + final ListenableFuture[] ops = new ListenableFuture[cohorts.size()]; + int index = 0; + for (final DOMStoreThreePhaseCommitCohort cohort : cohorts) { + ops[index++] = cohort.abort(); } /* @@ -267,8 +275,6 @@ final class CommitCoordinationTask implements Callable { * order to fail composite future if any of them failed. * See Futures.allAsList for this description. */ - @SuppressWarnings({ "unchecked", "rawtypes" }) - ListenableFuture compositeResult = (ListenableFuture) Futures.allAsList(ops); - return compositeResult; + return (ListenableFuture) Futures.allAsList(ops); } }