BUG-7033: Fix commit exception due to pipe-lining
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / SimpleShardDataTreeCohort.java
1 /*
2  * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.datastore;
9
10 import akka.dispatch.ExecutionContexts;
11 import akka.dispatch.OnComplete;
12 import com.google.common.base.Preconditions;
13 import com.google.common.base.Verify;
14 import com.google.common.primitives.UnsignedLong;
15 import com.google.common.util.concurrent.FutureCallback;
16 import java.util.Optional;
17 import java.util.concurrent.ExecutionException;
18 import java.util.concurrent.TimeoutException;
19 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
20 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
21 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateTip;
22 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
23 import org.slf4j.Logger;
24 import org.slf4j.LoggerFactory;
25 import scala.concurrent.Future;
26
27 final class SimpleShardDataTreeCohort extends ShardDataTreeCohort {
28     private static final Logger LOG = LoggerFactory.getLogger(SimpleShardDataTreeCohort.class);
29
30     private final DataTreeModification transaction;
31     private final ShardDataTree dataTree;
32     private final TransactionIdentifier transactionId;
33     private final CompositeDataTreeCohort userCohorts;
34
35     private State state = State.READY;
36     private DataTreeCandidateTip candidate;
37     private FutureCallback<?> callback;
38     private Exception nextFailure;
39
40     SimpleShardDataTreeCohort(final ShardDataTree dataTree, final DataTreeModification transaction,
41             final TransactionIdentifier transactionId, final CompositeDataTreeCohort userCohorts) {
42         this.dataTree = Preconditions.checkNotNull(dataTree);
43         this.transaction = Preconditions.checkNotNull(transaction);
44         this.transactionId = Preconditions.checkNotNull(transactionId);
45         this.userCohorts = Preconditions.checkNotNull(userCohorts);
46     }
47
48     @Override
49     public TransactionIdentifier getIdentifier() {
50         return transactionId;
51     }
52
53     @Override
54     DataTreeCandidateTip getCandidate() {
55         return candidate;
56     }
57
58     @Override
59     DataTreeModification getDataTreeModification() {
60         return transaction;
61     }
62
63     private void checkState(final State expected) {
64         Preconditions.checkState(state == expected, "State %s does not match expected state %s", state, expected);
65     }
66
67     @Override
68     public void canCommit(final FutureCallback<Void> newCallback) {
69         if (state == State.CAN_COMMIT_PENDING) {
70             return;
71         }
72
73         checkState(State.READY);
74         this.callback = Preconditions.checkNotNull(newCallback);
75         state = State.CAN_COMMIT_PENDING;
76         dataTree.startCanCommit(this);
77     }
78
79     @Override
80     public void preCommit(final FutureCallback<DataTreeCandidate> newCallback) {
81         checkState(State.CAN_COMMIT_COMPLETE);
82         this.callback = Preconditions.checkNotNull(newCallback);
83         state = State.PRE_COMMIT_PENDING;
84
85         if (nextFailure == null) {
86             dataTree.startPreCommit(this);
87         } else {
88             failedPreCommit(nextFailure);
89         }
90     }
91
92     @Override
93     public void abort(final FutureCallback<Void> abortCallback) {
94         if (!dataTree.startAbort(this)) {
95             abortCallback.onSuccess(null);
96             return;
97         }
98
99         candidate = null;
100         state = State.ABORTED;
101
102         final Optional<Future<Iterable<Object>>> maybeAborts = userCohorts.abort();
103         if (!maybeAborts.isPresent()) {
104             abortCallback.onSuccess(null);
105             return;
106         }
107
108         final Future<Iterable<Object>> aborts = maybeAborts.get();
109         if (aborts.isCompleted()) {
110             abortCallback.onSuccess(null);
111             return;
112         }
113
114         aborts.onComplete(new OnComplete<Iterable<Object>>() {
115             @Override
116             public void onComplete(final Throwable failure, final Iterable<Object> objs) {
117                 if (failure != null) {
118                     abortCallback.onFailure(failure);
119                 } else {
120                     abortCallback.onSuccess(null);
121                 }
122             }
123         }, ExecutionContexts.global());
124     }
125
126     @Override
127     public void commit(final FutureCallback<UnsignedLong> newCallback) {
128         checkState(State.PRE_COMMIT_COMPLETE);
129         this.callback = Preconditions.checkNotNull(newCallback);
130         state = State.COMMIT_PENDING;
131
132         if (nextFailure == null) {
133             dataTree.startCommit(this, candidate);
134         } else {
135             failedCommit(nextFailure);
136         }
137     }
138
139     private <T> FutureCallback<T> switchState(final State newState) {
140         @SuppressWarnings("unchecked")
141         final FutureCallback<T> ret = (FutureCallback<T>) this.callback;
142         this.callback = null;
143         LOG.debug("Transaction {} changing state from {} to {}", transactionId, state, newState);
144         this.state = newState;
145         return ret;
146     }
147
148     void setNewCandidate(DataTreeCandidateTip dataTreeCandidate) {
149         checkState(State.PRE_COMMIT_COMPLETE);
150         this.candidate = Verify.verifyNotNull(dataTreeCandidate);
151     }
152
153     void successfulCanCommit() {
154         switchState(State.CAN_COMMIT_COMPLETE).onSuccess(null);
155     }
156
157     void failedCanCommit(final Exception cause) {
158         switchState(State.FAILED).onFailure(cause);
159     }
160
161     /**
162      * Run user-defined canCommit and preCommit hooks. We want to run these before we initiate persistence so that
163      * any failure to validate is propagated before we record the transaction.
164      *
165      * @param dataTreeCandidate {@link DataTreeCandidate} under consideration
166      * @throws ExecutionException if the operation fails
167      * @throws TimeoutException if the operation times out
168      */
169     // FIXME: this should be asynchronous
170     void userPreCommit(final DataTreeCandidate dataTreeCandidate) throws ExecutionException, TimeoutException {
171         userCohorts.reset();
172         userCohorts.canCommit(dataTreeCandidate);
173         userCohorts.preCommit();
174     }
175
176     void successfulPreCommit(final DataTreeCandidateTip dataTreeCandidate) {
177         LOG.trace("Transaction {} prepared candidate {}", transaction, dataTreeCandidate);
178         this.candidate = Verify.verifyNotNull(dataTreeCandidate);
179         switchState(State.PRE_COMMIT_COMPLETE).onSuccess(dataTreeCandidate);
180     }
181
182     void failedPreCommit(final Exception cause) {
183         if (LOG.isTraceEnabled()) {
184             LOG.trace("Transaction {} failed to prepare", transaction, cause);
185         } else {
186             LOG.error("Transaction {} failed to prepare", transactionId, cause);
187         }
188
189         userCohorts.abort();
190         switchState(State.FAILED).onFailure(cause);
191     }
192
193     void successfulCommit(final UnsignedLong journalIndex) {
194         try {
195             userCohorts.commit();
196         } catch (TimeoutException | ExecutionException e) {
197             // We are probably dead, depending on what the cohorts end up doing
198             LOG.error("User cohorts failed to commit", e);
199         }
200
201         switchState(State.COMMITTED).onSuccess(journalIndex);
202     }
203
204     void failedCommit(final Exception cause) {
205         if (LOG.isTraceEnabled()) {
206             LOG.trace("Transaction {} failed to commit", transaction, cause);
207         } else {
208             LOG.error("Transaction failed to commit", cause);
209         }
210
211         userCohorts.abort();
212         switchState(State.FAILED).onFailure(cause);
213     }
214
215     void finishCommitPending() {
216         checkState(State.COMMIT_PENDING);
217         // We want to switch the state but keep the callback.
218         callback = switchState(State.FINISH_COMMIT_PENDING);
219     }
220
221     @Override
222     public State getState() {
223         return state;
224     }
225
226     void reportFailure(final Exception cause) {
227         this.nextFailure = Preconditions.checkNotNull(cause);
228     }
229
230     @Override
231     public boolean isFailed() {
232         return state == State.FAILED || nextFailure != null;
233     }
234 }

©2013 OpenDaylight, A Linux Foundation Collaborative Project. All Rights Reserved.
OpenDaylight is a registered trademark of The OpenDaylight Project, Inc.
Linux Foundation and OpenDaylight are registered trademarks of the Linux Foundation.
Linux is a registered trademark of Linus Torvalds.