BUG-8618: eliminate SimpleShardDataTreeCohort subclasses
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / SimpleShardDataTreeCohort.java
1 /*
2  * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.datastore;
9
10 import akka.dispatch.ExecutionContexts;
11 import akka.dispatch.Futures;
12 import akka.dispatch.OnComplete;
13 import com.google.common.base.MoreObjects.ToStringHelper;
14 import com.google.common.base.Preconditions;
15 import com.google.common.base.Verify;
16 import com.google.common.primitives.UnsignedLong;
17 import com.google.common.util.concurrent.FutureCallback;
18 import java.util.List;
19 import java.util.Optional;
20 import java.util.concurrent.ExecutionException;
21 import java.util.concurrent.TimeoutException;
22 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
23 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
24 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateTip;
25 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
26 import org.slf4j.Logger;
27 import org.slf4j.LoggerFactory;
28 import scala.concurrent.Future;
29
30 final class SimpleShardDataTreeCohort extends ShardDataTreeCohort {
31     private static final Logger LOG = LoggerFactory.getLogger(SimpleShardDataTreeCohort.class);
32
33     private final DataTreeModification transaction;
34     private final ShardDataTree dataTree;
35     private final TransactionIdentifier transactionId;
36     private final CompositeDataTreeCohort userCohorts;
37
38     private State state = State.READY;
39     private DataTreeCandidateTip candidate;
40     private FutureCallback<?> callback;
41     private Exception nextFailure;
42
43     SimpleShardDataTreeCohort(final ShardDataTree dataTree, final DataTreeModification transaction,
44             final TransactionIdentifier transactionId, final CompositeDataTreeCohort userCohorts) {
45         this.dataTree = Preconditions.checkNotNull(dataTree);
46         this.transaction = Preconditions.checkNotNull(transaction);
47         this.transactionId = Preconditions.checkNotNull(transactionId);
48         this.userCohorts = Preconditions.checkNotNull(userCohorts);
49     }
50
51     SimpleShardDataTreeCohort(final ShardDataTree dataTree, final DataTreeModification transaction,
52         final TransactionIdentifier transactionId, final Exception nextFailure) {
53         this.dataTree = Preconditions.checkNotNull(dataTree);
54         this.transaction = Preconditions.checkNotNull(transaction);
55         this.transactionId = Preconditions.checkNotNull(transactionId);
56         this.userCohorts = null;
57         this.nextFailure = Preconditions.checkNotNull(nextFailure);
58     }
59
60     @Override
61     public TransactionIdentifier getIdentifier() {
62         return transactionId;
63     }
64
65     @Override
66     DataTreeCandidateTip getCandidate() {
67         return candidate;
68     }
69
70     @Override
71     DataTreeModification getDataTreeModification() {
72         return transaction;
73     }
74
75     private void checkState(final State expected) {
76         Preconditions.checkState(state == expected, "State %s does not match expected state %s", state, expected);
77     }
78
79     @Override
80     public void canCommit(final FutureCallback<Void> newCallback) {
81         if (state == State.CAN_COMMIT_PENDING) {
82             return;
83         }
84
85         checkState(State.READY);
86         this.callback = Preconditions.checkNotNull(newCallback);
87         state = State.CAN_COMMIT_PENDING;
88
89         if (nextFailure == null) {
90             dataTree.startCanCommit(this);
91         } else {
92             failedCanCommit(nextFailure);
93         }
94     }
95
96     @Override
97     public void preCommit(final FutureCallback<DataTreeCandidate> newCallback) {
98         checkState(State.CAN_COMMIT_COMPLETE);
99         this.callback = Preconditions.checkNotNull(newCallback);
100         state = State.PRE_COMMIT_PENDING;
101
102         if (nextFailure == null) {
103             dataTree.startPreCommit(this);
104         } else {
105             failedPreCommit(nextFailure);
106         }
107     }
108
109     @Override
110     public void abort(final FutureCallback<Void> abortCallback) {
111         if (!dataTree.startAbort(this)) {
112             abortCallback.onSuccess(null);
113             return;
114         }
115
116         candidate = null;
117         state = State.ABORTED;
118
119         final Optional<List<Future<Object>>> maybeAborts = userCohorts.abort();
120         if (!maybeAborts.isPresent()) {
121             abortCallback.onSuccess(null);
122             return;
123         }
124
125         final Future<Iterable<Object>> aborts = Futures.sequence(maybeAborts.get(), ExecutionContexts.global());
126         if (aborts.isCompleted()) {
127             abortCallback.onSuccess(null);
128             return;
129         }
130
131         aborts.onComplete(new OnComplete<Iterable<Object>>() {
132             @Override
133             public void onComplete(final Throwable failure, final Iterable<Object> objs) {
134                 if (failure != null) {
135                     abortCallback.onFailure(failure);
136                 } else {
137                     abortCallback.onSuccess(null);
138                 }
139             }
140         }, ExecutionContexts.global());
141     }
142
143     @Override
144     public void commit(final FutureCallback<UnsignedLong> newCallback) {
145         checkState(State.PRE_COMMIT_COMPLETE);
146         this.callback = Preconditions.checkNotNull(newCallback);
147         state = State.COMMIT_PENDING;
148
149         if (nextFailure == null) {
150             dataTree.startCommit(this, candidate);
151         } else {
152             failedCommit(nextFailure);
153         }
154     }
155
156     private <T> FutureCallback<T> switchState(final State newState) {
157         @SuppressWarnings("unchecked")
158         final FutureCallback<T> ret = (FutureCallback<T>) this.callback;
159         this.callback = null;
160         LOG.debug("Transaction {} changing state from {} to {}", transactionId, state, newState);
161         this.state = newState;
162         return ret;
163     }
164
165     void setNewCandidate(final DataTreeCandidateTip dataTreeCandidate) {
166         checkState(State.PRE_COMMIT_COMPLETE);
167         this.candidate = Verify.verifyNotNull(dataTreeCandidate);
168     }
169
170     void successfulCanCommit() {
171         switchState(State.CAN_COMMIT_COMPLETE).onSuccess(null);
172     }
173
174     void failedCanCommit(final Exception cause) {
175         switchState(State.FAILED).onFailure(cause);
176     }
177
178     /**
179      * Run user-defined canCommit and preCommit hooks. We want to run these before we initiate persistence so that
180      * any failure to validate is propagated before we record the transaction.
181      *
182      * @param dataTreeCandidate {@link DataTreeCandidate} under consideration
183      * @throws ExecutionException if the operation fails
184      * @throws TimeoutException if the operation times out
185      */
186     // FIXME: this should be asynchronous
187     void userPreCommit(final DataTreeCandidate dataTreeCandidate) throws ExecutionException, TimeoutException {
188         userCohorts.reset();
189         userCohorts.canCommit(dataTreeCandidate);
190         userCohorts.preCommit();
191     }
192
193     void successfulPreCommit(final DataTreeCandidateTip dataTreeCandidate) {
194         LOG.trace("Transaction {} prepared candidate {}", transaction, dataTreeCandidate);
195         this.candidate = Verify.verifyNotNull(dataTreeCandidate);
196         switchState(State.PRE_COMMIT_COMPLETE).onSuccess(dataTreeCandidate);
197     }
198
199     void failedPreCommit(final Exception cause) {
200         if (LOG.isTraceEnabled()) {
201             LOG.trace("Transaction {} failed to prepare", transaction, cause);
202         } else {
203             LOG.error("Transaction {} failed to prepare", transactionId, cause);
204         }
205
206         userCohorts.abort();
207         switchState(State.FAILED).onFailure(cause);
208     }
209
210     void successfulCommit(final UnsignedLong journalIndex) {
211         try {
212             userCohorts.commit();
213         } catch (TimeoutException | ExecutionException e) {
214             // We are probably dead, depending on what the cohorts end up doing
215             LOG.error("User cohorts failed to commit", e);
216         }
217
218         switchState(State.COMMITTED).onSuccess(journalIndex);
219     }
220
221     void failedCommit(final Exception cause) {
222         if (LOG.isTraceEnabled()) {
223             LOG.trace("Transaction {} failed to commit", transaction, cause);
224         } else {
225             LOG.error("Transaction failed to commit", cause);
226         }
227
228         userCohorts.abort();
229         switchState(State.FAILED).onFailure(cause);
230     }
231
232     @Override
233     public State getState() {
234         return state;
235     }
236
237     void reportFailure(final Exception cause) {
238         if (nextFailure == null) {
239             this.nextFailure = Preconditions.checkNotNull(cause);
240         } else {
241             LOG.debug("Transaction {} already has a set failure, not updating it", transactionId, cause);
242         }
243     }
244
245     @Override
246     public boolean isFailed() {
247         return state == State.FAILED || nextFailure != null;
248     }
249
250     @Override
251     ToStringHelper addToStringAttributes(final ToStringHelper toStringHelper) {
252         return super.addToStringAttributes(toStringHelper).add("nextFailure", nextFailure);
253     }
254 }