BUG-5280: fix invalid local transaction replay
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / SimpleShardDataTreeCohort.java
1 /*
2  * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.datastore;
9
10 import akka.dispatch.ExecutionContexts;
11 import akka.dispatch.OnComplete;
12 import com.google.common.base.Preconditions;
13 import com.google.common.base.Verify;
14 import com.google.common.primitives.UnsignedLong;
15 import com.google.common.util.concurrent.FutureCallback;
16 import java.util.Optional;
17 import java.util.concurrent.ExecutionException;
18 import java.util.concurrent.TimeoutException;
19 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
20 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
21 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateTip;
22 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
23 import org.slf4j.Logger;
24 import org.slf4j.LoggerFactory;
25 import scala.concurrent.Future;
26
27 abstract class SimpleShardDataTreeCohort extends ShardDataTreeCohort {
28     static final class DeadOnArrival extends SimpleShardDataTreeCohort {
29         private final Exception failure;
30
31         DeadOnArrival(final ShardDataTree dataTree, final DataTreeModification transaction,
32             final TransactionIdentifier transactionId, final Exception failure) {
33             super(dataTree, transaction, transactionId, null);
34             this.failure = Preconditions.checkNotNull(failure);
35         }
36
37         @Override
38         void throwCanCommitFailure() throws Exception {
39             throw failure;
40         }
41     }
42
43     static final class Normal extends SimpleShardDataTreeCohort {
44         Normal(final ShardDataTree dataTree, final DataTreeModification transaction,
45             final TransactionIdentifier transactionId, final CompositeDataTreeCohort userCohorts) {
46             super(dataTree, transaction, transactionId, Preconditions.checkNotNull(userCohorts));
47         }
48
49         @Override
50         void throwCanCommitFailure() {
51             // No-op
52         }
53     }
54
55     private static final Logger LOG = LoggerFactory.getLogger(SimpleShardDataTreeCohort.class);
56
57     private final DataTreeModification transaction;
58     private final ShardDataTree dataTree;
59     private final TransactionIdentifier transactionId;
60     private final CompositeDataTreeCohort userCohorts;
61
62     private State state = State.READY;
63     private DataTreeCandidateTip candidate;
64     private FutureCallback<?> callback;
65     private Exception nextFailure;
66
67     SimpleShardDataTreeCohort(final ShardDataTree dataTree, final DataTreeModification transaction,
68             final TransactionIdentifier transactionId, final CompositeDataTreeCohort userCohorts) {
69         this.dataTree = Preconditions.checkNotNull(dataTree);
70         this.transaction = Preconditions.checkNotNull(transaction);
71         this.transactionId = Preconditions.checkNotNull(transactionId);
72         this.userCohorts = userCohorts;
73     }
74
75     @Override
76     public TransactionIdentifier getIdentifier() {
77         return transactionId;
78     }
79
80     @Override
81     DataTreeCandidateTip getCandidate() {
82         return candidate;
83     }
84
85     @Override
86     DataTreeModification getDataTreeModification() {
87         return transaction;
88     }
89
90     private void checkState(final State expected) {
91         Preconditions.checkState(state == expected, "State %s does not match expected state %s", state, expected);
92     }
93
94     @Override
95     public void canCommit(final FutureCallback<Void> newCallback) {
96         if (state == State.CAN_COMMIT_PENDING) {
97             return;
98         }
99
100         checkState(State.READY);
101         this.callback = Preconditions.checkNotNull(newCallback);
102         state = State.CAN_COMMIT_PENDING;
103         dataTree.startCanCommit(this);
104     }
105
106     @Override
107     public void preCommit(final FutureCallback<DataTreeCandidate> newCallback) {
108         checkState(State.CAN_COMMIT_COMPLETE);
109         this.callback = Preconditions.checkNotNull(newCallback);
110         state = State.PRE_COMMIT_PENDING;
111
112         if (nextFailure == null) {
113             dataTree.startPreCommit(this);
114         } else {
115             failedPreCommit(nextFailure);
116         }
117     }
118
119     @Override
120     public void abort(final FutureCallback<Void> abortCallback) {
121         if (!dataTree.startAbort(this)) {
122             abortCallback.onSuccess(null);
123             return;
124         }
125
126         candidate = null;
127         state = State.ABORTED;
128
129         final Optional<Future<Iterable<Object>>> maybeAborts = userCohorts.abort();
130         if (!maybeAborts.isPresent()) {
131             abortCallback.onSuccess(null);
132             return;
133         }
134
135         final Future<Iterable<Object>> aborts = maybeAborts.get();
136         if (aborts.isCompleted()) {
137             abortCallback.onSuccess(null);
138             return;
139         }
140
141         aborts.onComplete(new OnComplete<Iterable<Object>>() {
142             @Override
143             public void onComplete(final Throwable failure, final Iterable<Object> objs) {
144                 if (failure != null) {
145                     abortCallback.onFailure(failure);
146                 } else {
147                     abortCallback.onSuccess(null);
148                 }
149             }
150         }, ExecutionContexts.global());
151     }
152
153     @Override
154     public void commit(final FutureCallback<UnsignedLong> newCallback) {
155         checkState(State.PRE_COMMIT_COMPLETE);
156         this.callback = Preconditions.checkNotNull(newCallback);
157         state = State.COMMIT_PENDING;
158
159         if (nextFailure == null) {
160             dataTree.startCommit(this, candidate);
161         } else {
162             failedCommit(nextFailure);
163         }
164     }
165
166     private <T> FutureCallback<T> switchState(final State newState) {
167         @SuppressWarnings("unchecked")
168         final FutureCallback<T> ret = (FutureCallback<T>) this.callback;
169         this.callback = null;
170         LOG.debug("Transaction {} changing state from {} to {}", transactionId, state, newState);
171         this.state = newState;
172         return ret;
173     }
174
175     void setNewCandidate(final DataTreeCandidateTip dataTreeCandidate) {
176         checkState(State.PRE_COMMIT_COMPLETE);
177         this.candidate = Verify.verifyNotNull(dataTreeCandidate);
178     }
179
180     void successfulCanCommit() {
181         switchState(State.CAN_COMMIT_COMPLETE).onSuccess(null);
182     }
183
184     void failedCanCommit(final Exception cause) {
185         switchState(State.FAILED).onFailure(cause);
186     }
187
188     /**
189      * Run user-defined canCommit and preCommit hooks. We want to run these before we initiate persistence so that
190      * any failure to validate is propagated before we record the transaction.
191      *
192      * @param dataTreeCandidate {@link DataTreeCandidate} under consideration
193      * @throws ExecutionException if the operation fails
194      * @throws TimeoutException if the operation times out
195      */
196     // FIXME: this should be asynchronous
197     void userPreCommit(final DataTreeCandidate dataTreeCandidate) throws ExecutionException, TimeoutException {
198         userCohorts.reset();
199         userCohorts.canCommit(dataTreeCandidate);
200         userCohorts.preCommit();
201     }
202
203     void successfulPreCommit(final DataTreeCandidateTip dataTreeCandidate) {
204         LOG.trace("Transaction {} prepared candidate {}", transaction, dataTreeCandidate);
205         this.candidate = Verify.verifyNotNull(dataTreeCandidate);
206         switchState(State.PRE_COMMIT_COMPLETE).onSuccess(dataTreeCandidate);
207     }
208
209     void failedPreCommit(final Exception cause) {
210         if (LOG.isTraceEnabled()) {
211             LOG.trace("Transaction {} failed to prepare", transaction, cause);
212         } else {
213             LOG.error("Transaction {} failed to prepare", transactionId, cause);
214         }
215
216         userCohorts.abort();
217         switchState(State.FAILED).onFailure(cause);
218     }
219
220     void successfulCommit(final UnsignedLong journalIndex) {
221         try {
222             userCohorts.commit();
223         } catch (TimeoutException | ExecutionException e) {
224             // We are probably dead, depending on what the cohorts end up doing
225             LOG.error("User cohorts failed to commit", e);
226         }
227
228         switchState(State.COMMITTED).onSuccess(journalIndex);
229     }
230
231     void failedCommit(final Exception cause) {
232         if (LOG.isTraceEnabled()) {
233             LOG.trace("Transaction {} failed to commit", transaction, cause);
234         } else {
235             LOG.error("Transaction failed to commit", cause);
236         }
237
238         userCohorts.abort();
239         switchState(State.FAILED).onFailure(cause);
240     }
241
242     @Override
243     public State getState() {
244         return state;
245     }
246
247     void reportFailure(final Exception cause) {
248         this.nextFailure = Preconditions.checkNotNull(cause);
249     }
250
251     /**
252      * If there is an initial failure, throw it so the caller can process it.
253      *
254      * @throws Exception reported failure.
255      */
256     abstract void throwCanCommitFailure() throws Exception;
257
258     @Override
259     public boolean isFailed() {
260         return state == State.FAILED || nextFailure != null;
261     }
262 }