2 * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.controller.cluster.datastore;
11 import akka.actor.Status;
12 import akka.actor.Status.Failure;
13 import akka.dispatch.ExecutionContexts;
14 import akka.dispatch.Futures;
15 import akka.dispatch.Recover;
16 import akka.japi.Function;
17 import akka.pattern.Patterns;
18 import akka.util.Timeout;
19 import com.google.common.base.Preconditions;
20 import com.google.common.base.Throwables;
21 import com.google.common.collect.Iterables;
22 import java.util.Collection;
23 import java.util.Iterator;
24 import java.util.concurrent.ExecutionException;
25 import java.util.concurrent.TimeoutException;
26 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
27 import org.opendaylight.controller.cluster.datastore.DataTreeCohortActor.CanCommit;
28 import org.opendaylight.controller.cluster.datastore.DataTreeCohortActor.Success;
29 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateTip;
30 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
31 import scala.concurrent.Await;
32 import scala.concurrent.Future;
36 * Composite cohort, which coordinates multiple user-provided cohorts as if it was only one cohort.
38 * It tracks the current operation and the list of cohorts which successfully finished the previous phase,
39 * so that, if an abort is necessary, it is invoked only on the cohorts which are still active.
42 class CompositeDataTreeCohort {
46 * Cohorts are idle, no messages were sent.
50 * CanCommit message was sent to all participating cohorts.
54 * Successful canCommit responses were received from every participating cohort.
56 CAN_COMMIT_SUCCESSFUL,
58 * PreCommit message was sent to all participating cohorts.
62 * Successful preCommit responses were received from every participating cohort.
64 PRE_COMMIT_SUCCESSFUL,
66 * Commit message was sent to all participating cohorts.
70 * Successful commit responses were received from all participating cohorts.
74 * Some of the cohorts responded with an unsuccessful message.
80 * Abort message was sent to all cohorts which responded with success previously.
86 protected static final Recover<Object> EXCEPTION_TO_MESSAGE = new Recover<Object>() {
88 public Failure recover(Throwable error) throws Throwable {
89 return new Failure(error);
/** Registry used to create the canCommit messages for this transaction. */
private final DataTreeCohortActorRegistry registry;
/** Identifier of the transaction, embedded in every message sent to the cohorts. */
private final TransactionIdentifier txId;
/** Schema context passed to the registry when canCommit messages are created. */
private final SchemaContext schema;
/** Timeout used both for each ask and for waiting on the collected responses. */
private final Timeout timeout;
/**
 * Success responses from the most recently processed phase. It is {@code null} before any
 * phase was processed and is reset to {@code null} when waiting for responses fails.
 */
private Iterable<Success> successfulFromPrevious;
/** Current phase of this composite cohort; starts in {@code IDLE}. */
private State state = State.IDLE;

/**
 * Creates a composite cohort for a single transaction.
 *
 * @param registry registry used to create canCommit messages, must not be null
 * @param transactionID identifier of the coordinated transaction, must not be null
 * @param schema schema context passed to the registry, must not be null
 * @param timeout timeout for asks and for awaiting responses, must not be null
 * @throws NullPointerException if any argument is null
 */
CompositeDataTreeCohort(final DataTreeCohortActorRegistry registry, final TransactionIdentifier transactionID,
        final SchemaContext schema, final Timeout timeout) {
    // Validate everything up front, in declaration order, before touching any field.
    Preconditions.checkNotNull(registry);
    Preconditions.checkNotNull(transactionID);
    Preconditions.checkNotNull(schema);
    Preconditions.checkNotNull(timeout);

    this.registry = registry;
    this.txId = transactionID;
    this.schema = schema;
    this.timeout = timeout;
}
109 void canCommit(DataTreeCandidateTip tip) throws ExecutionException, TimeoutException {
110 Collection<CanCommit> messages = registry.createCanCommitMessages(txId, tip, schema);
111 // FIXME: Optimize empty collection list with pre-created futures, containing success.
112 Future<Iterable<Object>> canCommitsFuture =
113 Futures.traverse(messages, new Function<CanCommit, Future<Object>>() {
115 public Future<Object> apply(CanCommit input) {
116 return Patterns.ask(input.getCohort(), input, timeout).recover(EXCEPTION_TO_MESSAGE,
117 ExecutionContexts.global());
119 }, ExecutionContexts.global());
120 changeStateFrom(State.IDLE, State.CAN_COMMIT_SENT);
121 processResponses(canCommitsFuture, State.CAN_COMMIT_SENT, State.CAN_COMMIT_SUCCESSFUL);
124 void preCommit() throws ExecutionException, TimeoutException {
125 Preconditions.checkState(successfulFromPrevious != null);
126 Future<Iterable<Object>> preCommitFutures = sendMesageToSuccessful(new DataTreeCohortActor.PreCommit(txId));
127 changeStateFrom(State.CAN_COMMIT_SUCCESSFUL, State.PRE_COMMIT_SENT);
128 processResponses(preCommitFutures, State.PRE_COMMIT_SENT, State.PRE_COMMIT_SUCCESSFUL);
131 void commit() throws ExecutionException, TimeoutException {
132 Preconditions.checkState(successfulFromPrevious != null);
133 Future<Iterable<Object>> commitsFuture = sendMesageToSuccessful(new DataTreeCohortActor.Commit(txId));
134 changeStateFrom(State.PRE_COMMIT_SUCCESSFUL, State.COMMIT_SENT);
135 processResponses(commitsFuture, State.COMMIT_SENT, State.COMMITED);
138 void abort() throws TimeoutException {
139 if (successfulFromPrevious != null) {
140 sendMesageToSuccessful(new DataTreeCohortActor.Abort(txId));
144 private Future<Iterable<Object>> sendMesageToSuccessful(final Object message) {
145 return Futures.traverse(successfulFromPrevious, new Function<DataTreeCohortActor.Success, Future<Object>>() {
148 public Future<Object> apply(DataTreeCohortActor.Success cohortResponse) throws Exception {
149 return Patterns.ask(cohortResponse.getCohort(), message, timeout);
152 }, ExecutionContexts.global());
155 private void processResponses(Future<Iterable<Object>> resultsFuture, State currentState, State afterState)
156 throws TimeoutException, ExecutionException {
157 final Iterable<Object> results;
159 results = Await.result(resultsFuture, timeout.duration());
160 } catch (Exception e) {
161 successfulFromPrevious = null;
162 Throwables.propagateIfInstanceOf(e, TimeoutException.class);
163 throw Throwables.propagate(e);
165 Iterable<Failure> failed = Iterables.filter(results, Status.Failure.class);
166 Iterable<Success> successful = Iterables.filter(results, DataTreeCohortActor.Success.class);
167 successfulFromPrevious = successful;
168 if (!Iterables.isEmpty(failed)) {
169 changeStateFrom(currentState, State.FAILED);
170 Iterator<Failure> it = failed.iterator();
171 Throwable firstEx = it.next().cause();
172 while (it.hasNext()) {
173 firstEx.addSuppressed(it.next().cause());
175 Throwables.propagateIfPossible(firstEx, ExecutionException.class);
176 Throwables.propagateIfPossible(firstEx, TimeoutException.class);
177 throw Throwables.propagate(firstEx);
179 changeStateFrom(currentState, afterState);
182 void changeStateFrom(State expected, State followup) {
183 Preconditions.checkState(state == expected);