2 * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.controller.cluster.datastore;
11 import akka.actor.ActorRef;
12 import akka.actor.Status;
13 import akka.actor.Status.Failure;
14 import akka.dispatch.ExecutionContexts;
15 import akka.dispatch.Futures;
16 import akka.dispatch.OnComplete;
17 import akka.dispatch.Recover;
18 import akka.pattern.Patterns;
19 import akka.util.Timeout;
20 import com.google.common.base.Preconditions;
21 import com.google.common.collect.Lists;
22 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
23 import java.util.AbstractMap.SimpleImmutableEntry;
24 import java.util.ArrayList;
25 import java.util.Collection;
26 import java.util.Collections;
27 import java.util.Iterator;
28 import java.util.List;
29 import java.util.Map.Entry;
30 import java.util.Optional;
31 import java.util.concurrent.CompletableFuture;
32 import java.util.concurrent.CompletionStage;
33 import java.util.concurrent.Executor;
34 import javax.annotation.Nonnull;
35 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
36 import org.opendaylight.controller.cluster.datastore.DataTreeCohortActor.CanCommit;
37 import org.opendaylight.controller.cluster.datastore.DataTreeCohortActor.Success;
38 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
39 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
40 import org.slf4j.Logger;
41 import org.slf4j.LoggerFactory;
42 import scala.compat.java8.FutureConverters;
43 import scala.concurrent.Future;
46 * Composite cohort, which coordinates multiple user-provided cohorts as if it was only one cohort.
48 * It tracks the current operation and the list of cohorts which successfully finished the previous phase so
49 * that, if an abort is necessary, it is invoked only on cohort steps which are still active.
52 class CompositeDataTreeCohort {
53 private static final Logger LOG = LoggerFactory.getLogger(CompositeDataTreeCohort.class);
57 * Cohorts are idle, no messages were sent.
61 * CanCommit message was sent to all participating cohorts.
65 * Successful canCommit responses were received from every participating cohort.
67 CAN_COMMIT_SUCCESSFUL,
69 * PreCommit message was sent to all participating cohorts.
73 * Successful preCommit responses were received from every participating cohort.
75 PRE_COMMIT_SUCCESSFUL,
77 * Commit message was send to all participating cohorts.
81 * Successful commit responses were received from all participating cohorts.
85 * Some of cohorts responded back with unsuccessful message.
89 * Abort message was send to all cohorts which responded with success previously.
94 static final Recover<Object> EXCEPTION_TO_MESSAGE = new Recover<Object>() {
96 public Failure recover(final Throwable error) {
97 return new Failure(error);
101 private final DataTreeCohortActorRegistry registry;
102 private final TransactionIdentifier txId;
103 private final SchemaContext schema;
104 private final Executor callbackExecutor;
105 private final Timeout timeout;
108 private List<Success> successfulFromPrevious = Collections.emptyList();
109 private State state = State.IDLE;
/**
 * Creates a composite cohort for a single transaction.
 *
 * @param registry registry used to create the canCommit messages for the participating cohorts
 * @param transactionID identifier of the transaction being coordinated
 * @param schema schema context handed to the registry when canCommit messages are created
 * @param callbackExecutor executor on which the cohort responses are processed
 * @param timeout timeout applied to every message sent to a cohort
 * @throws NullPointerException if any of the arguments is null
 */
CompositeDataTreeCohort(final DataTreeCohortActorRegistry registry, final TransactionIdentifier transactionID,
        final SchemaContext schema, final Executor callbackExecutor, final Timeout timeout) {
    this.registry = Preconditions.checkNotNull(registry);
    txId = Preconditions.checkNotNull(transactionID);
    this.schema = Preconditions.checkNotNull(schema);
    this.callbackExecutor = Preconditions.checkNotNull(callbackExecutor);
    this.timeout = Preconditions.checkNotNull(timeout);
}
122 case CAN_COMMIT_SENT:
123 case CAN_COMMIT_SUCCESSFUL:
124 case PRE_COMMIT_SENT:
125 case PRE_COMMIT_SUCCESSFUL:
135 throw new IllegalStateException("Unhandled state " + state);
138 successfulFromPrevious = Collections.emptyList();
142 Optional<CompletionStage<Void>> canCommit(final DataTreeCandidate tip) {
143 if (LOG.isTraceEnabled()) {
144 LOG.trace("{}: canCommit - candidate: {}", txId, tip);
146 LOG.debug("{}: canCommit - candidate rootPath: {}", txId, tip.getRootPath());
149 final List<CanCommit> messages = registry.createCanCommitMessages(txId, tip, schema);
150 LOG.debug("{}: canCommit - messages: {}", txId, messages);
151 if (messages.isEmpty()) {
152 successfulFromPrevious = Collections.emptyList();
153 changeStateFrom(State.IDLE, State.CAN_COMMIT_SUCCESSFUL);
154 return Optional.empty();
157 final List<Entry<ActorRef, Future<Object>>> futures = new ArrayList<>(messages.size());
158 for (CanCommit message : messages) {
159 final ActorRef actor = message.getCohort();
160 final Future<Object> future = Patterns.ask(actor, message, timeout).recover(EXCEPTION_TO_MESSAGE,
161 ExecutionContexts.global());
162 LOG.trace("{}: requesting canCommit from {}", txId, actor);
163 futures.add(new SimpleImmutableEntry<>(actor, future));
166 changeStateFrom(State.IDLE, State.CAN_COMMIT_SENT);
167 return Optional.of(processResponses(futures, State.CAN_COMMIT_SENT, State.CAN_COMMIT_SUCCESSFUL));
170 Optional<CompletionStage<Void>> preCommit() {
171 LOG.debug("{}: preCommit - successfulFromPrevious: {}", txId, successfulFromPrevious);
173 if (successfulFromPrevious.isEmpty()) {
174 changeStateFrom(State.CAN_COMMIT_SUCCESSFUL, State.PRE_COMMIT_SUCCESSFUL);
175 return Optional.empty();
178 final List<Entry<ActorRef, Future<Object>>> futures = sendMessageToSuccessful(
179 new DataTreeCohortActor.PreCommit(txId));
180 changeStateFrom(State.CAN_COMMIT_SUCCESSFUL, State.PRE_COMMIT_SENT);
181 return Optional.of(processResponses(futures, State.PRE_COMMIT_SENT, State.PRE_COMMIT_SUCCESSFUL));
184 Optional<CompletionStage<Void>> commit() {
185 LOG.debug("{}: commit - successfulFromPrevious: {}", txId, successfulFromPrevious);
186 if (successfulFromPrevious.isEmpty()) {
187 changeStateFrom(State.PRE_COMMIT_SUCCESSFUL, State.COMMITED);
188 return Optional.empty();
191 final List<Entry<ActorRef, Future<Object>>> futures = sendMessageToSuccessful(
192 new DataTreeCohortActor.Commit(txId));
193 changeStateFrom(State.PRE_COMMIT_SUCCESSFUL, State.COMMIT_SENT);
194 return Optional.of(processResponses(futures, State.COMMIT_SENT, State.COMMITED));
197 Optional<CompletionStage<?>> abort() {
198 LOG.debug("{}: abort - successfulFromPrevious: {}", txId, successfulFromPrevious);
200 state = State.ABORTED;
201 if (successfulFromPrevious.isEmpty()) {
202 return Optional.empty();
205 final DataTreeCohortActor.Abort message = new DataTreeCohortActor.Abort(txId);
206 final List<Future<Object>> futures = new ArrayList<>(successfulFromPrevious.size());
207 for (Success s : successfulFromPrevious) {
208 futures.add(Patterns.ask(s.getCohort(), message, timeout));
211 return Optional.of(FutureConverters.toJava(Futures.sequence(futures, ExecutionContexts.global())));
214 private List<Entry<ActorRef, Future<Object>>> sendMessageToSuccessful(final Object message) {
215 LOG.debug("{}: sendMesageToSuccessful: {}", txId, message);
217 final List<Entry<ActorRef, Future<Object>>> ret = new ArrayList<>(successfulFromPrevious.size());
218 for (Success s : successfulFromPrevious) {
219 final ActorRef actor = s.getCohort();
220 ret.add(new SimpleImmutableEntry<>(actor, Patterns.ask(actor, message, timeout)));
226 private CompletionStage<Void> processResponses(final List<Entry<ActorRef, Future<Object>>> futures,
227 final State currentState, final State afterState) {
228 LOG.debug("{}: processResponses - currentState: {}, afterState: {}", txId, currentState, afterState);
229 final CompletableFuture<Void> returnFuture = new CompletableFuture<>();
230 Future<Iterable<Object>> aggregateFuture = Futures.sequence(Lists.transform(futures, Entry::getValue),
231 ExecutionContexts.global());
233 aggregateFuture.onComplete(new OnComplete<Iterable<Object>>() {
235 public void onComplete(Throwable failure, Iterable<Object> results) {
236 callbackExecutor.execute(
237 () -> processResponses(failure, results, currentState, afterState, returnFuture));
239 }, ExecutionContexts.global());
244 // FB issues violation for passing null to CompletableFuture#complete but it is valid and necessary when the
245 // generic type is Void.
246 @SuppressFBWarnings("NP_NONNULL_PARAM_VIOLATION")
247 private void processResponses(Throwable failure, Iterable<Object> results, State currentState, State afterState,
248 CompletableFuture<Void> resultFuture) {
249 if (failure != null) {
250 successfulFromPrevious = Collections.emptyList();
251 resultFuture.completeExceptionally(failure);
255 final Collection<Failure> failed = new ArrayList<>(1);
256 final List<Success> successful = new ArrayList<>();
257 for (Object result : results) {
258 if (result instanceof DataTreeCohortActor.Success) {
259 successful.add((Success) result);
260 } else if (result instanceof Status.Failure) {
261 failed.add((Failure) result);
263 LOG.warn("{}: unrecognized response {}, ignoring it", result);
267 LOG.debug("{}: processResponses - successful: {}, failed: {}", txId, successful, failed);
269 if (!failed.isEmpty()) {
270 changeStateFrom(currentState, State.FAILED);
271 final Iterator<Failure> it = failed.iterator();
272 final Throwable firstEx = it.next().cause();
273 while (it.hasNext()) {
274 firstEx.addSuppressed(it.next().cause());
277 successfulFromPrevious = Collections.emptyList();
278 resultFuture.completeExceptionally(firstEx);
280 successfulFromPrevious = successful;
281 changeStateFrom(currentState, afterState);
282 resultFuture.complete(null);
286 void changeStateFrom(final State expected, final State followup) {
287 Preconditions.checkState(state == expected);