2 * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.cluster.datastore;
10 import static com.google.common.base.Preconditions.checkState;
11 import static java.util.Objects.requireNonNull;
13 import akka.actor.ActorRef;
14 import akka.actor.Status;
15 import akka.actor.Status.Failure;
16 import akka.dispatch.ExecutionContexts;
17 import akka.dispatch.Futures;
18 import akka.dispatch.OnComplete;
19 import akka.dispatch.Recover;
20 import akka.pattern.Patterns;
21 import akka.util.Timeout;
22 import com.google.common.collect.Lists;
23 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
24 import java.util.AbstractMap.SimpleImmutableEntry;
25 import java.util.ArrayList;
26 import java.util.Collection;
27 import java.util.Collections;
28 import java.util.Iterator;
29 import java.util.List;
30 import java.util.Map.Entry;
31 import java.util.Optional;
32 import java.util.concurrent.CompletableFuture;
33 import java.util.concurrent.CompletionStage;
34 import java.util.concurrent.Executor;
35 import org.eclipse.jdt.annotation.NonNull;
36 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
37 import org.opendaylight.controller.cluster.datastore.DataTreeCohortActor.CanCommit;
38 import org.opendaylight.controller.cluster.datastore.DataTreeCohortActor.Success;
39 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
40 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
41 import org.slf4j.Logger;
42 import org.slf4j.LoggerFactory;
43 import scala.compat.java8.FutureConverters;
44 import scala.concurrent.Future;
47 * Composite cohort, which coordinates multiple user-provided cohorts as if it was only one cohort.
49 * It tracks current operation and list of cohorts which successfuly finished previous phase in
50 * case, if abort is necessary to invoke it only on cohort steps which are still active.
53 class CompositeDataTreeCohort {
54 private static final Logger LOG = LoggerFactory.getLogger(CompositeDataTreeCohort.class);
58 * Cohorts are idle, no messages were sent.
62 * CanCommit message was sent to all participating cohorts.
66 * Successful canCommit responses were received from every participating cohort.
68 CAN_COMMIT_SUCCESSFUL,
70 * PreCommit message was sent to all participating cohorts.
74 * Successful preCommit responses were received from every participating cohort.
76 PRE_COMMIT_SUCCESSFUL,
78 * Commit message was send to all participating cohorts.
82 * Successful commit responses were received from all participating cohorts.
86 * Some of cohorts responded back with unsuccessful message.
90 * Abort message was send to all cohorts which responded with success previously.
95 static final Recover<Object> EXCEPTION_TO_MESSAGE = new Recover<Object>() {
97 public Failure recover(final Throwable error) {
98 return new Failure(error);
102 private final DataTreeCohortActorRegistry registry;
103 private final TransactionIdentifier txId;
104 private final SchemaContext schema;
105 private final Executor callbackExecutor;
106 private final Timeout timeout;
108 private @NonNull List<Success> successfulFromPrevious = Collections.emptyList();
109 private State state = State.IDLE;
111 CompositeDataTreeCohort(final DataTreeCohortActorRegistry registry, final TransactionIdentifier transactionID,
112 final SchemaContext schema, final Executor callbackExecutor, final Timeout timeout) {
113 this.registry = requireNonNull(registry);
114 this.txId = requireNonNull(transactionID);
115 this.schema = requireNonNull(schema);
116 this.callbackExecutor = requireNonNull(callbackExecutor);
117 this.timeout = requireNonNull(timeout);
122 case CAN_COMMIT_SENT:
123 case CAN_COMMIT_SUCCESSFUL:
124 case PRE_COMMIT_SENT:
125 case PRE_COMMIT_SUCCESSFUL:
135 throw new IllegalStateException("Unhandled state " + state);
138 successfulFromPrevious = Collections.emptyList();
142 Optional<CompletionStage<Void>> canCommit(final DataTreeCandidate tip) {
143 if (LOG.isTraceEnabled()) {
144 LOG.trace("{}: canCommit - candidate: {}", txId, tip);
146 LOG.debug("{}: canCommit - candidate rootPath: {}", txId, tip.getRootPath());
149 final List<CanCommit> messages = registry.createCanCommitMessages(txId, tip, schema);
150 LOG.debug("{}: canCommit - messages: {}", txId, messages);
151 if (messages.isEmpty()) {
152 successfulFromPrevious = Collections.emptyList();
153 changeStateFrom(State.IDLE, State.CAN_COMMIT_SUCCESSFUL);
154 return Optional.empty();
157 final List<Entry<ActorRef, Future<Object>>> futures = new ArrayList<>(messages.size());
158 for (CanCommit message : messages) {
159 final ActorRef actor = message.getCohort();
160 final Future<Object> future = Patterns.ask(actor, message, timeout).recover(EXCEPTION_TO_MESSAGE,
161 ExecutionContexts.global());
162 LOG.trace("{}: requesting canCommit from {}", txId, actor);
163 futures.add(new SimpleImmutableEntry<>(actor, future));
166 changeStateFrom(State.IDLE, State.CAN_COMMIT_SENT);
167 return Optional.of(processResponses(futures, State.CAN_COMMIT_SENT, State.CAN_COMMIT_SUCCESSFUL));
170 Optional<CompletionStage<Void>> preCommit() {
171 LOG.debug("{}: preCommit - successfulFromPrevious: {}", txId, successfulFromPrevious);
173 if (successfulFromPrevious.isEmpty()) {
174 changeStateFrom(State.CAN_COMMIT_SUCCESSFUL, State.PRE_COMMIT_SUCCESSFUL);
175 return Optional.empty();
178 final List<Entry<ActorRef, Future<Object>>> futures = sendMessageToSuccessful(
179 new DataTreeCohortActor.PreCommit(txId));
180 changeStateFrom(State.CAN_COMMIT_SUCCESSFUL, State.PRE_COMMIT_SENT);
181 return Optional.of(processResponses(futures, State.PRE_COMMIT_SENT, State.PRE_COMMIT_SUCCESSFUL));
184 Optional<CompletionStage<Void>> commit() {
185 LOG.debug("{}: commit - successfulFromPrevious: {}", txId, successfulFromPrevious);
186 if (successfulFromPrevious.isEmpty()) {
187 changeStateFrom(State.PRE_COMMIT_SUCCESSFUL, State.COMMITED);
188 return Optional.empty();
191 final List<Entry<ActorRef, Future<Object>>> futures = sendMessageToSuccessful(
192 new DataTreeCohortActor.Commit(txId));
193 changeStateFrom(State.PRE_COMMIT_SUCCESSFUL, State.COMMIT_SENT);
194 return Optional.of(processResponses(futures, State.COMMIT_SENT, State.COMMITED));
197 Optional<CompletionStage<?>> abort() {
198 LOG.debug("{}: abort - successfulFromPrevious: {}", txId, successfulFromPrevious);
200 state = State.ABORTED;
201 if (successfulFromPrevious.isEmpty()) {
202 return Optional.empty();
205 final DataTreeCohortActor.Abort message = new DataTreeCohortActor.Abort(txId);
206 final List<Future<Object>> futures = new ArrayList<>(successfulFromPrevious.size());
207 for (Success s : successfulFromPrevious) {
208 futures.add(Patterns.ask(s.getCohort(), message, timeout));
211 return Optional.of(FutureConverters.toJava(Futures.sequence(futures, ExecutionContexts.global())));
214 private List<Entry<ActorRef, Future<Object>>> sendMessageToSuccessful(final Object message) {
215 LOG.debug("{}: sendMesageToSuccessful: {}", txId, message);
217 final List<Entry<ActorRef, Future<Object>>> ret = new ArrayList<>(successfulFromPrevious.size());
218 for (Success s : successfulFromPrevious) {
219 final ActorRef actor = s.getCohort();
220 ret.add(new SimpleImmutableEntry<>(actor, Patterns.ask(actor, message, timeout)));
225 private @NonNull CompletionStage<Void> processResponses(final List<Entry<ActorRef, Future<Object>>> futures,
226 final State currentState, final State afterState) {
227 LOG.debug("{}: processResponses - currentState: {}, afterState: {}", txId, currentState, afterState);
228 final CompletableFuture<Void> returnFuture = new CompletableFuture<>();
229 Future<Iterable<Object>> aggregateFuture = Futures.sequence(Lists.transform(futures, Entry::getValue),
230 ExecutionContexts.global());
232 aggregateFuture.onComplete(new OnComplete<Iterable<Object>>() {
234 public void onComplete(final Throwable failure, final Iterable<Object> results) {
235 callbackExecutor.execute(
236 () -> processResponses(failure, results, currentState, afterState, returnFuture));
238 }, ExecutionContexts.global());
243 // FB issues violation for passing null to CompletableFuture#complete but it is valid and necessary when the
244 // generic type is Void.
245 @SuppressFBWarnings(value = { "NP_NONNULL_PARAM_VIOLATION", "UPM_UNCALLED_PRIVATE_METHOD" },
246 justification = "https://github.com/spotbugs/spotbugs/issues/811")
247 private void processResponses(final Throwable failure, final Iterable<Object> results,
248 final State currentState, final State afterState, final CompletableFuture<Void> resultFuture) {
249 if (failure != null) {
250 successfulFromPrevious = Collections.emptyList();
251 resultFuture.completeExceptionally(failure);
255 final Collection<Failure> failed = new ArrayList<>(1);
256 final List<Success> successful = new ArrayList<>();
257 for (Object result : results) {
258 if (result instanceof DataTreeCohortActor.Success) {
259 successful.add((Success) result);
260 } else if (result instanceof Status.Failure) {
261 failed.add((Failure) result);
263 LOG.warn("{}: unrecognized response {}, ignoring it", txId, result);
267 LOG.debug("{}: processResponses - successful: {}, failed: {}", txId, successful, failed);
269 if (!failed.isEmpty()) {
270 changeStateFrom(currentState, State.FAILED);
271 final Iterator<Failure> it = failed.iterator();
272 final Throwable firstEx = it.next().cause();
273 while (it.hasNext()) {
274 firstEx.addSuppressed(it.next().cause());
277 successfulFromPrevious = Collections.emptyList();
278 resultFuture.completeExceptionally(firstEx);
280 successfulFromPrevious = successful;
281 changeStateFrom(currentState, afterState);
282 resultFuture.complete(null);
286 void changeStateFrom(final State expected, final State followup) {
287 checkState(state == expected);