Bump upstream SNAPSHOTS
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / CompositeDataTreeCohort.java
1 /*
2  * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.datastore;
9
10 import static com.google.common.base.Preconditions.checkState;
11 import static java.util.Objects.requireNonNull;
12
13 import akka.actor.ActorRef;
14 import akka.actor.Status;
15 import akka.actor.Status.Failure;
16 import akka.dispatch.ExecutionContexts;
17 import akka.dispatch.Futures;
18 import akka.dispatch.OnComplete;
19 import akka.dispatch.Recover;
20 import akka.pattern.Patterns;
21 import akka.util.Timeout;
22 import com.google.common.collect.Lists;
23 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
24 import java.util.AbstractMap.SimpleImmutableEntry;
25 import java.util.ArrayList;
26 import java.util.Collection;
27 import java.util.Iterator;
28 import java.util.List;
29 import java.util.Map.Entry;
30 import java.util.Optional;
31 import java.util.concurrent.CompletableFuture;
32 import java.util.concurrent.CompletionStage;
33 import java.util.concurrent.Executor;
34 import org.eclipse.jdt.annotation.NonNull;
35 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
36 import org.opendaylight.controller.cluster.datastore.DataTreeCohortActor.CanCommit;
37 import org.opendaylight.controller.cluster.datastore.DataTreeCohortActor.Success;
38 import org.opendaylight.yangtools.yang.data.tree.api.DataTreeCandidate;
39 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
40 import org.slf4j.Logger;
41 import org.slf4j.LoggerFactory;
42 import scala.compat.java8.FutureConverters;
43 import scala.concurrent.Future;
44
45 /**
46  * Composite cohort, which coordinates multiple user-provided cohorts as if it was only one cohort.
47  * <p/>
48  * It tracks current operation and list of cohorts which successfuly finished previous phase in
49  * case, if abort is necessary to invoke it only on cohort steps which are still active.
50  */
51 class CompositeDataTreeCohort {
52     private static final Logger LOG = LoggerFactory.getLogger(CompositeDataTreeCohort.class);
53
54     private enum State {
55         /**
56          * Cohorts are idle, no messages were sent.
57          */
58         IDLE,
59         /**
60          * CanCommit message was sent to all participating cohorts.
61          */
62         CAN_COMMIT_SENT,
63         /**
64          * Successful canCommit responses were received from every participating cohort.
65          */
66         CAN_COMMIT_SUCCESSFUL,
67         /**
68          * PreCommit message was sent to all participating cohorts.
69          */
70         PRE_COMMIT_SENT,
71         /**
72          * Successful preCommit responses were received from every participating cohort.
73          */
74         PRE_COMMIT_SUCCESSFUL,
75         /**
76          * Commit message was send to all participating cohorts.
77          */
78         COMMIT_SENT,
79         /**
80          * Successful commit responses were received from all participating cohorts.
81          */
82         COMMITED,
83         /**
84          * Some of cohorts responded back with unsuccessful message.
85          */
86         FAILED,
87         /**
88          * Abort message was send to all cohorts which responded with success previously.
89          */
90         ABORTED
91     }
92
93     static final Recover<Object> EXCEPTION_TO_MESSAGE = new Recover<>() {
94         @Override
95         public Failure recover(final Throwable error) {
96             return new Failure(error);
97         }
98     };
99
100     private final DataTreeCohortActorRegistry registry;
101     private final TransactionIdentifier txId;
102     private final SchemaContext schema;
103     private final Executor callbackExecutor;
104     private final Timeout timeout;
105
106     private @NonNull List<Success> successfulFromPrevious = List.of();
107     private State state = State.IDLE;
108
109     CompositeDataTreeCohort(final DataTreeCohortActorRegistry registry, final TransactionIdentifier transactionID,
110         final SchemaContext schema, final Executor callbackExecutor, final Timeout timeout) {
111         this.registry = requireNonNull(registry);
112         txId = requireNonNull(transactionID);
113         this.schema = requireNonNull(schema);
114         this.callbackExecutor = requireNonNull(callbackExecutor);
115         this.timeout = requireNonNull(timeout);
116     }
117
118     void reset() {
119         switch (state) {
120             case CAN_COMMIT_SENT:
121             case CAN_COMMIT_SUCCESSFUL:
122             case PRE_COMMIT_SENT:
123             case PRE_COMMIT_SUCCESSFUL:
124             case COMMIT_SENT:
125                 abort();
126                 break;
127             case ABORTED:
128             case COMMITED:
129             case FAILED:
130             case IDLE:
131                 break;
132             default:
133                 throw new IllegalStateException("Unhandled state " + state);
134         }
135
136         successfulFromPrevious = List.of();
137         state = State.IDLE;
138     }
139
140     Optional<CompletionStage<Void>> canCommit(final DataTreeCandidate tip) {
141         if (LOG.isTraceEnabled()) {
142             LOG.trace("{}: canCommit - candidate: {}", txId, tip);
143         } else {
144             LOG.debug("{}: canCommit - candidate rootPath: {}", txId, tip.getRootPath());
145         }
146
147         final List<CanCommit> messages = registry.createCanCommitMessages(txId, tip, schema);
148         LOG.debug("{}: canCommit - messages: {}", txId, messages);
149         if (messages.isEmpty()) {
150             successfulFromPrevious = List.of();
151             changeStateFrom(State.IDLE, State.CAN_COMMIT_SUCCESSFUL);
152             return Optional.empty();
153         }
154
155         final List<Entry<ActorRef, Future<Object>>> futures = new ArrayList<>(messages.size());
156         for (CanCommit message : messages) {
157             final ActorRef actor = message.getCohort();
158             final Future<Object> future = Patterns.ask(actor, message, timeout).recover(EXCEPTION_TO_MESSAGE,
159                 ExecutionContexts.global());
160             LOG.trace("{}: requesting canCommit from {}", txId, actor);
161             futures.add(new SimpleImmutableEntry<>(actor, future));
162         }
163
164         changeStateFrom(State.IDLE, State.CAN_COMMIT_SENT);
165         return Optional.of(processResponses(futures, State.CAN_COMMIT_SENT, State.CAN_COMMIT_SUCCESSFUL));
166     }
167
168     Optional<CompletionStage<Void>> preCommit() {
169         LOG.debug("{}: preCommit - successfulFromPrevious: {}", txId, successfulFromPrevious);
170
171         if (successfulFromPrevious.isEmpty()) {
172             changeStateFrom(State.CAN_COMMIT_SUCCESSFUL, State.PRE_COMMIT_SUCCESSFUL);
173             return Optional.empty();
174         }
175
176         final List<Entry<ActorRef, Future<Object>>> futures = sendMessageToSuccessful(
177             new DataTreeCohortActor.PreCommit(txId));
178         changeStateFrom(State.CAN_COMMIT_SUCCESSFUL, State.PRE_COMMIT_SENT);
179         return Optional.of(processResponses(futures, State.PRE_COMMIT_SENT, State.PRE_COMMIT_SUCCESSFUL));
180     }
181
182     Optional<CompletionStage<Void>> commit() {
183         LOG.debug("{}: commit - successfulFromPrevious: {}", txId, successfulFromPrevious);
184         if (successfulFromPrevious.isEmpty()) {
185             changeStateFrom(State.PRE_COMMIT_SUCCESSFUL, State.COMMITED);
186             return Optional.empty();
187         }
188
189         final List<Entry<ActorRef, Future<Object>>> futures = sendMessageToSuccessful(
190             new DataTreeCohortActor.Commit(txId));
191         changeStateFrom(State.PRE_COMMIT_SUCCESSFUL, State.COMMIT_SENT);
192         return Optional.of(processResponses(futures, State.COMMIT_SENT, State.COMMITED));
193     }
194
195     Optional<CompletionStage<?>> abort() {
196         LOG.debug("{}: abort - successfulFromPrevious: {}", txId, successfulFromPrevious);
197
198         state = State.ABORTED;
199         if (successfulFromPrevious.isEmpty()) {
200             return Optional.empty();
201         }
202
203         final DataTreeCohortActor.Abort message = new DataTreeCohortActor.Abort(txId);
204         final List<Future<Object>> futures = new ArrayList<>(successfulFromPrevious.size());
205         for (Success s : successfulFromPrevious) {
206             futures.add(Patterns.ask(s.getCohort(), message, timeout));
207         }
208
209         return Optional.of(FutureConverters.toJava(Futures.sequence(futures, ExecutionContexts.global())));
210     }
211
212     private List<Entry<ActorRef, Future<Object>>> sendMessageToSuccessful(final Object message) {
213         LOG.debug("{}: sendMesageToSuccessful: {}", txId, message);
214
215         final List<Entry<ActorRef, Future<Object>>> ret = new ArrayList<>(successfulFromPrevious.size());
216         for (Success s : successfulFromPrevious) {
217             final ActorRef actor = s.getCohort();
218             ret.add(new SimpleImmutableEntry<>(actor, Patterns.ask(actor, message, timeout)));
219         }
220         return ret;
221     }
222
223     private @NonNull CompletionStage<Void> processResponses(final List<Entry<ActorRef, Future<Object>>> futures,
224             final State currentState, final State afterState) {
225         LOG.debug("{}: processResponses - currentState: {}, afterState: {}", txId, currentState, afterState);
226         final CompletableFuture<Void> returnFuture = new CompletableFuture<>();
227         Future<Iterable<Object>> aggregateFuture = Futures.sequence(Lists.transform(futures, Entry::getValue),
228                 ExecutionContexts.global());
229
230         aggregateFuture.onComplete(new OnComplete<Iterable<Object>>() {
231             @Override
232             public void onComplete(final Throwable failure, final Iterable<Object> results) {
233                 callbackExecutor.execute(
234                     () -> processResponses(failure, results, currentState, afterState, returnFuture));
235             }
236         }, ExecutionContexts.global());
237
238         return returnFuture;
239     }
240
241     // FB issues violation for passing null to CompletableFuture#complete but it is valid and necessary when the
242     // generic type is Void.
243     @SuppressFBWarnings(value = "NP_NONNULL_PARAM_VIOLATION")
244     private void processResponses(final Throwable failure, final Iterable<Object> results,
245             final State currentState, final State afterState, final CompletableFuture<Void> resultFuture) {
246         if (failure != null) {
247             successfulFromPrevious = List.of();
248             resultFuture.completeExceptionally(failure);
249             return;
250         }
251
252         final Collection<Failure> failed = new ArrayList<>(1);
253         final List<Success> successful = new ArrayList<>();
254         for (Object result : results) {
255             if (result instanceof DataTreeCohortActor.Success) {
256                 successful.add((Success) result);
257             } else if (result instanceof Status.Failure) {
258                 failed.add((Failure) result);
259             } else {
260                 LOG.warn("{}: unrecognized response {}, ignoring it", txId, result);
261             }
262         }
263
264         LOG.debug("{}: processResponses - successful: {}, failed: {}", txId, successful, failed);
265
266         if (!failed.isEmpty()) {
267             changeStateFrom(currentState, State.FAILED);
268             final Iterator<Failure> it = failed.iterator();
269             final Throwable firstEx = it.next().cause();
270             while (it.hasNext()) {
271                 firstEx.addSuppressed(it.next().cause());
272             }
273
274             successfulFromPrevious = List.of();
275             resultFuture.completeExceptionally(firstEx);
276         } else {
277             successfulFromPrevious = successful;
278             changeStateFrom(currentState, afterState);
279             resultFuture.complete(null);
280         }
281     }
282
283     void changeStateFrom(final State expected, final State followup) {
284         checkState(state == expected);
285         state = followup;
286     }
287 }