BUG-8219: Cleanup CompositeDataTreeCohort
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / CompositeDataTreeCohort.java
1 /*
2  * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.datastore;
10
11 import akka.actor.ActorRef;
12 import akka.actor.Status;
13 import akka.actor.Status.Failure;
14 import akka.dispatch.ExecutionContexts;
15 import akka.dispatch.Futures;
16 import akka.dispatch.Recover;
17 import akka.pattern.Patterns;
18 import akka.util.Timeout;
19 import com.google.common.base.Preconditions;
20 import com.google.common.base.Throwables;
21 import com.google.common.collect.ImmutableList;
22 import com.google.common.collect.Lists;
23 import java.util.AbstractMap.SimpleImmutableEntry;
24 import java.util.ArrayList;
25 import java.util.Collection;
26 import java.util.Iterator;
27 import java.util.List;
28 import java.util.Map.Entry;
29 import java.util.Optional;
30 import java.util.concurrent.ExecutionException;
31 import java.util.concurrent.TimeoutException;
32 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
33 import org.opendaylight.controller.cluster.datastore.DataTreeCohortActor.CanCommit;
34 import org.opendaylight.controller.cluster.datastore.DataTreeCohortActor.Success;
35 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
36 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
37 import org.slf4j.Logger;
38 import org.slf4j.LoggerFactory;
39 import scala.concurrent.Await;
40 import scala.concurrent.Future;
41
42 /**
43  * Composite cohort, which coordinates multiple user-provided cohorts as if it was only one cohort.
44  * <p/>
45  * It tracks current operation and list of cohorts which successfuly finished previous phase in
46  * case, if abort is necessary to invoke it only on cohort steps which are still active.
47  *
48  */
49 class CompositeDataTreeCohort {
50     private static final Logger LOG = LoggerFactory.getLogger(CompositeDataTreeCohort.class);
51
52     private enum State {
53         /**
54          * Cohorts are idle, no messages were sent.
55          */
56         IDLE,
57         /**
58          * CanCommit message was sent to all participating cohorts.
59          */
60         CAN_COMMIT_SENT,
61         /**
62          * Successful canCommit responses were received from every participating cohort.
63          */
64         CAN_COMMIT_SUCCESSFUL,
65         /**
66          * PreCommit message was sent to all participating cohorts.
67          */
68         PRE_COMMIT_SENT,
69         /**
70          * Successful preCommit responses were received from every participating cohort.
71          */
72         PRE_COMMIT_SUCCESSFUL,
73         /**
74          * Commit message was send to all participating cohorts.
75          */
76         COMMIT_SENT,
77         /**
78          * Successful commit responses were received from all participating cohorts.
79          */
80         COMMITED,
81         /**
82          * Some of cohorts responded back with unsuccessful message.
83          */
84         FAILED,
85         /**
86          * Abort message was send to all cohorts which responded with success previously.
87          */
88         ABORTED
89     }
90
91     static final Recover<Object> EXCEPTION_TO_MESSAGE = new Recover<Object>() {
92         @Override
93         public Failure recover(final Throwable error) {
94             return new Failure(error);
95         }
96     };
97
98     private final DataTreeCohortActorRegistry registry;
99     private final TransactionIdentifier txId;
100     private final SchemaContext schema;
101     private final Timeout timeout;
102
103     private List<Success> successfulFromPrevious;
104     private State state = State.IDLE;
105
106     CompositeDataTreeCohort(final DataTreeCohortActorRegistry registry, final TransactionIdentifier transactionID,
107         final SchemaContext schema, final Timeout timeout) {
108         this.registry = Preconditions.checkNotNull(registry);
109         this.txId = Preconditions.checkNotNull(transactionID);
110         this.schema = Preconditions.checkNotNull(schema);
111         this.timeout = Preconditions.checkNotNull(timeout);
112     }
113
114     void reset() {
115         switch (state) {
116             case CAN_COMMIT_SENT:
117             case CAN_COMMIT_SUCCESSFUL:
118             case PRE_COMMIT_SENT:
119             case PRE_COMMIT_SUCCESSFUL:
120             case COMMIT_SENT:
121                 abort();
122                 break;
123             case ABORTED:
124             case COMMITED:
125             case FAILED:
126             case IDLE:
127                 break;
128             default:
129                 throw new IllegalStateException("Unhandled state " + state);
130         }
131
132         successfulFromPrevious = null;
133         state = State.IDLE;
134     }
135
136     void canCommit(final DataTreeCandidate tip) throws ExecutionException, TimeoutException {
137         LOG.debug("{}: canCommit - candidate: {}", txId, tip);
138
139         final List<CanCommit> messages = registry.createCanCommitMessages(txId, tip, schema);
140         LOG.debug("{}: canCommit - messages: {}", txId, messages);
141         if (messages.isEmpty()) {
142             successfulFromPrevious = ImmutableList.of();
143             changeStateFrom(State.IDLE, State.CAN_COMMIT_SUCCESSFUL);
144             return;
145         }
146
147         final List<Entry<ActorRef, Future<Object>>> futures = new ArrayList<>(messages.size());
148         for (CanCommit message : messages) {
149             final ActorRef actor = message.getCohort();
150             final Future<Object> future = Patterns.ask(actor, message, timeout).recover(EXCEPTION_TO_MESSAGE,
151                 ExecutionContexts.global());
152             LOG.trace("{}: requesting canCommit from {}", txId, actor);
153             futures.add(new SimpleImmutableEntry<>(actor, future));
154         }
155
156         changeStateFrom(State.IDLE, State.CAN_COMMIT_SENT);
157         processResponses(futures, State.CAN_COMMIT_SENT, State.CAN_COMMIT_SUCCESSFUL);
158     }
159
160     void preCommit() throws ExecutionException, TimeoutException {
161         LOG.debug("{}: preCommit - successfulFromPrevious: {}", txId, successfulFromPrevious);
162
163         Preconditions.checkState(successfulFromPrevious != null);
164         if (successfulFromPrevious.isEmpty()) {
165             changeStateFrom(State.CAN_COMMIT_SUCCESSFUL, State.PRE_COMMIT_SUCCESSFUL);
166             return;
167         }
168
169         final List<Entry<ActorRef, Future<Object>>> futures = sendMessageToSuccessful(
170             new DataTreeCohortActor.PreCommit(txId));
171         changeStateFrom(State.CAN_COMMIT_SUCCESSFUL, State.PRE_COMMIT_SENT);
172         processResponses(futures, State.PRE_COMMIT_SENT, State.PRE_COMMIT_SUCCESSFUL);
173     }
174
175     void commit() throws ExecutionException, TimeoutException {
176         LOG.debug("{}: commit - successfulFromPrevious: {}", txId, successfulFromPrevious);
177         if (successfulFromPrevious.isEmpty()) {
178             changeStateFrom(State.PRE_COMMIT_SUCCESSFUL, State.COMMITED);
179             return;
180         }
181
182         Preconditions.checkState(successfulFromPrevious != null);
183         final List<Entry<ActorRef, Future<Object>>> futures = sendMessageToSuccessful(
184             new DataTreeCohortActor.Commit(txId));
185         changeStateFrom(State.PRE_COMMIT_SUCCESSFUL, State.COMMIT_SENT);
186         processResponses(futures, State.COMMIT_SENT, State.COMMITED);
187     }
188
189     Optional<List<Future<Object>>> abort() {
190         LOG.debug("{}: abort - successfulFromPrevious: {}", txId, successfulFromPrevious);
191
192         state = State.ABORTED;
193         if (successfulFromPrevious == null || successfulFromPrevious.isEmpty()) {
194             return Optional.empty();
195         }
196
197         final DataTreeCohortActor.Abort message = new DataTreeCohortActor.Abort(txId);
198         final List<Future<Object>> futures = new ArrayList<>(successfulFromPrevious.size());
199         for (Success s : successfulFromPrevious) {
200             futures.add(Patterns.ask(s.getCohort(), message, timeout));
201         }
202         return Optional.of(futures);
203     }
204
205     private List<Entry<ActorRef, Future<Object>>> sendMessageToSuccessful(final Object message) {
206         LOG.debug("{}: sendMesageToSuccessful: {}", txId, message);
207
208         final List<Entry<ActorRef, Future<Object>>> ret = new ArrayList<>(successfulFromPrevious.size());
209         for (Success s : successfulFromPrevious) {
210             final ActorRef actor = s.getCohort();
211             ret.add(new SimpleImmutableEntry<>(actor, Patterns.ask(actor, message, timeout)));
212         }
213         return ret;
214     }
215
216     @SuppressWarnings("checkstyle:IllegalCatch")
217     private void processResponses(final List<Entry<ActorRef, Future<Object>>> futures, final State currentState,
218             final State afterState) throws TimeoutException, ExecutionException {
219         LOG.debug("{}: processResponses - currentState: {}, afterState: {}", txId, currentState, afterState);
220
221         final Iterable<Object> results;
222         try {
223             results = Await.result(Futures.sequence(Lists.transform(futures, e -> e.getValue()),
224                 ExecutionContexts.global()), timeout.duration());
225         } catch (TimeoutException e) {
226             successfulFromPrevious = null;
227             LOG.debug("{}: processResponses - error from Future", txId, e);
228
229             for (Entry<ActorRef, Future<Object>> f : futures) {
230                 if (!f.getValue().isCompleted()) {
231                     LOG.info("{}: actor {} failed to respond", txId, f.getKey());
232                 }
233             }
234             throw e;
235         } catch (ExecutionException e) {
236             successfulFromPrevious = null;
237             LOG.debug("{}: processResponses - error from Future", txId, e);
238             throw e;
239         } catch (Exception e) {
240             successfulFromPrevious = null;
241             LOG.debug("{}: processResponses - error from Future", txId, e);
242             throw new ExecutionException(e);
243         }
244
245         final Collection<Failure> failed = new ArrayList<>(1);
246         final List<Success> successful = new ArrayList<>(futures.size());
247         for (Object result : results) {
248             if (result instanceof DataTreeCohortActor.Success) {
249                 successful.add((Success) result);
250             } else if (result instanceof Status.Failure) {
251                 failed.add((Failure) result);
252             } else {
253                 LOG.warn("{}: unrecognized response {}, ignoring it", result);
254             }
255         }
256
257         LOG.debug("{}: processResponses - successful: {}, failed: {}", txId, successful, failed);
258
259         successfulFromPrevious = successful;
260         if (!failed.isEmpty()) {
261             changeStateFrom(currentState, State.FAILED);
262             final Iterator<Failure> it = failed.iterator();
263             final Throwable firstEx = it.next().cause();
264             while (it.hasNext()) {
265                 firstEx.addSuppressed(it.next().cause());
266             }
267             Throwables.propagateIfInstanceOf(firstEx, ExecutionException.class);
268             Throwables.propagateIfInstanceOf(firstEx, TimeoutException.class);
269             throw new ExecutionException(firstEx);
270         }
271         changeStateFrom(currentState, afterState);
272     }
273
274     void changeStateFrom(final State expected, final State followup) {
275         Preconditions.checkState(state == expected);
276         state = followup;
277     }
278 }