b5b49c2396c772a0fbfdd75e4bcc1ff8e14ca1cb
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / SimpleShardDataTreeCohort.java
1 /*
2  * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.datastore;
9
10 import static java.util.Objects.requireNonNull;
11
12 import com.google.common.base.MoreObjects.ToStringHelper;
13 import com.google.common.base.Preconditions;
14 import com.google.common.base.Verify;
15 import com.google.common.primitives.UnsignedLong;
16 import com.google.common.util.concurrent.FutureCallback;
17 import java.util.Optional;
18 import java.util.SortedSet;
19 import java.util.concurrent.CompletionStage;
20 import javax.annotation.Nullable;
21 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
22 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
23 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateTip;
24 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
25 import org.slf4j.Logger;
26 import org.slf4j.LoggerFactory;
27
28 final class SimpleShardDataTreeCohort extends ShardDataTreeCohort {
29     private static final Logger LOG = LoggerFactory.getLogger(SimpleShardDataTreeCohort.class);
30
31     private final DataTreeModification transaction;
32     private final ShardDataTree dataTree;
33     private final TransactionIdentifier transactionId;
34     private final CompositeDataTreeCohort userCohorts;
35     @Nullable
36     private final SortedSet<String> participatingShardNames;
37
38     private State state = State.READY;
39     private DataTreeCandidateTip candidate;
40     private FutureCallback<?> callback;
41     private Exception nextFailure;
42
43     SimpleShardDataTreeCohort(final ShardDataTree dataTree, final DataTreeModification transaction,
44             final TransactionIdentifier transactionId, final CompositeDataTreeCohort userCohorts,
45             final Optional<SortedSet<String>> participatingShardNames) {
46         this.dataTree = requireNonNull(dataTree);
47         this.transaction = requireNonNull(transaction);
48         this.transactionId = requireNonNull(transactionId);
49         this.userCohorts = requireNonNull(userCohorts);
50         this.participatingShardNames = requireNonNull(participatingShardNames).orElse(null);
51     }
52
53     SimpleShardDataTreeCohort(final ShardDataTree dataTree, final DataTreeModification transaction,
54         final TransactionIdentifier transactionId, final Exception nextFailure) {
55         this.dataTree = requireNonNull(dataTree);
56         this.transaction = requireNonNull(transaction);
57         this.transactionId = requireNonNull(transactionId);
58         this.userCohorts = null;
59         this.participatingShardNames = null;
60         this.nextFailure = requireNonNull(nextFailure);
61     }
62
63     @Override
64     public TransactionIdentifier getIdentifier() {
65         return transactionId;
66     }
67
68     @Override
69     DataTreeCandidateTip getCandidate() {
70         return candidate;
71     }
72
73     @Override
74     DataTreeModification getDataTreeModification() {
75         return transaction;
76     }
77
78     @Override
79     Optional<SortedSet<String>> getParticipatingShardNames() {
80         return Optional.ofNullable(participatingShardNames);
81     }
82
83     private void checkState(final State expected) {
84         Preconditions.checkState(state == expected, "State %s does not match expected state %s for %s",
85                 state, expected, getIdentifier());
86     }
87
88     @Override
89     public void canCommit(final FutureCallback<Void> newCallback) {
90         if (state == State.CAN_COMMIT_PENDING) {
91             return;
92         }
93
94         checkState(State.READY);
95         this.callback = requireNonNull(newCallback);
96         state = State.CAN_COMMIT_PENDING;
97
98         if (nextFailure == null) {
99             dataTree.startCanCommit(this);
100         } else {
101             failedCanCommit(nextFailure);
102         }
103     }
104
105     @Override
106     public void preCommit(final FutureCallback<DataTreeCandidate> newCallback) {
107         checkState(State.CAN_COMMIT_COMPLETE);
108         this.callback = requireNonNull(newCallback);
109         state = State.PRE_COMMIT_PENDING;
110
111         if (nextFailure == null) {
112             dataTree.startPreCommit(this);
113         } else {
114             failedPreCommit(nextFailure);
115         }
116     }
117
118     @Override
119     public void abort(final FutureCallback<Void> abortCallback) {
120         if (!dataTree.startAbort(this)) {
121             abortCallback.onSuccess(null);
122             return;
123         }
124
125         candidate = null;
126         state = State.ABORTED;
127
128         final Optional<CompletionStage<?>> maybeAborts = userCohorts.abort();
129         if (!maybeAborts.isPresent()) {
130             abortCallback.onSuccess(null);
131             return;
132         }
133
134         maybeAborts.get().whenComplete((noop, failure) -> {
135             if (failure != null) {
136                 abortCallback.onFailure(failure);
137             } else {
138                 abortCallback.onSuccess(null);
139             }
140         });
141     }
142
143     @Override
144     public void commit(final FutureCallback<UnsignedLong> newCallback) {
145         checkState(State.PRE_COMMIT_COMPLETE);
146         this.callback = requireNonNull(newCallback);
147         state = State.COMMIT_PENDING;
148
149         if (nextFailure == null) {
150             dataTree.startCommit(this, candidate);
151         } else {
152             failedCommit(nextFailure);
153         }
154     }
155
156     private <T> FutureCallback<T> switchState(final State newState) {
157         @SuppressWarnings("unchecked")
158         final FutureCallback<T> ret = (FutureCallback<T>) this.callback;
159         this.callback = null;
160         LOG.debug("Transaction {} changing state from {} to {}", transactionId, state, newState);
161         this.state = newState;
162         return ret;
163     }
164
165     void setNewCandidate(final DataTreeCandidateTip dataTreeCandidate) {
166         checkState(State.PRE_COMMIT_COMPLETE);
167         this.candidate = Verify.verifyNotNull(dataTreeCandidate);
168     }
169
170     void successfulCanCommit() {
171         switchState(State.CAN_COMMIT_COMPLETE).onSuccess(null);
172     }
173
174     void failedCanCommit(final Exception cause) {
175         switchState(State.FAILED).onFailure(cause);
176     }
177
178     /**
179      * Run user-defined canCommit and preCommit hooks. We want to run these before we initiate persistence so that
180      * any failure to validate is propagated before we record the transaction.
181      *
182      * @param dataTreeCandidate {@link DataTreeCandidate} under consideration
183      * @param futureCallback the callback to invoke on completion, which may be immediate or async.
184      */
185     void userPreCommit(final DataTreeCandidate dataTreeCandidate, final FutureCallback<Void> futureCallback) {
186         userCohorts.reset();
187
188         final Optional<CompletionStage<Void>> maybeCanCommitFuture = userCohorts.canCommit(dataTreeCandidate);
189         if (!maybeCanCommitFuture.isPresent()) {
190             doUserPreCommit(futureCallback);
191             return;
192         }
193
194         maybeCanCommitFuture.get().whenComplete((noop, failure) -> {
195             if (failure != null) {
196                 futureCallback.onFailure(failure);
197             } else {
198                 doUserPreCommit(futureCallback);
199             }
200         });
201     }
202
203     private void doUserPreCommit(final FutureCallback<Void> futureCallback) {
204         final Optional<CompletionStage<Void>> maybePreCommitFuture = userCohorts.preCommit();
205         if (!maybePreCommitFuture.isPresent()) {
206             futureCallback.onSuccess(null);
207             return;
208         }
209
210         maybePreCommitFuture.get().whenComplete((noop, failure) -> {
211             if (failure != null) {
212                 futureCallback.onFailure(failure);
213             } else {
214                 futureCallback.onSuccess(null);
215             }
216         });
217     }
218
219     void successfulPreCommit(final DataTreeCandidateTip dataTreeCandidate) {
220         LOG.trace("Transaction {} prepared candidate {}", transaction, dataTreeCandidate);
221         this.candidate = Verify.verifyNotNull(dataTreeCandidate);
222         switchState(State.PRE_COMMIT_COMPLETE).onSuccess(dataTreeCandidate);
223     }
224
225     void failedPreCommit(final Throwable cause) {
226         if (LOG.isTraceEnabled()) {
227             LOG.trace("Transaction {} failed to prepare", transaction, cause);
228         } else {
229             LOG.error("Transaction {} failed to prepare", transactionId, cause);
230         }
231
232         userCohorts.abort();
233         switchState(State.FAILED).onFailure(cause);
234     }
235
236     void successfulCommit(final UnsignedLong journalIndex, final Runnable onComplete) {
237         final Optional<CompletionStage<Void>> maybeCommitFuture = userCohorts.commit();
238         if (!maybeCommitFuture.isPresent()) {
239             finishSuccessfulCommit(journalIndex, onComplete);
240             return;
241         }
242
243         maybeCommitFuture.get().whenComplete((noop, failure) -> {
244             if (failure != null) {
245                 LOG.error("User cohorts failed to commit", failure);
246             }
247
248             finishSuccessfulCommit(journalIndex, onComplete);
249         });
250     }
251
252     private void finishSuccessfulCommit(final UnsignedLong journalIndex, final Runnable onComplete) {
253         switchState(State.COMMITTED).onSuccess(journalIndex);
254         onComplete.run();
255     }
256
257     void failedCommit(final Exception cause) {
258         if (LOG.isTraceEnabled()) {
259             LOG.trace("Transaction {} failed to commit", transaction, cause);
260         } else {
261             LOG.error("Transaction failed to commit", cause);
262         }
263
264         userCohorts.abort();
265         switchState(State.FAILED).onFailure(cause);
266     }
267
268     @Override
269     public State getState() {
270         return state;
271     }
272
273     void reportFailure(final Exception cause) {
274         if (nextFailure == null) {
275             this.nextFailure = requireNonNull(cause);
276         } else {
277             LOG.debug("Transaction {} already has a set failure, not updating it", transactionId, cause);
278         }
279     }
280
281     @Override
282     public boolean isFailed() {
283         return state == State.FAILED || nextFailure != null;
284     }
285
286     @Override
287     ToStringHelper addToStringAttributes(final ToStringHelper toStringHelper) {
288         return super.addToStringAttributes(toStringHelper).add("nextFailure", nextFailure);
289     }
290 }