NormalizedNodeAggregator should also report empty
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / CompositeDataTreeCohort.java
1 /*
2  * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.datastore;
9
10 import static com.google.common.base.Preconditions.checkState;
11 import static java.util.Objects.requireNonNull;
12
13 import akka.actor.ActorRef;
14 import akka.actor.Status;
15 import akka.actor.Status.Failure;
16 import akka.dispatch.ExecutionContexts;
17 import akka.dispatch.Futures;
18 import akka.dispatch.OnComplete;
19 import akka.dispatch.Recover;
20 import akka.pattern.Patterns;
21 import akka.util.Timeout;
22 import com.google.common.collect.Lists;
23 import java.util.AbstractMap.SimpleImmutableEntry;
24 import java.util.ArrayList;
25 import java.util.Collection;
26 import java.util.Iterator;
27 import java.util.List;
28 import java.util.Map.Entry;
29 import java.util.Optional;
30 import java.util.concurrent.CompletableFuture;
31 import java.util.concurrent.CompletionStage;
32 import java.util.concurrent.Executor;
33 import org.eclipse.jdt.annotation.NonNull;
34 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
35 import org.opendaylight.controller.cluster.datastore.DataTreeCohortActor.CanCommit;
36 import org.opendaylight.controller.cluster.datastore.DataTreeCohortActor.Success;
37 import org.opendaylight.yangtools.yang.common.Empty;
38 import org.opendaylight.yangtools.yang.data.tree.api.DataTreeCandidate;
39 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
40 import org.slf4j.Logger;
41 import org.slf4j.LoggerFactory;
42 import scala.compat.java8.FutureConverters;
43 import scala.concurrent.Future;
44
45 /**
46  * Composite cohort, which coordinates multiple user-provided cohorts as if it was only one cohort.
47  * <p/>
48  * It tracks current operation and list of cohorts which successfuly finished previous phase in
49  * case, if abort is necessary to invoke it only on cohort steps which are still active.
50  */
51 class CompositeDataTreeCohort {
52     private static final Logger LOG = LoggerFactory.getLogger(CompositeDataTreeCohort.class);
53
54     private enum State {
55         /**
56          * Cohorts are idle, no messages were sent.
57          */
58         IDLE,
59         /**
60          * CanCommit message was sent to all participating cohorts.
61          */
62         CAN_COMMIT_SENT,
63         /**
64          * Successful canCommit responses were received from every participating cohort.
65          */
66         CAN_COMMIT_SUCCESSFUL,
67         /**
68          * PreCommit message was sent to all participating cohorts.
69          */
70         PRE_COMMIT_SENT,
71         /**
72          * Successful preCommit responses were received from every participating cohort.
73          */
74         PRE_COMMIT_SUCCESSFUL,
75         /**
76          * Commit message was send to all participating cohorts.
77          */
78         COMMIT_SENT,
79         /**
80          * Successful commit responses were received from all participating cohorts.
81          */
82         COMMITED,
83         /**
84          * Some of cohorts responded back with unsuccessful message.
85          */
86         FAILED,
87         /**
88          * Abort message was send to all cohorts which responded with success previously.
89          */
90         ABORTED
91     }
92
93     static final Recover<Object> EXCEPTION_TO_MESSAGE = new Recover<>() {
94         @Override
95         public Failure recover(final Throwable error) {
96             return new Failure(error);
97         }
98     };
99
100     private final DataTreeCohortActorRegistry registry;
101     private final TransactionIdentifier txId;
102     private final SchemaContext schema;
103     private final Executor callbackExecutor;
104     private final Timeout timeout;
105
106     private @NonNull List<Success> successfulFromPrevious = List.of();
107     private State state = State.IDLE;
108
109     CompositeDataTreeCohort(final DataTreeCohortActorRegistry registry, final TransactionIdentifier transactionID,
110         final SchemaContext schema, final Executor callbackExecutor, final Timeout timeout) {
111         this.registry = requireNonNull(registry);
112         txId = requireNonNull(transactionID);
113         this.schema = requireNonNull(schema);
114         this.callbackExecutor = requireNonNull(callbackExecutor);
115         this.timeout = requireNonNull(timeout);
116     }
117
118     void reset() {
119         switch (state) {
120             case CAN_COMMIT_SENT:
121             case CAN_COMMIT_SUCCESSFUL:
122             case PRE_COMMIT_SENT:
123             case PRE_COMMIT_SUCCESSFUL:
124             case COMMIT_SENT:
125                 abort();
126                 break;
127             case ABORTED:
128             case COMMITED:
129             case FAILED:
130             case IDLE:
131                 break;
132             default:
133                 throw new IllegalStateException("Unhandled state " + state);
134         }
135
136         successfulFromPrevious = List.of();
137         state = State.IDLE;
138     }
139
140     Optional<CompletionStage<Empty>> canCommit(final DataTreeCandidate tip) {
141         if (LOG.isTraceEnabled()) {
142             LOG.trace("{}: canCommit - candidate: {}", txId, tip);
143         } else {
144             LOG.debug("{}: canCommit - candidate rootPath: {}", txId, tip.getRootPath());
145         }
146
147         final List<CanCommit> messages = registry.createCanCommitMessages(txId, tip, schema);
148         LOG.debug("{}: canCommit - messages: {}", txId, messages);
149         if (messages.isEmpty()) {
150             successfulFromPrevious = List.of();
151             changeStateFrom(State.IDLE, State.CAN_COMMIT_SUCCESSFUL);
152             return Optional.empty();
153         }
154
155         final List<Entry<ActorRef, Future<Object>>> futures = new ArrayList<>(messages.size());
156         for (CanCommit message : messages) {
157             final ActorRef actor = message.getCohort();
158             final Future<Object> future = Patterns.ask(actor, message, timeout).recover(EXCEPTION_TO_MESSAGE,
159                 ExecutionContexts.global());
160             LOG.trace("{}: requesting canCommit from {}", txId, actor);
161             futures.add(new SimpleImmutableEntry<>(actor, future));
162         }
163
164         changeStateFrom(State.IDLE, State.CAN_COMMIT_SENT);
165         return Optional.of(processResponses(futures, State.CAN_COMMIT_SENT, State.CAN_COMMIT_SUCCESSFUL));
166     }
167
168     Optional<CompletionStage<Empty>> preCommit() {
169         LOG.debug("{}: preCommit - successfulFromPrevious: {}", txId, successfulFromPrevious);
170
171         if (successfulFromPrevious.isEmpty()) {
172             changeStateFrom(State.CAN_COMMIT_SUCCESSFUL, State.PRE_COMMIT_SUCCESSFUL);
173             return Optional.empty();
174         }
175
176         final List<Entry<ActorRef, Future<Object>>> futures = sendMessageToSuccessful(
177             new DataTreeCohortActor.PreCommit(txId));
178         changeStateFrom(State.CAN_COMMIT_SUCCESSFUL, State.PRE_COMMIT_SENT);
179         return Optional.of(processResponses(futures, State.PRE_COMMIT_SENT, State.PRE_COMMIT_SUCCESSFUL));
180     }
181
182     Optional<CompletionStage<Empty>> commit() {
183         LOG.debug("{}: commit - successfulFromPrevious: {}", txId, successfulFromPrevious);
184         if (successfulFromPrevious.isEmpty()) {
185             changeStateFrom(State.PRE_COMMIT_SUCCESSFUL, State.COMMITED);
186             return Optional.empty();
187         }
188
189         final List<Entry<ActorRef, Future<Object>>> futures = sendMessageToSuccessful(
190             new DataTreeCohortActor.Commit(txId));
191         changeStateFrom(State.PRE_COMMIT_SUCCESSFUL, State.COMMIT_SENT);
192         return Optional.of(processResponses(futures, State.COMMIT_SENT, State.COMMITED));
193     }
194
195     Optional<CompletionStage<?>> abort() {
196         LOG.debug("{}: abort - successfulFromPrevious: {}", txId, successfulFromPrevious);
197
198         state = State.ABORTED;
199         if (successfulFromPrevious.isEmpty()) {
200             return Optional.empty();
201         }
202
203         final DataTreeCohortActor.Abort message = new DataTreeCohortActor.Abort(txId);
204         final List<Future<Object>> futures = new ArrayList<>(successfulFromPrevious.size());
205         for (Success s : successfulFromPrevious) {
206             futures.add(Patterns.ask(s.getCohort(), message, timeout));
207         }
208
209         return Optional.of(FutureConverters.toJava(Futures.sequence(futures, ExecutionContexts.global())));
210     }
211
212     private List<Entry<ActorRef, Future<Object>>> sendMessageToSuccessful(final Object message) {
213         LOG.debug("{}: sendMesageToSuccessful: {}", txId, message);
214
215         final List<Entry<ActorRef, Future<Object>>> ret = new ArrayList<>(successfulFromPrevious.size());
216         for (Success s : successfulFromPrevious) {
217             final ActorRef actor = s.getCohort();
218             ret.add(new SimpleImmutableEntry<>(actor, Patterns.ask(actor, message, timeout)));
219         }
220         return ret;
221     }
222
223     private @NonNull CompletionStage<Empty> processResponses(final List<Entry<ActorRef, Future<Object>>> futures,
224             final State currentState, final State afterState) {
225         LOG.debug("{}: processResponses - currentState: {}, afterState: {}", txId, currentState, afterState);
226         final CompletableFuture<Empty> returnFuture = new CompletableFuture<>();
227         Future<Iterable<Object>> aggregateFuture = Futures.sequence(Lists.transform(futures, Entry::getValue),
228                 ExecutionContexts.global());
229
230         aggregateFuture.onComplete(new OnComplete<Iterable<Object>>() {
231             @Override
232             public void onComplete(final Throwable failure, final Iterable<Object> results) {
233                 callbackExecutor.execute(
234                     () -> processResponses(failure, results, currentState, afterState, returnFuture));
235             }
236         }, ExecutionContexts.global());
237
238         return returnFuture;
239     }
240
241     private void processResponses(final Throwable failure, final Iterable<Object> results,
242             final State currentState, final State afterState, final CompletableFuture<Empty> resultFuture) {
243         if (failure != null) {
244             successfulFromPrevious = List.of();
245             resultFuture.completeExceptionally(failure);
246             return;
247         }
248
249         final Collection<Failure> failed = new ArrayList<>(1);
250         final List<Success> successful = new ArrayList<>();
251         for (Object result : results) {
252             if (result instanceof DataTreeCohortActor.Success) {
253                 successful.add((Success) result);
254             } else if (result instanceof Status.Failure) {
255                 failed.add((Failure) result);
256             } else {
257                 LOG.warn("{}: unrecognized response {}, ignoring it", txId, result);
258             }
259         }
260
261         LOG.debug("{}: processResponses - successful: {}, failed: {}", txId, successful, failed);
262
263         if (!failed.isEmpty()) {
264             changeStateFrom(currentState, State.FAILED);
265             final Iterator<Failure> it = failed.iterator();
266             final Throwable firstEx = it.next().cause();
267             while (it.hasNext()) {
268                 firstEx.addSuppressed(it.next().cause());
269             }
270
271             successfulFromPrevious = List.of();
272             resultFuture.completeExceptionally(firstEx);
273         } else {
274             successfulFromPrevious = successful;
275             changeStateFrom(currentState, afterState);
276             resultFuture.complete(Empty.value());
277         }
278     }
279
280     void changeStateFrom(final State expected, final State followup) {
281         checkState(state == expected);
282         state = followup;
283     }
284 }