Speed up DatastoreContextIntrospector a bit
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / SimpleShardDataTreeCohort.java
1 /*
2  * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.datastore;
9
10 import static java.util.Objects.requireNonNull;
11
12 import com.google.common.base.MoreObjects.ToStringHelper;
13 import com.google.common.base.Preconditions;
14 import com.google.common.base.Verify;
15 import com.google.common.primitives.UnsignedLong;
16 import com.google.common.util.concurrent.FutureCallback;
17 import java.util.Optional;
18 import java.util.SortedSet;
19 import java.util.concurrent.CompletionStage;
20 import org.eclipse.jdt.annotation.Nullable;
21 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
22 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
23 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateTip;
24 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
25 import org.slf4j.Logger;
26 import org.slf4j.LoggerFactory;
27
28 final class SimpleShardDataTreeCohort extends ShardDataTreeCohort {
29     private static final Logger LOG = LoggerFactory.getLogger(SimpleShardDataTreeCohort.class);
30
31     private final DataTreeModification transaction;
32     private final ShardDataTree dataTree;
33     private final TransactionIdentifier transactionId;
34     private final CompositeDataTreeCohort userCohorts;
35     private final @Nullable SortedSet<String> participatingShardNames;
36
37     private State state = State.READY;
38     private DataTreeCandidateTip candidate;
39     private FutureCallback<?> callback;
40     private Exception nextFailure;
41
42     SimpleShardDataTreeCohort(final ShardDataTree dataTree, final DataTreeModification transaction,
43             final TransactionIdentifier transactionId, final CompositeDataTreeCohort userCohorts,
44             final Optional<SortedSet<String>> participatingShardNames) {
45         this.dataTree = requireNonNull(dataTree);
46         this.transaction = requireNonNull(transaction);
47         this.transactionId = requireNonNull(transactionId);
48         this.userCohorts = requireNonNull(userCohorts);
49         this.participatingShardNames = requireNonNull(participatingShardNames).orElse(null);
50     }
51
52     SimpleShardDataTreeCohort(final ShardDataTree dataTree, final DataTreeModification transaction,
53         final TransactionIdentifier transactionId, final Exception nextFailure) {
54         this.dataTree = requireNonNull(dataTree);
55         this.transaction = requireNonNull(transaction);
56         this.transactionId = requireNonNull(transactionId);
57         this.userCohorts = null;
58         this.participatingShardNames = null;
59         this.nextFailure = requireNonNull(nextFailure);
60     }
61
62     @Override
63     public TransactionIdentifier getIdentifier() {
64         return transactionId;
65     }
66
67     @Override
68     DataTreeCandidateTip getCandidate() {
69         return candidate;
70     }
71
72     @Override
73     DataTreeModification getDataTreeModification() {
74         return transaction;
75     }
76
77     @Override
78     Optional<SortedSet<String>> getParticipatingShardNames() {
79         return Optional.ofNullable(participatingShardNames);
80     }
81
82     private void checkState(final State expected) {
83         Preconditions.checkState(state == expected, "State %s does not match expected state %s for %s",
84                 state, expected, getIdentifier());
85     }
86
87     @Override
88     public void canCommit(final FutureCallback<Void> newCallback) {
89         if (state == State.CAN_COMMIT_PENDING) {
90             return;
91         }
92
93         checkState(State.READY);
94         this.callback = requireNonNull(newCallback);
95         state = State.CAN_COMMIT_PENDING;
96
97         if (nextFailure == null) {
98             dataTree.startCanCommit(this);
99         } else {
100             failedCanCommit(nextFailure);
101         }
102     }
103
104     @Override
105     public void preCommit(final FutureCallback<DataTreeCandidate> newCallback) {
106         checkState(State.CAN_COMMIT_COMPLETE);
107         this.callback = requireNonNull(newCallback);
108         state = State.PRE_COMMIT_PENDING;
109
110         if (nextFailure == null) {
111             dataTree.startPreCommit(this);
112         } else {
113             failedPreCommit(nextFailure);
114         }
115     }
116
117     @Override
118     public void abort(final FutureCallback<Void> abortCallback) {
119         if (!dataTree.startAbort(this)) {
120             abortCallback.onSuccess(null);
121             return;
122         }
123
124         candidate = null;
125         state = State.ABORTED;
126
127         final Optional<CompletionStage<?>> maybeAborts = userCohorts.abort();
128         if (!maybeAborts.isPresent()) {
129             abortCallback.onSuccess(null);
130             return;
131         }
132
133         maybeAborts.get().whenComplete((noop, failure) -> {
134             if (failure != null) {
135                 abortCallback.onFailure(failure);
136             } else {
137                 abortCallback.onSuccess(null);
138             }
139         });
140     }
141
142     @Override
143     public void commit(final FutureCallback<UnsignedLong> newCallback) {
144         checkState(State.PRE_COMMIT_COMPLETE);
145         this.callback = requireNonNull(newCallback);
146         state = State.COMMIT_PENDING;
147
148         if (nextFailure == null) {
149             dataTree.startCommit(this, candidate);
150         } else {
151             failedCommit(nextFailure);
152         }
153     }
154
155     private <T> FutureCallback<T> switchState(final State newState) {
156         @SuppressWarnings("unchecked")
157         final FutureCallback<T> ret = (FutureCallback<T>) this.callback;
158         this.callback = null;
159         LOG.debug("Transaction {} changing state from {} to {}", transactionId, state, newState);
160         this.state = newState;
161         return ret;
162     }
163
164     void setNewCandidate(final DataTreeCandidateTip dataTreeCandidate) {
165         checkState(State.PRE_COMMIT_COMPLETE);
166         this.candidate = Verify.verifyNotNull(dataTreeCandidate);
167     }
168
169     void successfulCanCommit() {
170         switchState(State.CAN_COMMIT_COMPLETE).onSuccess(null);
171     }
172
173     void failedCanCommit(final Exception cause) {
174         switchState(State.FAILED).onFailure(cause);
175     }
176
177     /**
178      * Run user-defined canCommit and preCommit hooks. We want to run these before we initiate persistence so that
179      * any failure to validate is propagated before we record the transaction.
180      *
181      * @param dataTreeCandidate {@link DataTreeCandidate} under consideration
182      * @param futureCallback the callback to invoke on completion, which may be immediate or async.
183      */
184     void userPreCommit(final DataTreeCandidate dataTreeCandidate, final FutureCallback<Void> futureCallback) {
185         userCohorts.reset();
186
187         final Optional<CompletionStage<Void>> maybeCanCommitFuture = userCohorts.canCommit(dataTreeCandidate);
188         if (!maybeCanCommitFuture.isPresent()) {
189             doUserPreCommit(futureCallback);
190             return;
191         }
192
193         maybeCanCommitFuture.get().whenComplete((noop, failure) -> {
194             if (failure != null) {
195                 futureCallback.onFailure(failure);
196             } else {
197                 doUserPreCommit(futureCallback);
198             }
199         });
200     }
201
202     private void doUserPreCommit(final FutureCallback<Void> futureCallback) {
203         final Optional<CompletionStage<Void>> maybePreCommitFuture = userCohorts.preCommit();
204         if (!maybePreCommitFuture.isPresent()) {
205             futureCallback.onSuccess(null);
206             return;
207         }
208
209         maybePreCommitFuture.get().whenComplete((noop, failure) -> {
210             if (failure != null) {
211                 futureCallback.onFailure(failure);
212             } else {
213                 futureCallback.onSuccess(null);
214             }
215         });
216     }
217
218     void successfulPreCommit(final DataTreeCandidateTip dataTreeCandidate) {
219         LOG.trace("Transaction {} prepared candidate {}", transaction, dataTreeCandidate);
220         this.candidate = Verify.verifyNotNull(dataTreeCandidate);
221         switchState(State.PRE_COMMIT_COMPLETE).onSuccess(dataTreeCandidate);
222     }
223
224     void failedPreCommit(final Throwable cause) {
225         if (LOG.isTraceEnabled()) {
226             LOG.trace("Transaction {} failed to prepare", transaction, cause);
227         } else {
228             LOG.error("Transaction {} failed to prepare", transactionId, cause);
229         }
230
231         userCohorts.abort();
232         switchState(State.FAILED).onFailure(cause);
233     }
234
235     void successfulCommit(final UnsignedLong journalIndex, final Runnable onComplete) {
236         final Optional<CompletionStage<Void>> maybeCommitFuture = userCohorts.commit();
237         if (!maybeCommitFuture.isPresent()) {
238             finishSuccessfulCommit(journalIndex, onComplete);
239             return;
240         }
241
242         maybeCommitFuture.get().whenComplete((noop, failure) -> {
243             if (failure != null) {
244                 LOG.error("User cohorts failed to commit", failure);
245             }
246
247             finishSuccessfulCommit(journalIndex, onComplete);
248         });
249     }
250
251     private void finishSuccessfulCommit(final UnsignedLong journalIndex, final Runnable onComplete) {
252         switchState(State.COMMITTED).onSuccess(journalIndex);
253         onComplete.run();
254     }
255
256     void failedCommit(final Exception cause) {
257         if (LOG.isTraceEnabled()) {
258             LOG.trace("Transaction {} failed to commit", transaction, cause);
259         } else {
260             LOG.error("Transaction failed to commit", cause);
261         }
262
263         userCohorts.abort();
264         switchState(State.FAILED).onFailure(cause);
265     }
266
267     @Override
268     public State getState() {
269         return state;
270     }
271
272     void reportFailure(final Exception cause) {
273         if (nextFailure == null) {
274             this.nextFailure = requireNonNull(cause);
275         } else {
276             LOG.debug("Transaction {} already has a set failure, not updating it", transactionId, cause);
277         }
278     }
279
280     @Override
281     public boolean isFailed() {
282         return state == State.FAILED || nextFailure != null;
283     }
284
285     @Override
286     ToStringHelper addToStringAttributes(final ToStringHelper toStringHelper) {
287         return super.addToStringAttributes(toStringHelper).add("nextFailure", nextFailure);
288     }
289 }