Move byte-based serialization method
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / CompositeDataTreeCohort.java
1 /*
2  * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.datastore;
10
11 import akka.actor.ActorRef;
12 import akka.actor.Status;
13 import akka.actor.Status.Failure;
14 import akka.dispatch.ExecutionContexts;
15 import akka.dispatch.Futures;
16 import akka.dispatch.OnComplete;
17 import akka.dispatch.Recover;
18 import akka.pattern.Patterns;
19 import akka.util.Timeout;
20 import com.google.common.base.Preconditions;
21 import com.google.common.collect.Lists;
22 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
23 import java.util.AbstractMap.SimpleImmutableEntry;
24 import java.util.ArrayList;
25 import java.util.Collection;
26 import java.util.Collections;
27 import java.util.Iterator;
28 import java.util.List;
29 import java.util.Map.Entry;
30 import java.util.Optional;
31 import java.util.concurrent.CompletableFuture;
32 import java.util.concurrent.CompletionStage;
33 import java.util.concurrent.Executor;
34 import org.eclipse.jdt.annotation.NonNull;
35 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
36 import org.opendaylight.controller.cluster.datastore.DataTreeCohortActor.CanCommit;
37 import org.opendaylight.controller.cluster.datastore.DataTreeCohortActor.Success;
38 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
39 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
40 import org.slf4j.Logger;
41 import org.slf4j.LoggerFactory;
42 import scala.compat.java8.FutureConverters;
43 import scala.concurrent.Future;
44
45 /**
46  * Composite cohort, which coordinates multiple user-provided cohorts as if it was only one cohort.
47  * <p/>
48  * It tracks current operation and list of cohorts which successfuly finished previous phase in
49  * case, if abort is necessary to invoke it only on cohort steps which are still active.
50  *
51  */
52 class CompositeDataTreeCohort {
53     private static final Logger LOG = LoggerFactory.getLogger(CompositeDataTreeCohort.class);
54
55     private enum State {
56         /**
57          * Cohorts are idle, no messages were sent.
58          */
59         IDLE,
60         /**
61          * CanCommit message was sent to all participating cohorts.
62          */
63         CAN_COMMIT_SENT,
64         /**
65          * Successful canCommit responses were received from every participating cohort.
66          */
67         CAN_COMMIT_SUCCESSFUL,
68         /**
69          * PreCommit message was sent to all participating cohorts.
70          */
71         PRE_COMMIT_SENT,
72         /**
73          * Successful preCommit responses were received from every participating cohort.
74          */
75         PRE_COMMIT_SUCCESSFUL,
76         /**
77          * Commit message was send to all participating cohorts.
78          */
79         COMMIT_SENT,
80         /**
81          * Successful commit responses were received from all participating cohorts.
82          */
83         COMMITED,
84         /**
85          * Some of cohorts responded back with unsuccessful message.
86          */
87         FAILED,
88         /**
89          * Abort message was send to all cohorts which responded with success previously.
90          */
91         ABORTED
92     }
93
94     static final Recover<Object> EXCEPTION_TO_MESSAGE = new Recover<Object>() {
95         @Override
96         public Failure recover(final Throwable error) {
97             return new Failure(error);
98         }
99     };
100
101     private final DataTreeCohortActorRegistry registry;
102     private final TransactionIdentifier txId;
103     private final SchemaContext schema;
104     private final Executor callbackExecutor;
105     private final Timeout timeout;
106
107     private @NonNull List<Success> successfulFromPrevious = Collections.emptyList();
108     private State state = State.IDLE;
109
110     CompositeDataTreeCohort(final DataTreeCohortActorRegistry registry, final TransactionIdentifier transactionID,
111         final SchemaContext schema, final Executor callbackExecutor, final Timeout timeout) {
112         this.registry = Preconditions.checkNotNull(registry);
113         this.txId = Preconditions.checkNotNull(transactionID);
114         this.schema = Preconditions.checkNotNull(schema);
115         this.callbackExecutor = Preconditions.checkNotNull(callbackExecutor);
116         this.timeout = Preconditions.checkNotNull(timeout);
117     }
118
119     void reset() {
120         switch (state) {
121             case CAN_COMMIT_SENT:
122             case CAN_COMMIT_SUCCESSFUL:
123             case PRE_COMMIT_SENT:
124             case PRE_COMMIT_SUCCESSFUL:
125             case COMMIT_SENT:
126                 abort();
127                 break;
128             case ABORTED:
129             case COMMITED:
130             case FAILED:
131             case IDLE:
132                 break;
133             default:
134                 throw new IllegalStateException("Unhandled state " + state);
135         }
136
137         successfulFromPrevious = Collections.emptyList();
138         state = State.IDLE;
139     }
140
141     Optional<CompletionStage<Void>> canCommit(final DataTreeCandidate tip) {
142         if (LOG.isTraceEnabled()) {
143             LOG.trace("{}: canCommit - candidate: {}", txId, tip);
144         } else {
145             LOG.debug("{}: canCommit - candidate rootPath: {}", txId, tip.getRootPath());
146         }
147
148         final List<CanCommit> messages = registry.createCanCommitMessages(txId, tip, schema);
149         LOG.debug("{}: canCommit - messages: {}", txId, messages);
150         if (messages.isEmpty()) {
151             successfulFromPrevious = Collections.emptyList();
152             changeStateFrom(State.IDLE, State.CAN_COMMIT_SUCCESSFUL);
153             return Optional.empty();
154         }
155
156         final List<Entry<ActorRef, Future<Object>>> futures = new ArrayList<>(messages.size());
157         for (CanCommit message : messages) {
158             final ActorRef actor = message.getCohort();
159             final Future<Object> future = Patterns.ask(actor, message, timeout).recover(EXCEPTION_TO_MESSAGE,
160                 ExecutionContexts.global());
161             LOG.trace("{}: requesting canCommit from {}", txId, actor);
162             futures.add(new SimpleImmutableEntry<>(actor, future));
163         }
164
165         changeStateFrom(State.IDLE, State.CAN_COMMIT_SENT);
166         return Optional.of(processResponses(futures, State.CAN_COMMIT_SENT, State.CAN_COMMIT_SUCCESSFUL));
167     }
168
169     Optional<CompletionStage<Void>> preCommit() {
170         LOG.debug("{}: preCommit - successfulFromPrevious: {}", txId, successfulFromPrevious);
171
172         if (successfulFromPrevious.isEmpty()) {
173             changeStateFrom(State.CAN_COMMIT_SUCCESSFUL, State.PRE_COMMIT_SUCCESSFUL);
174             return Optional.empty();
175         }
176
177         final List<Entry<ActorRef, Future<Object>>> futures = sendMessageToSuccessful(
178             new DataTreeCohortActor.PreCommit(txId));
179         changeStateFrom(State.CAN_COMMIT_SUCCESSFUL, State.PRE_COMMIT_SENT);
180         return Optional.of(processResponses(futures, State.PRE_COMMIT_SENT, State.PRE_COMMIT_SUCCESSFUL));
181     }
182
183     Optional<CompletionStage<Void>> commit() {
184         LOG.debug("{}: commit - successfulFromPrevious: {}", txId, successfulFromPrevious);
185         if (successfulFromPrevious.isEmpty()) {
186             changeStateFrom(State.PRE_COMMIT_SUCCESSFUL, State.COMMITED);
187             return Optional.empty();
188         }
189
190         final List<Entry<ActorRef, Future<Object>>> futures = sendMessageToSuccessful(
191             new DataTreeCohortActor.Commit(txId));
192         changeStateFrom(State.PRE_COMMIT_SUCCESSFUL, State.COMMIT_SENT);
193         return Optional.of(processResponses(futures, State.COMMIT_SENT, State.COMMITED));
194     }
195
196     Optional<CompletionStage<?>> abort() {
197         LOG.debug("{}: abort - successfulFromPrevious: {}", txId, successfulFromPrevious);
198
199         state = State.ABORTED;
200         if (successfulFromPrevious.isEmpty()) {
201             return Optional.empty();
202         }
203
204         final DataTreeCohortActor.Abort message = new DataTreeCohortActor.Abort(txId);
205         final List<Future<Object>> futures = new ArrayList<>(successfulFromPrevious.size());
206         for (Success s : successfulFromPrevious) {
207             futures.add(Patterns.ask(s.getCohort(), message, timeout));
208         }
209
210         return Optional.of(FutureConverters.toJava(Futures.sequence(futures, ExecutionContexts.global())));
211     }
212
213     private List<Entry<ActorRef, Future<Object>>> sendMessageToSuccessful(final Object message) {
214         LOG.debug("{}: sendMesageToSuccessful: {}", txId, message);
215
216         final List<Entry<ActorRef, Future<Object>>> ret = new ArrayList<>(successfulFromPrevious.size());
217         for (Success s : successfulFromPrevious) {
218             final ActorRef actor = s.getCohort();
219             ret.add(new SimpleImmutableEntry<>(actor, Patterns.ask(actor, message, timeout)));
220         }
221         return ret;
222     }
223
224     private @NonNull CompletionStage<Void> processResponses(final List<Entry<ActorRef, Future<Object>>> futures,
225             final State currentState, final State afterState) {
226         LOG.debug("{}: processResponses - currentState: {}, afterState: {}", txId, currentState, afterState);
227         final CompletableFuture<Void> returnFuture = new CompletableFuture<>();
228         Future<Iterable<Object>> aggregateFuture = Futures.sequence(Lists.transform(futures, Entry::getValue),
229                 ExecutionContexts.global());
230
231         aggregateFuture.onComplete(new OnComplete<Iterable<Object>>() {
232             @Override
233             public void onComplete(final Throwable failure, final Iterable<Object> results) {
234                 callbackExecutor.execute(
235                     () -> processResponses(failure, results, currentState, afterState, returnFuture));
236             }
237         }, ExecutionContexts.global());
238
239         return returnFuture;
240     }
241
242     // FB issues violation for passing null to CompletableFuture#complete but it is valid and necessary when the
243     // generic type is Void.
244     @SuppressFBWarnings("NP_NONNULL_PARAM_VIOLATION")
245     private void processResponses(final Throwable failure, final Iterable<Object> results,
246             final State currentState, final State afterState, final CompletableFuture<Void> resultFuture) {
247         if (failure != null) {
248             successfulFromPrevious = Collections.emptyList();
249             resultFuture.completeExceptionally(failure);
250             return;
251         }
252
253         final Collection<Failure> failed = new ArrayList<>(1);
254         final List<Success> successful = new ArrayList<>();
255         for (Object result : results) {
256             if (result instanceof DataTreeCohortActor.Success) {
257                 successful.add((Success) result);
258             } else if (result instanceof Status.Failure) {
259                 failed.add((Failure) result);
260             } else {
261                 LOG.warn("{}: unrecognized response {}, ignoring it", txId, result);
262             }
263         }
264
265         LOG.debug("{}: processResponses - successful: {}, failed: {}", txId, successful, failed);
266
267         if (!failed.isEmpty()) {
268             changeStateFrom(currentState, State.FAILED);
269             final Iterator<Failure> it = failed.iterator();
270             final Throwable firstEx = it.next().cause();
271             while (it.hasNext()) {
272                 firstEx.addSuppressed(it.next().cause());
273             }
274
275             successfulFromPrevious = Collections.emptyList();
276             resultFuture.completeExceptionally(firstEx);
277         } else {
278             successfulFromPrevious = successful;
279             changeStateFrom(currentState, afterState);
280             resultFuture.complete(null);
281         }
282     }
283
284     void changeStateFrom(final State expected, final State followup) {
285         Preconditions.checkState(state == expected);
286         state = followup;
287     }
288 }