Define DataStoreVersions.MAGNESIUM_VERSION
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / CompositeDataTreeCohort.java
1 /*
2  * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.datastore;
9
10 import static com.google.common.base.Preconditions.checkState;
11 import static java.util.Objects.requireNonNull;
12
13 import akka.actor.ActorRef;
14 import akka.actor.Status;
15 import akka.actor.Status.Failure;
16 import akka.dispatch.ExecutionContexts;
17 import akka.dispatch.Futures;
18 import akka.dispatch.OnComplete;
19 import akka.dispatch.Recover;
20 import akka.pattern.Patterns;
21 import akka.util.Timeout;
22 import com.google.common.collect.Lists;
23 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
24 import java.util.AbstractMap.SimpleImmutableEntry;
25 import java.util.ArrayList;
26 import java.util.Collection;
27 import java.util.Collections;
28 import java.util.Iterator;
29 import java.util.List;
30 import java.util.Map.Entry;
31 import java.util.Optional;
32 import java.util.concurrent.CompletableFuture;
33 import java.util.concurrent.CompletionStage;
34 import java.util.concurrent.Executor;
35 import org.eclipse.jdt.annotation.NonNull;
36 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
37 import org.opendaylight.controller.cluster.datastore.DataTreeCohortActor.CanCommit;
38 import org.opendaylight.controller.cluster.datastore.DataTreeCohortActor.Success;
39 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
40 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
41 import org.slf4j.Logger;
42 import org.slf4j.LoggerFactory;
43 import scala.compat.java8.FutureConverters;
44 import scala.concurrent.Future;
45
46 /**
47  * Composite cohort, which coordinates multiple user-provided cohorts as if it was only one cohort.
48  * <p/>
49  * It tracks current operation and list of cohorts which successfuly finished previous phase in
50  * case, if abort is necessary to invoke it only on cohort steps which are still active.
51  *
52  */
53 class CompositeDataTreeCohort {
54     private static final Logger LOG = LoggerFactory.getLogger(CompositeDataTreeCohort.class);
55
56     private enum State {
57         /**
58          * Cohorts are idle, no messages were sent.
59          */
60         IDLE,
61         /**
62          * CanCommit message was sent to all participating cohorts.
63          */
64         CAN_COMMIT_SENT,
65         /**
66          * Successful canCommit responses were received from every participating cohort.
67          */
68         CAN_COMMIT_SUCCESSFUL,
69         /**
70          * PreCommit message was sent to all participating cohorts.
71          */
72         PRE_COMMIT_SENT,
73         /**
74          * Successful preCommit responses were received from every participating cohort.
75          */
76         PRE_COMMIT_SUCCESSFUL,
77         /**
78          * Commit message was send to all participating cohorts.
79          */
80         COMMIT_SENT,
81         /**
82          * Successful commit responses were received from all participating cohorts.
83          */
84         COMMITED,
85         /**
86          * Some of cohorts responded back with unsuccessful message.
87          */
88         FAILED,
89         /**
90          * Abort message was send to all cohorts which responded with success previously.
91          */
92         ABORTED
93     }
94
95     static final Recover<Object> EXCEPTION_TO_MESSAGE = new Recover<Object>() {
96         @Override
97         public Failure recover(final Throwable error) {
98             return new Failure(error);
99         }
100     };
101
102     private final DataTreeCohortActorRegistry registry;
103     private final TransactionIdentifier txId;
104     private final SchemaContext schema;
105     private final Executor callbackExecutor;
106     private final Timeout timeout;
107
108     private @NonNull List<Success> successfulFromPrevious = Collections.emptyList();
109     private State state = State.IDLE;
110
111     CompositeDataTreeCohort(final DataTreeCohortActorRegistry registry, final TransactionIdentifier transactionID,
112         final SchemaContext schema, final Executor callbackExecutor, final Timeout timeout) {
113         this.registry = requireNonNull(registry);
114         this.txId = requireNonNull(transactionID);
115         this.schema = requireNonNull(schema);
116         this.callbackExecutor = requireNonNull(callbackExecutor);
117         this.timeout = requireNonNull(timeout);
118     }
119
120     void reset() {
121         switch (state) {
122             case CAN_COMMIT_SENT:
123             case CAN_COMMIT_SUCCESSFUL:
124             case PRE_COMMIT_SENT:
125             case PRE_COMMIT_SUCCESSFUL:
126             case COMMIT_SENT:
127                 abort();
128                 break;
129             case ABORTED:
130             case COMMITED:
131             case FAILED:
132             case IDLE:
133                 break;
134             default:
135                 throw new IllegalStateException("Unhandled state " + state);
136         }
137
138         successfulFromPrevious = Collections.emptyList();
139         state = State.IDLE;
140     }
141
142     Optional<CompletionStage<Void>> canCommit(final DataTreeCandidate tip) {
143         if (LOG.isTraceEnabled()) {
144             LOG.trace("{}: canCommit - candidate: {}", txId, tip);
145         } else {
146             LOG.debug("{}: canCommit - candidate rootPath: {}", txId, tip.getRootPath());
147         }
148
149         final List<CanCommit> messages = registry.createCanCommitMessages(txId, tip, schema);
150         LOG.debug("{}: canCommit - messages: {}", txId, messages);
151         if (messages.isEmpty()) {
152             successfulFromPrevious = Collections.emptyList();
153             changeStateFrom(State.IDLE, State.CAN_COMMIT_SUCCESSFUL);
154             return Optional.empty();
155         }
156
157         final List<Entry<ActorRef, Future<Object>>> futures = new ArrayList<>(messages.size());
158         for (CanCommit message : messages) {
159             final ActorRef actor = message.getCohort();
160             final Future<Object> future = Patterns.ask(actor, message, timeout).recover(EXCEPTION_TO_MESSAGE,
161                 ExecutionContexts.global());
162             LOG.trace("{}: requesting canCommit from {}", txId, actor);
163             futures.add(new SimpleImmutableEntry<>(actor, future));
164         }
165
166         changeStateFrom(State.IDLE, State.CAN_COMMIT_SENT);
167         return Optional.of(processResponses(futures, State.CAN_COMMIT_SENT, State.CAN_COMMIT_SUCCESSFUL));
168     }
169
170     Optional<CompletionStage<Void>> preCommit() {
171         LOG.debug("{}: preCommit - successfulFromPrevious: {}", txId, successfulFromPrevious);
172
173         if (successfulFromPrevious.isEmpty()) {
174             changeStateFrom(State.CAN_COMMIT_SUCCESSFUL, State.PRE_COMMIT_SUCCESSFUL);
175             return Optional.empty();
176         }
177
178         final List<Entry<ActorRef, Future<Object>>> futures = sendMessageToSuccessful(
179             new DataTreeCohortActor.PreCommit(txId));
180         changeStateFrom(State.CAN_COMMIT_SUCCESSFUL, State.PRE_COMMIT_SENT);
181         return Optional.of(processResponses(futures, State.PRE_COMMIT_SENT, State.PRE_COMMIT_SUCCESSFUL));
182     }
183
184     Optional<CompletionStage<Void>> commit() {
185         LOG.debug("{}: commit - successfulFromPrevious: {}", txId, successfulFromPrevious);
186         if (successfulFromPrevious.isEmpty()) {
187             changeStateFrom(State.PRE_COMMIT_SUCCESSFUL, State.COMMITED);
188             return Optional.empty();
189         }
190
191         final List<Entry<ActorRef, Future<Object>>> futures = sendMessageToSuccessful(
192             new DataTreeCohortActor.Commit(txId));
193         changeStateFrom(State.PRE_COMMIT_SUCCESSFUL, State.COMMIT_SENT);
194         return Optional.of(processResponses(futures, State.COMMIT_SENT, State.COMMITED));
195     }
196
197     Optional<CompletionStage<?>> abort() {
198         LOG.debug("{}: abort - successfulFromPrevious: {}", txId, successfulFromPrevious);
199
200         state = State.ABORTED;
201         if (successfulFromPrevious.isEmpty()) {
202             return Optional.empty();
203         }
204
205         final DataTreeCohortActor.Abort message = new DataTreeCohortActor.Abort(txId);
206         final List<Future<Object>> futures = new ArrayList<>(successfulFromPrevious.size());
207         for (Success s : successfulFromPrevious) {
208             futures.add(Patterns.ask(s.getCohort(), message, timeout));
209         }
210
211         return Optional.of(FutureConverters.toJava(Futures.sequence(futures, ExecutionContexts.global())));
212     }
213
214     private List<Entry<ActorRef, Future<Object>>> sendMessageToSuccessful(final Object message) {
215         LOG.debug("{}: sendMesageToSuccessful: {}", txId, message);
216
217         final List<Entry<ActorRef, Future<Object>>> ret = new ArrayList<>(successfulFromPrevious.size());
218         for (Success s : successfulFromPrevious) {
219             final ActorRef actor = s.getCohort();
220             ret.add(new SimpleImmutableEntry<>(actor, Patterns.ask(actor, message, timeout)));
221         }
222         return ret;
223     }
224
225     private @NonNull CompletionStage<Void> processResponses(final List<Entry<ActorRef, Future<Object>>> futures,
226             final State currentState, final State afterState) {
227         LOG.debug("{}: processResponses - currentState: {}, afterState: {}", txId, currentState, afterState);
228         final CompletableFuture<Void> returnFuture = new CompletableFuture<>();
229         Future<Iterable<Object>> aggregateFuture = Futures.sequence(Lists.transform(futures, Entry::getValue),
230                 ExecutionContexts.global());
231
232         aggregateFuture.onComplete(new OnComplete<Iterable<Object>>() {
233             @Override
234             public void onComplete(final Throwable failure, final Iterable<Object> results) {
235                 callbackExecutor.execute(
236                     () -> processResponses(failure, results, currentState, afterState, returnFuture));
237             }
238         }, ExecutionContexts.global());
239
240         return returnFuture;
241     }
242
243     // FB issues violation for passing null to CompletableFuture#complete but it is valid and necessary when the
244     // generic type is Void.
245     @SuppressFBWarnings("NP_NONNULL_PARAM_VIOLATION")
246     private void processResponses(final Throwable failure, final Iterable<Object> results,
247             final State currentState, final State afterState, final CompletableFuture<Void> resultFuture) {
248         if (failure != null) {
249             successfulFromPrevious = Collections.emptyList();
250             resultFuture.completeExceptionally(failure);
251             return;
252         }
253
254         final Collection<Failure> failed = new ArrayList<>(1);
255         final List<Success> successful = new ArrayList<>();
256         for (Object result : results) {
257             if (result instanceof DataTreeCohortActor.Success) {
258                 successful.add((Success) result);
259             } else if (result instanceof Status.Failure) {
260                 failed.add((Failure) result);
261             } else {
262                 LOG.warn("{}: unrecognized response {}, ignoring it", txId, result);
263             }
264         }
265
266         LOG.debug("{}: processResponses - successful: {}, failed: {}", txId, successful, failed);
267
268         if (!failed.isEmpty()) {
269             changeStateFrom(currentState, State.FAILED);
270             final Iterator<Failure> it = failed.iterator();
271             final Throwable firstEx = it.next().cause();
272             while (it.hasNext()) {
273                 firstEx.addSuppressed(it.next().cause());
274             }
275
276             successfulFromPrevious = Collections.emptyList();
277             resultFuture.completeExceptionally(firstEx);
278         } else {
279             successfulFromPrevious = successful;
280             changeStateFrom(currentState, afterState);
281             resultFuture.complete(null);
282         }
283     }
284
285     void changeStateFrom(final State expected, final State followup) {
286         checkState(state == expected);
287         state = followup;
288     }
289 }