Improve segmented journal actor metrics
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / SimpleShardDataTreeCohort.java
1 /*
2  * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.datastore;
9
10 import static com.google.common.base.Verify.verifyNotNull;
11 import static java.util.Objects.requireNonNull;
12
13 import com.google.common.base.MoreObjects.ToStringHelper;
14 import com.google.common.base.Preconditions;
15 import com.google.common.primitives.UnsignedLong;
16 import com.google.common.util.concurrent.FutureCallback;
17 import java.util.Optional;
18 import java.util.SortedSet;
19 import java.util.concurrent.CompletionStage;
20 import org.eclipse.jdt.annotation.NonNull;
21 import org.eclipse.jdt.annotation.Nullable;
22 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
23 import org.opendaylight.yangtools.yang.common.Empty;
24 import org.opendaylight.yangtools.yang.data.tree.api.DataTreeCandidate;
25 import org.opendaylight.yangtools.yang.data.tree.api.DataTreeCandidateTip;
26 import org.opendaylight.yangtools.yang.data.tree.api.DataTreeModification;
27 import org.slf4j.Logger;
28 import org.slf4j.LoggerFactory;
29
30 final class SimpleShardDataTreeCohort extends ShardDataTreeCohort {
31     private static final Logger LOG = LoggerFactory.getLogger(SimpleShardDataTreeCohort.class);
32
33     private final DataTreeModification transaction;
34     private final ShardDataTree dataTree;
35     private final @NonNull TransactionIdentifier transactionId;
36     private final CompositeDataTreeCohort userCohorts;
37     private final @Nullable SortedSet<String> participatingShardNames;
38
39     private State state = State.READY;
40     private DataTreeCandidateTip candidate;
41     private FutureCallback<?> callback;
42     private Exception nextFailure;
43
44     SimpleShardDataTreeCohort(final ShardDataTree dataTree, final DataTreeModification transaction,
45             final TransactionIdentifier transactionId, final CompositeDataTreeCohort userCohorts,
46             final Optional<SortedSet<String>> participatingShardNames) {
47         this.dataTree = requireNonNull(dataTree);
48         this.transaction = requireNonNull(transaction);
49         this.transactionId = requireNonNull(transactionId);
50         this.userCohorts = requireNonNull(userCohorts);
51         this.participatingShardNames = requireNonNull(participatingShardNames).orElse(null);
52     }
53
54     SimpleShardDataTreeCohort(final ShardDataTree dataTree, final DataTreeModification transaction,
55         final TransactionIdentifier transactionId, final Exception nextFailure) {
56         this.dataTree = requireNonNull(dataTree);
57         this.transaction = requireNonNull(transaction);
58         this.transactionId = requireNonNull(transactionId);
59         userCohorts = null;
60         participatingShardNames = null;
61         this.nextFailure = requireNonNull(nextFailure);
62     }
63
64     @Override
65     TransactionIdentifier transactionId() {
66         return transactionId;
67     }
68
69     @Override
70     DataTreeCandidateTip getCandidate() {
71         return candidate;
72     }
73
74     @Override
75     DataTreeModification getDataTreeModification() {
76         return transaction;
77     }
78
79     @Override
80     Optional<SortedSet<String>> getParticipatingShardNames() {
81         return Optional.ofNullable(participatingShardNames);
82     }
83
84     private void checkState(final State expected) {
85         Preconditions.checkState(state == expected, "State %s does not match expected state %s for %s",
86                 state, expected, transactionId());
87     }
88
89     @Override
90     public void canCommit(final FutureCallback<Empty> newCallback) {
91         if (state == State.CAN_COMMIT_PENDING) {
92             return;
93         }
94
95         checkState(State.READY);
96         callback = requireNonNull(newCallback);
97         state = State.CAN_COMMIT_PENDING;
98
99         if (nextFailure == null) {
100             dataTree.startCanCommit(this);
101         } else {
102             failedCanCommit(nextFailure);
103         }
104     }
105
106     @Override
107     public void preCommit(final FutureCallback<DataTreeCandidate> newCallback) {
108         checkState(State.CAN_COMMIT_COMPLETE);
109         callback = requireNonNull(newCallback);
110         state = State.PRE_COMMIT_PENDING;
111
112         if (nextFailure == null) {
113             dataTree.startPreCommit(this);
114         } else {
115             failedPreCommit(nextFailure);
116         }
117     }
118
119     @Override
120     public void abort(final FutureCallback<Empty> abortCallback) {
121         if (!dataTree.startAbort(this)) {
122             abortCallback.onSuccess(Empty.value());
123             return;
124         }
125
126         candidate = null;
127         state = State.ABORTED;
128
129         final Optional<CompletionStage<?>> maybeAborts = userCohorts.abort();
130         if (!maybeAborts.isPresent()) {
131             abortCallback.onSuccess(Empty.value());
132             return;
133         }
134
135         maybeAborts.orElseThrow().whenComplete((noop, failure) -> {
136             if (failure != null) {
137                 abortCallback.onFailure(failure);
138             } else {
139                 abortCallback.onSuccess(Empty.value());
140             }
141         });
142     }
143
144     @Override
145     public void commit(final FutureCallback<UnsignedLong> newCallback) {
146         checkState(State.PRE_COMMIT_COMPLETE);
147         callback = requireNonNull(newCallback);
148         state = State.COMMIT_PENDING;
149
150         if (nextFailure == null) {
151             dataTree.startCommit(this, candidate);
152         } else {
153             failedCommit(nextFailure);
154         }
155     }
156
157     private <T> FutureCallback<T> switchState(final State newState) {
158         @SuppressWarnings("unchecked")
159         final FutureCallback<T> ret = (FutureCallback<T>) callback;
160         callback = null;
161         LOG.debug("Transaction {} changing state from {} to {}", transactionId, state, newState);
162         state = newState;
163         return ret;
164     }
165
166     void setNewCandidate(final DataTreeCandidateTip dataTreeCandidate) {
167         checkState(State.PRE_COMMIT_COMPLETE);
168         candidate = verifyNotNull(dataTreeCandidate);
169     }
170
171     void successfulCanCommit() {
172         switchState(State.CAN_COMMIT_COMPLETE).onSuccess(Empty.value());
173     }
174
175     void failedCanCommit(final Exception cause) {
176         switchState(State.FAILED).onFailure(cause);
177     }
178
179     /**
180      * Run user-defined canCommit and preCommit hooks. We want to run these before we initiate persistence so that
181      * any failure to validate is propagated before we record the transaction.
182      *
183      * @param dataTreeCandidate {@link DataTreeCandidate} under consideration
184      * @param futureCallback the callback to invoke on completion, which may be immediate or async.
185      */
186     void userPreCommit(final DataTreeCandidate dataTreeCandidate, final FutureCallback<Empty> futureCallback) {
187         userCohorts.reset();
188
189         final Optional<CompletionStage<Empty>> maybeCanCommitFuture = userCohorts.canCommit(dataTreeCandidate);
190         if (!maybeCanCommitFuture.isPresent()) {
191             doUserPreCommit(futureCallback);
192             return;
193         }
194
195         maybeCanCommitFuture.orElseThrow().whenComplete((noop, failure) -> {
196             if (failure != null) {
197                 futureCallback.onFailure(failure);
198             } else {
199                 doUserPreCommit(futureCallback);
200             }
201         });
202     }
203
204     private void doUserPreCommit(final FutureCallback<Empty> futureCallback) {
205         final Optional<CompletionStage<Empty>> maybePreCommitFuture = userCohorts.preCommit();
206         if (!maybePreCommitFuture.isPresent()) {
207             futureCallback.onSuccess(Empty.value());
208             return;
209         }
210
211         maybePreCommitFuture.orElseThrow().whenComplete((noop, failure) -> {
212             if (failure != null) {
213                 futureCallback.onFailure(failure);
214             } else {
215                 futureCallback.onSuccess(Empty.value());
216             }
217         });
218     }
219
220     void successfulPreCommit(final DataTreeCandidateTip dataTreeCandidate) {
221         LOG.trace("Transaction {} prepared candidate {}", transaction, dataTreeCandidate);
222         candidate = verifyNotNull(dataTreeCandidate);
223         switchState(State.PRE_COMMIT_COMPLETE).onSuccess(dataTreeCandidate);
224     }
225
226     void failedPreCommit(final Throwable cause) {
227         if (LOG.isTraceEnabled()) {
228             LOG.trace("Transaction {} failed to prepare", transaction, cause);
229         } else {
230             LOG.error("Transaction {} failed to prepare", transactionId, cause);
231         }
232
233         userCohorts.abort();
234         switchState(State.FAILED).onFailure(cause);
235     }
236
237     void successfulCommit(final UnsignedLong journalIndex, final Runnable onComplete) {
238         final Optional<CompletionStage<Empty>> maybeCommitFuture = userCohorts.commit();
239         if (!maybeCommitFuture.isPresent()) {
240             finishSuccessfulCommit(journalIndex, onComplete);
241             return;
242         }
243
244         maybeCommitFuture.orElseThrow().whenComplete((noop, failure) -> {
245             if (failure != null) {
246                 LOG.error("User cohorts failed to commit", failure);
247             }
248
249             finishSuccessfulCommit(journalIndex, onComplete);
250         });
251     }
252
253     private void finishSuccessfulCommit(final UnsignedLong journalIndex, final Runnable onComplete) {
254         switchState(State.COMMITTED).onSuccess(journalIndex);
255         onComplete.run();
256     }
257
258     void failedCommit(final Exception cause) {
259         if (LOG.isTraceEnabled()) {
260             LOG.trace("Transaction {} failed to commit", transaction, cause);
261         } else {
262             LOG.error("Transaction failed to commit", cause);
263         }
264
265         userCohorts.abort();
266         switchState(State.FAILED).onFailure(cause);
267     }
268
269     @Override
270     public State getState() {
271         return state;
272     }
273
274     void reportFailure(final Exception cause) {
275         if (nextFailure == null) {
276             nextFailure = requireNonNull(cause);
277         } else {
278             LOG.debug("Transaction {} already has a set failure, not updating it", transactionId, cause);
279         }
280     }
281
282     @Override
283     public boolean isFailed() {
284         return state == State.FAILED || nextFailure != null;
285     }
286
287     @Override
288     ToStringHelper addToStringAttributes(final ToStringHelper toStringHelper) {
289         return super.addToStringAttributes(toStringHelper).add("nextFailure", nextFailure);
290     }
291 }