Move AbstractDOMDataBroker to mdsal-dom-spi
[mdsal.git] / dom / mdsal-dom-broker / src / main / java / org / opendaylight / mdsal / dom / broker / CommitCoordinationTask.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.mdsal.dom.broker;
9
10 import static java.util.Objects.requireNonNull;
11
12 import com.google.common.base.Throwables;
13 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
14 import java.util.concurrent.Callable;
15 import java.util.concurrent.ExecutionException;
16 import org.opendaylight.mdsal.common.api.CommitInfo;
17 import org.opendaylight.mdsal.common.api.TransactionCommitFailedException;
18 import org.opendaylight.mdsal.dom.api.DOMDataTreeWriteTransaction;
19 import org.opendaylight.mdsal.dom.spi.TransactionCommitFailedExceptionMapper;
20 import org.opendaylight.mdsal.dom.spi.store.DOMStoreThreePhaseCommitCohort;
21 import org.opendaylight.yangtools.util.DurationStatisticsTracker;
22 import org.slf4j.Logger;
23 import org.slf4j.LoggerFactory;
24
25 /**
26  * Implementation of blocking three-phase commit-coordination tasks without support of cancellation.
27  */
28 sealed class CommitCoordinationTask implements Callable<CommitInfo> {
29     static final class WithTracker extends CommitCoordinationTask {
30         private final DurationStatisticsTracker commitStatTracker;
31
32         WithTracker(final DOMDataTreeWriteTransaction transaction, final DOMStoreThreePhaseCommitCohort cohort,
33                 final DurationStatisticsTracker commitStatTracker) {
34             super(transaction, cohort);
35             this.commitStatTracker = requireNonNull(commitStatTracker);
36         }
37
38         @Override
39         public CommitInfo call() throws TransactionCommitFailedException {
40             final long startTime = System.nanoTime();
41
42             try {
43                 return super.call();
44             } finally {
45                 commitStatTracker.addDuration(System.nanoTime() - startTime);
46             }
47         }
48     }
49
50     private enum Phase {
51         CAN_COMMIT,
52         PRE_COMMIT,
53         DO_COMMIT
54     }
55
56     private static final Logger LOG = LoggerFactory.getLogger(CommitCoordinationTask.class);
57
58     private final DOMStoreThreePhaseCommitCohort cohort;
59     private final DOMDataTreeWriteTransaction tx;
60
61     CommitCoordinationTask(final DOMDataTreeWriteTransaction transaction, final DOMStoreThreePhaseCommitCohort cohort) {
62         tx = requireNonNull(transaction, "transaction must not be null");
63         this.cohort = requireNonNull(cohort, "cohort must not be null");
64     }
65
66     @Override
67     public CommitInfo call() throws TransactionCommitFailedException {
68         var phase = Phase.CAN_COMMIT;
69         try {
70             LOG.debug("Transaction {}: canCommit Started", tx.getIdentifier());
71             canCommitBlocking();
72
73             phase = Phase.PRE_COMMIT;
74             LOG.debug("Transaction {}: preCommit Started", tx.getIdentifier());
75             preCommitBlocking();
76
77             phase = Phase.DO_COMMIT;
78             LOG.debug("Transaction {}: doCommit Started", tx.getIdentifier());
79             commitBlocking();
80
81             LOG.debug("Transaction {}: doCommit completed", tx.getIdentifier());
82             return CommitInfo.empty();
83         } catch (final TransactionCommitFailedException e) {
84             LOG.warn("Tx: {} Error during phase {}, starting Abort", tx.getIdentifier(), phase, e);
85             abortBlocking(e);
86             throw e;
87         }
88     }
89
90     /**
91      * Invokes canCommit on underlying cohort and blocks till the result is returned.
92      *
93      * <p>
94      * Valid state transition is from SUBMITTED to CAN_COMMIT, if currentPhase is not SUBMITTED throws
95      * IllegalStateException.
96      *
97      * @throws TransactionCommitFailedException If cohort fails Commit
98      */
99     @SuppressFBWarnings("BC_UNCONFIRMED_CAST_OF_RETURN_VALUE")
100     private void canCommitBlocking() throws TransactionCommitFailedException {
101         final var future = cohort.canCommit();
102         final Boolean result;
103         try {
104             result = future.get();
105         } catch (InterruptedException | ExecutionException e) {
106             throw TransactionCommitFailedExceptionMapper.CAN_COMMIT_ERROR_MAPPER.apply(e);
107         }
108
109         if (!Boolean.TRUE.equals(result)) {
110             throw new TransactionCommitFailedException("Can Commit failed, no detailed cause available.");
111         }
112     }
113
114     /**
115      * Invokes preCommit on underlying cohort and blocks until the result is returned.
116      *
117      * <p>
118      * Valid state transition is from CAN_COMMIT to PRE_COMMIT, if current state is not CAN_COMMIT throws
119      * IllegalStateException.
120      *
121      * @throws TransactionCommitFailedException If cohort fails preCommit
122      */
123     @SuppressFBWarnings("BC_UNCONFIRMED_CAST_OF_RETURN_VALUE")
124     private void preCommitBlocking() throws TransactionCommitFailedException {
125         try {
126             cohort.preCommit().get();
127         } catch (InterruptedException | ExecutionException e) {
128             throw TransactionCommitFailedExceptionMapper.PRE_COMMIT_MAPPER.apply(e);
129         }
130     }
131
132     /**
133      * Invokes commit on underlying cohort and blocks until result is returned.
134      *
135      * <p>
136      * Valid state transition is from PRE_COMMIT to COMMIT, if not throws IllegalStateException.
137      *
138      * @throws TransactionCommitFailedException If cohort fails preCommit
139      */
140     @SuppressFBWarnings("BC_UNCONFIRMED_CAST_OF_RETURN_VALUE")
141     private void commitBlocking() throws TransactionCommitFailedException {
142         try {
143             cohort.commit().get();
144         } catch (InterruptedException | ExecutionException e) {
145             throw TransactionCommitFailedExceptionMapper.COMMIT_ERROR_MAPPER.apply(e);
146         }
147     }
148
149     /**
150      * Aborts transaction.
151      *
152      * <p>
153      * Invokes {@link DOMStoreThreePhaseCommitCohort#abort()} on underlying cohort, blocks the results. If
154      * abort failed throws IllegalStateException, which will contains originalCause as suppressed Exception.
155      *
156      * <p>
157      * If abort was successful throws supplied exception
158      *
159      * @param originalCause Exception which should be used to fail transaction for consumers of transaction future
160      *                      and listeners of transaction failure.
161      * @throws IllegalStateException if abort failed.
162      * @throws TransactionCommitFailedException on invocation of this method.
163      */
164     private void abortBlocking(final TransactionCommitFailedException originalCause)
165             throws TransactionCommitFailedException {
166         Exception cause = originalCause;
167         try {
168             cohort.abort().get();
169         } catch (InterruptedException | ExecutionException e) {
170             LOG.error("Tx: {} Error during Abort.", tx.getIdentifier(), e);
171             cause = new IllegalStateException("Abort failed.", e);
172             cause.addSuppressed(e);
173         }
174         Throwables.propagateIfPossible(cause, TransactionCommitFailedException.class);
175     }
176 }