Bug 8424: Don't output data tree and tree candidates with debug
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / CompositeDataTreeCohort.java
1 /*
2  * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.datastore;
10
11 import akka.actor.ActorRef;
12 import akka.actor.Status;
13 import akka.actor.Status.Failure;
14 import akka.dispatch.ExecutionContexts;
15 import akka.dispatch.Futures;
16 import akka.dispatch.Recover;
17 import akka.pattern.Patterns;
18 import akka.util.Timeout;
19 import com.google.common.base.Preconditions;
20 import com.google.common.base.Throwables;
21 import com.google.common.collect.ImmutableList;
22 import com.google.common.collect.Lists;
23 import java.util.AbstractMap.SimpleImmutableEntry;
24 import java.util.ArrayList;
25 import java.util.Collection;
26 import java.util.Iterator;
27 import java.util.List;
28 import java.util.Map.Entry;
29 import java.util.Optional;
30 import java.util.concurrent.ExecutionException;
31 import java.util.concurrent.TimeoutException;
32 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
33 import org.opendaylight.controller.cluster.datastore.DataTreeCohortActor.CanCommit;
34 import org.opendaylight.controller.cluster.datastore.DataTreeCohortActor.Success;
35 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
36 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
37 import org.slf4j.Logger;
38 import org.slf4j.LoggerFactory;
39 import scala.concurrent.Await;
40 import scala.concurrent.Future;
41
42 /**
43  * Composite cohort, which coordinates multiple user-provided cohorts as if it was only one cohort.
44  * <p/>
 * It tracks the current operation and the list of cohorts which successfully finished the previous phase,
 * so that an abort, if it becomes necessary, is invoked only on the cohorts which are still active.
47  *
48  */
49 class CompositeDataTreeCohort {
50     private static final Logger LOG = LoggerFactory.getLogger(CompositeDataTreeCohort.class);
51
52     private enum State {
53         /**
54          * Cohorts are idle, no messages were sent.
55          */
56         IDLE,
57         /**
58          * CanCommit message was sent to all participating cohorts.
59          */
60         CAN_COMMIT_SENT,
61         /**
62          * Successful canCommit responses were received from every participating cohort.
63          */
64         CAN_COMMIT_SUCCESSFUL,
65         /**
66          * PreCommit message was sent to all participating cohorts.
67          */
68         PRE_COMMIT_SENT,
69         /**
70          * Successful preCommit responses were received from every participating cohort.
71          */
72         PRE_COMMIT_SUCCESSFUL,
73         /**
74          * Commit message was send to all participating cohorts.
75          */
76         COMMIT_SENT,
77         /**
78          * Successful commit responses were received from all participating cohorts.
79          */
80         COMMITED,
81         /**
82          * Some of cohorts responded back with unsuccessful message.
83          */
84         FAILED,
85         /**
86          * Abort message was send to all cohorts which responded with success previously.
87          */
88         ABORTED
89     }
90
91     static final Recover<Object> EXCEPTION_TO_MESSAGE = new Recover<Object>() {
92         @Override
93         public Failure recover(final Throwable error) {
94             return new Failure(error);
95         }
96     };
97
98     private final DataTreeCohortActorRegistry registry;
99     private final TransactionIdentifier txId;
100     private final SchemaContext schema;
101     private final Timeout timeout;
102
103     private List<Success> successfulFromPrevious;
104     private State state = State.IDLE;
105
106     CompositeDataTreeCohort(final DataTreeCohortActorRegistry registry, final TransactionIdentifier transactionID,
107         final SchemaContext schema, final Timeout timeout) {
108         this.registry = Preconditions.checkNotNull(registry);
109         this.txId = Preconditions.checkNotNull(transactionID);
110         this.schema = Preconditions.checkNotNull(schema);
111         this.timeout = Preconditions.checkNotNull(timeout);
112     }
113
114     void reset() {
115         switch (state) {
116             case CAN_COMMIT_SENT:
117             case CAN_COMMIT_SUCCESSFUL:
118             case PRE_COMMIT_SENT:
119             case PRE_COMMIT_SUCCESSFUL:
120             case COMMIT_SENT:
121                 abort();
122                 break;
123             case ABORTED:
124             case COMMITED:
125             case FAILED:
126             case IDLE:
127                 break;
128             default:
129                 throw new IllegalStateException("Unhandled state " + state);
130         }
131
132         successfulFromPrevious = null;
133         state = State.IDLE;
134     }
135
136     void canCommit(final DataTreeCandidate tip) throws ExecutionException, TimeoutException {
137         if (LOG.isTraceEnabled()) {
138             LOG.trace("{}: canCommit - candidate: {}", txId, tip);
139         } else {
140             LOG.debug("{}: canCommit - candidate rootPath: {}", txId, tip.getRootPath());
141         }
142
143         final List<CanCommit> messages = registry.createCanCommitMessages(txId, tip, schema);
144         LOG.debug("{}: canCommit - messages: {}", txId, messages);
145         if (messages.isEmpty()) {
146             successfulFromPrevious = ImmutableList.of();
147             changeStateFrom(State.IDLE, State.CAN_COMMIT_SUCCESSFUL);
148             return;
149         }
150
151         final List<Entry<ActorRef, Future<Object>>> futures = new ArrayList<>(messages.size());
152         for (CanCommit message : messages) {
153             final ActorRef actor = message.getCohort();
154             final Future<Object> future = Patterns.ask(actor, message, timeout).recover(EXCEPTION_TO_MESSAGE,
155                 ExecutionContexts.global());
156             LOG.trace("{}: requesting canCommit from {}", txId, actor);
157             futures.add(new SimpleImmutableEntry<>(actor, future));
158         }
159
160         changeStateFrom(State.IDLE, State.CAN_COMMIT_SENT);
161         processResponses(futures, State.CAN_COMMIT_SENT, State.CAN_COMMIT_SUCCESSFUL);
162     }
163
164     void preCommit() throws ExecutionException, TimeoutException {
165         LOG.debug("{}: preCommit - successfulFromPrevious: {}", txId, successfulFromPrevious);
166
167         Preconditions.checkState(successfulFromPrevious != null);
168         if (successfulFromPrevious.isEmpty()) {
169             changeStateFrom(State.CAN_COMMIT_SUCCESSFUL, State.PRE_COMMIT_SUCCESSFUL);
170             return;
171         }
172
173         final List<Entry<ActorRef, Future<Object>>> futures = sendMessageToSuccessful(
174             new DataTreeCohortActor.PreCommit(txId));
175         changeStateFrom(State.CAN_COMMIT_SUCCESSFUL, State.PRE_COMMIT_SENT);
176         processResponses(futures, State.PRE_COMMIT_SENT, State.PRE_COMMIT_SUCCESSFUL);
177     }
178
179     void commit() throws ExecutionException, TimeoutException {
180         LOG.debug("{}: commit - successfulFromPrevious: {}", txId, successfulFromPrevious);
181         if (successfulFromPrevious.isEmpty()) {
182             changeStateFrom(State.PRE_COMMIT_SUCCESSFUL, State.COMMITED);
183             return;
184         }
185
186         Preconditions.checkState(successfulFromPrevious != null);
187         final List<Entry<ActorRef, Future<Object>>> futures = sendMessageToSuccessful(
188             new DataTreeCohortActor.Commit(txId));
189         changeStateFrom(State.PRE_COMMIT_SUCCESSFUL, State.COMMIT_SENT);
190         processResponses(futures, State.COMMIT_SENT, State.COMMITED);
191     }
192
193     Optional<List<Future<Object>>> abort() {
194         LOG.debug("{}: abort - successfulFromPrevious: {}", txId, successfulFromPrevious);
195
196         state = State.ABORTED;
197         if (successfulFromPrevious == null || successfulFromPrevious.isEmpty()) {
198             return Optional.empty();
199         }
200
201         final DataTreeCohortActor.Abort message = new DataTreeCohortActor.Abort(txId);
202         final List<Future<Object>> futures = new ArrayList<>(successfulFromPrevious.size());
203         for (Success s : successfulFromPrevious) {
204             futures.add(Patterns.ask(s.getCohort(), message, timeout));
205         }
206         return Optional.of(futures);
207     }
208
209     private List<Entry<ActorRef, Future<Object>>> sendMessageToSuccessful(final Object message) {
210         LOG.debug("{}: sendMesageToSuccessful: {}", txId, message);
211
212         final List<Entry<ActorRef, Future<Object>>> ret = new ArrayList<>(successfulFromPrevious.size());
213         for (Success s : successfulFromPrevious) {
214             final ActorRef actor = s.getCohort();
215             ret.add(new SimpleImmutableEntry<>(actor, Patterns.ask(actor, message, timeout)));
216         }
217         return ret;
218     }
219
220     @SuppressWarnings("checkstyle:IllegalCatch")
221     private void processResponses(final List<Entry<ActorRef, Future<Object>>> futures, final State currentState,
222             final State afterState) throws TimeoutException, ExecutionException {
223         LOG.debug("{}: processResponses - currentState: {}, afterState: {}", txId, currentState, afterState);
224
225         final Iterable<Object> results;
226         try {
227             results = Await.result(Futures.sequence(Lists.transform(futures, e -> e.getValue()),
228                 ExecutionContexts.global()), timeout.duration());
229         } catch (TimeoutException e) {
230             successfulFromPrevious = null;
231             LOG.debug("{}: processResponses - error from Future", txId, e);
232
233             for (Entry<ActorRef, Future<Object>> f : futures) {
234                 if (!f.getValue().isCompleted()) {
235                     LOG.info("{}: actor {} failed to respond", txId, f.getKey());
236                 }
237             }
238             throw e;
239         } catch (ExecutionException e) {
240             successfulFromPrevious = null;
241             LOG.debug("{}: processResponses - error from Future", txId, e);
242             throw e;
243         } catch (Exception e) {
244             successfulFromPrevious = null;
245             LOG.debug("{}: processResponses - error from Future", txId, e);
246             throw new ExecutionException(e);
247         }
248
249         final Collection<Failure> failed = new ArrayList<>(1);
250         final List<Success> successful = new ArrayList<>(futures.size());
251         for (Object result : results) {
252             if (result instanceof DataTreeCohortActor.Success) {
253                 successful.add((Success) result);
254             } else if (result instanceof Status.Failure) {
255                 failed.add((Failure) result);
256             } else {
257                 LOG.warn("{}: unrecognized response {}, ignoring it", result);
258             }
259         }
260
261         LOG.debug("{}: processResponses - successful: {}, failed: {}", txId, successful, failed);
262
263         successfulFromPrevious = successful;
264         if (!failed.isEmpty()) {
265             changeStateFrom(currentState, State.FAILED);
266             final Iterator<Failure> it = failed.iterator();
267             final Throwable firstEx = it.next().cause();
268             while (it.hasNext()) {
269                 firstEx.addSuppressed(it.next().cause());
270             }
271             Throwables.propagateIfInstanceOf(firstEx, ExecutionException.class);
272             Throwables.propagateIfInstanceOf(firstEx, TimeoutException.class);
273             throw new ExecutionException(firstEx);
274         }
275         changeStateFrom(currentState, afterState);
276     }
277
278     void changeStateFrom(final State expected, final State followup) {
279         Preconditions.checkState(state == expected);
280         state = followup;
281     }
282 }