2 * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.controller.cluster.datastore;
11 import akka.actor.ActorRef;
12 import akka.actor.Status;
13 import akka.actor.Status.Failure;
14 import akka.dispatch.ExecutionContexts;
15 import akka.dispatch.Futures;
16 import akka.dispatch.OnComplete;
17 import akka.dispatch.Recover;
18 import akka.pattern.Patterns;
19 import akka.util.Timeout;
20 import com.google.common.base.Preconditions;
21 import com.google.common.collect.Lists;
22 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
23 import java.util.AbstractMap.SimpleImmutableEntry;
24 import java.util.ArrayList;
25 import java.util.Collection;
26 import java.util.Collections;
27 import java.util.Iterator;
28 import java.util.List;
29 import java.util.Map.Entry;
30 import java.util.Optional;
31 import java.util.concurrent.CompletableFuture;
32 import java.util.concurrent.CompletionStage;
33 import java.util.concurrent.Executor;
34 import org.eclipse.jdt.annotation.NonNull;
35 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
36 import org.opendaylight.controller.cluster.datastore.DataTreeCohortActor.CanCommit;
37 import org.opendaylight.controller.cluster.datastore.DataTreeCohortActor.Success;
38 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
39 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
40 import org.slf4j.Logger;
41 import org.slf4j.LoggerFactory;
42 import scala.compat.java8.FutureConverters;
43 import scala.concurrent.Future;
46 * Composite cohort, which coordinates multiple user-provided cohorts as if it was only one cohort.
48 * It tracks the current operation and the list of cohorts which successfully finished the previous phase,
49 * so that, if an abort is necessary, it is invoked only on cohort steps which are still active.
52 class CompositeDataTreeCohort {
53 private static final Logger LOG = LoggerFactory.getLogger(CompositeDataTreeCohort.class);
57 * Cohorts are idle, no messages were sent.
61 * CanCommit message was sent to all participating cohorts.
65 * Successful canCommit responses were received from every participating cohort.
67 CAN_COMMIT_SUCCESSFUL,
69 * PreCommit message was sent to all participating cohorts.
73 * Successful preCommit responses were received from every participating cohort.
75 PRE_COMMIT_SUCCESSFUL,
77 * Commit message was sent to all participating cohorts.
81 * Successful commit responses were received from all participating cohorts.
85 * Some of the cohorts responded back with an unsuccessful message.
89 * Abort message was sent to all cohorts which responded with success previously.
94 static final Recover<Object> EXCEPTION_TO_MESSAGE = new Recover<Object>() {
96 public Failure recover(final Throwable error) {
97 return new Failure(error);
101 private final DataTreeCohortActorRegistry registry;
102 private final TransactionIdentifier txId;
103 private final SchemaContext schema;
104 private final Executor callbackExecutor;
105 private final Timeout timeout;
107 private @NonNull List<Success> successfulFromPrevious = Collections.emptyList();
108 private State state = State.IDLE;
/**
 * Creates a composite cohort bound to a single transaction.
 *
 * @param registry registry used to create the canCommit messages
 * @param transactionID identifier of the coordinated transaction
 * @param schema schema context passed to the registry when creating canCommit messages
 * @param callbackExecutor executor on which cohort responses are processed
 * @param timeout timeout applied to every message sent to a cohort
 * @throws NullPointerException if any argument is null
 */
CompositeDataTreeCohort(final DataTreeCohortActorRegistry registry, final TransactionIdentifier transactionID,
        final SchemaContext schema, final Executor callbackExecutor, final Timeout timeout) {
    // Validate everything up front, in declaration order, before touching any field.
    Preconditions.checkNotNull(registry);
    Preconditions.checkNotNull(transactionID);
    Preconditions.checkNotNull(schema);
    Preconditions.checkNotNull(callbackExecutor);
    Preconditions.checkNotNull(timeout);

    this.registry = registry;
    this.txId = transactionID;
    this.schema = schema;
    this.callbackExecutor = callbackExecutor;
    this.timeout = timeout;
}
121 case CAN_COMMIT_SENT:
122 case CAN_COMMIT_SUCCESSFUL:
123 case PRE_COMMIT_SENT:
124 case PRE_COMMIT_SUCCESSFUL:
134 throw new IllegalStateException("Unhandled state " + state);
137 successfulFromPrevious = Collections.emptyList();
141 Optional<CompletionStage<Void>> canCommit(final DataTreeCandidate tip) {
142 if (LOG.isTraceEnabled()) {
143 LOG.trace("{}: canCommit - candidate: {}", txId, tip);
145 LOG.debug("{}: canCommit - candidate rootPath: {}", txId, tip.getRootPath());
148 final List<CanCommit> messages = registry.createCanCommitMessages(txId, tip, schema);
149 LOG.debug("{}: canCommit - messages: {}", txId, messages);
150 if (messages.isEmpty()) {
151 successfulFromPrevious = Collections.emptyList();
152 changeStateFrom(State.IDLE, State.CAN_COMMIT_SUCCESSFUL);
153 return Optional.empty();
156 final List<Entry<ActorRef, Future<Object>>> futures = new ArrayList<>(messages.size());
157 for (CanCommit message : messages) {
158 final ActorRef actor = message.getCohort();
159 final Future<Object> future = Patterns.ask(actor, message, timeout).recover(EXCEPTION_TO_MESSAGE,
160 ExecutionContexts.global());
161 LOG.trace("{}: requesting canCommit from {}", txId, actor);
162 futures.add(new SimpleImmutableEntry<>(actor, future));
165 changeStateFrom(State.IDLE, State.CAN_COMMIT_SENT);
166 return Optional.of(processResponses(futures, State.CAN_COMMIT_SENT, State.CAN_COMMIT_SUCCESSFUL));
169 Optional<CompletionStage<Void>> preCommit() {
170 LOG.debug("{}: preCommit - successfulFromPrevious: {}", txId, successfulFromPrevious);
172 if (successfulFromPrevious.isEmpty()) {
173 changeStateFrom(State.CAN_COMMIT_SUCCESSFUL, State.PRE_COMMIT_SUCCESSFUL);
174 return Optional.empty();
177 final List<Entry<ActorRef, Future<Object>>> futures = sendMessageToSuccessful(
178 new DataTreeCohortActor.PreCommit(txId));
179 changeStateFrom(State.CAN_COMMIT_SUCCESSFUL, State.PRE_COMMIT_SENT);
180 return Optional.of(processResponses(futures, State.PRE_COMMIT_SENT, State.PRE_COMMIT_SUCCESSFUL));
183 Optional<CompletionStage<Void>> commit() {
184 LOG.debug("{}: commit - successfulFromPrevious: {}", txId, successfulFromPrevious);
185 if (successfulFromPrevious.isEmpty()) {
186 changeStateFrom(State.PRE_COMMIT_SUCCESSFUL, State.COMMITED);
187 return Optional.empty();
190 final List<Entry<ActorRef, Future<Object>>> futures = sendMessageToSuccessful(
191 new DataTreeCohortActor.Commit(txId));
192 changeStateFrom(State.PRE_COMMIT_SUCCESSFUL, State.COMMIT_SENT);
193 return Optional.of(processResponses(futures, State.COMMIT_SENT, State.COMMITED));
196 Optional<CompletionStage<?>> abort() {
197 LOG.debug("{}: abort - successfulFromPrevious: {}", txId, successfulFromPrevious);
199 state = State.ABORTED;
200 if (successfulFromPrevious.isEmpty()) {
201 return Optional.empty();
204 final DataTreeCohortActor.Abort message = new DataTreeCohortActor.Abort(txId);
205 final List<Future<Object>> futures = new ArrayList<>(successfulFromPrevious.size());
206 for (Success s : successfulFromPrevious) {
207 futures.add(Patterns.ask(s.getCohort(), message, timeout));
210 return Optional.of(FutureConverters.toJava(Futures.sequence(futures, ExecutionContexts.global())));
213 private List<Entry<ActorRef, Future<Object>>> sendMessageToSuccessful(final Object message) {
214 LOG.debug("{}: sendMesageToSuccessful: {}", txId, message);
216 final List<Entry<ActorRef, Future<Object>>> ret = new ArrayList<>(successfulFromPrevious.size());
217 for (Success s : successfulFromPrevious) {
218 final ActorRef actor = s.getCohort();
219 ret.add(new SimpleImmutableEntry<>(actor, Patterns.ask(actor, message, timeout)));
224 private @NonNull CompletionStage<Void> processResponses(final List<Entry<ActorRef, Future<Object>>> futures,
225 final State currentState, final State afterState) {
226 LOG.debug("{}: processResponses - currentState: {}, afterState: {}", txId, currentState, afterState);
227 final CompletableFuture<Void> returnFuture = new CompletableFuture<>();
228 Future<Iterable<Object>> aggregateFuture = Futures.sequence(Lists.transform(futures, Entry::getValue),
229 ExecutionContexts.global());
231 aggregateFuture.onComplete(new OnComplete<Iterable<Object>>() {
233 public void onComplete(final Throwable failure, final Iterable<Object> results) {
234 callbackExecutor.execute(
235 () -> processResponses(failure, results, currentState, afterState, returnFuture));
237 }, ExecutionContexts.global());
242 // FB issues violation for passing null to CompletableFuture#complete but it is valid and necessary when the
243 // generic type is Void.
244 @SuppressFBWarnings("NP_NONNULL_PARAM_VIOLATION")
245 private void processResponses(final Throwable failure, final Iterable<Object> results,
246 final State currentState, final State afterState, final CompletableFuture<Void> resultFuture) {
247 if (failure != null) {
248 successfulFromPrevious = Collections.emptyList();
249 resultFuture.completeExceptionally(failure);
253 final Collection<Failure> failed = new ArrayList<>(1);
254 final List<Success> successful = new ArrayList<>();
255 for (Object result : results) {
256 if (result instanceof DataTreeCohortActor.Success) {
257 successful.add((Success) result);
258 } else if (result instanceof Status.Failure) {
259 failed.add((Failure) result);
261 LOG.warn("{}: unrecognized response {}, ignoring it", txId, result);
265 LOG.debug("{}: processResponses - successful: {}, failed: {}", txId, successful, failed);
267 if (!failed.isEmpty()) {
268 changeStateFrom(currentState, State.FAILED);
269 final Iterator<Failure> it = failed.iterator();
270 final Throwable firstEx = it.next().cause();
271 while (it.hasNext()) {
272 firstEx.addSuppressed(it.next().cause());
275 successfulFromPrevious = Collections.emptyList();
276 resultFuture.completeExceptionally(firstEx);
278 successfulFromPrevious = successful;
279 changeStateFrom(currentState, afterState);
280 resultFuture.complete(null);
284 void changeStateFrom(final State expected, final State followup) {
285 Preconditions.checkState(state == expected);