BUG-7033: Implement pipe-lining in ShardDataTree
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / CompositeDataTreeCohort.java
1 /*
2  * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.datastore;
10
11 import akka.actor.Status;
12 import akka.actor.Status.Failure;
13 import akka.dispatch.ExecutionContexts;
14 import akka.dispatch.Futures;
15 import akka.dispatch.Recover;
16 import akka.pattern.Patterns;
17 import akka.util.Timeout;
18 import com.google.common.base.Preconditions;
19 import com.google.common.base.Throwables;
20 import com.google.common.collect.Iterables;
21 import java.util.Collection;
22 import java.util.Iterator;
23 import java.util.Optional;
24 import java.util.concurrent.ExecutionException;
25 import java.util.concurrent.TimeoutException;
26 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
27 import org.opendaylight.controller.cluster.datastore.DataTreeCohortActor.CanCommit;
28 import org.opendaylight.controller.cluster.datastore.DataTreeCohortActor.Success;
29 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
30 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
31 import scala.concurrent.Await;
32 import scala.concurrent.Future;
33
34 /**
35  * Composite cohort, which coordinates multiple user-provided cohorts as if it was only one cohort.
36  * <p/>
37  * It tracks current operation and list of cohorts which successfuly finished previous phase in
38  * case, if abort is necessary to invoke it only on cohort steps which are still active.
39  *
40  */
41 class CompositeDataTreeCohort {
42
43     private enum State {
44         /**
45          * Cohorts are idle, no messages were sent.
46          */
47         IDLE,
48         /**
49          * CanCommit message was sent to all participating cohorts.
50          */
51         CAN_COMMIT_SENT,
52         /**
53          * Successful canCommit responses were received from every participating cohort.
54          */
55         CAN_COMMIT_SUCCESSFUL,
56         /**
57          * PreCommit message was sent to all participating cohorts.
58          */
59         PRE_COMMIT_SENT,
60         /**
61          * Successful preCommit responses were received from every participating cohort.
62          */
63         PRE_COMMIT_SUCCESSFUL,
64         /**
65          * Commit message was send to all participating cohorts.
66          */
67         COMMIT_SENT,
68         /**
69          * Successful commit responses were received from all participating cohorts.
70          */
71         COMMITED,
72         /**
73          * Some of cohorts responded back with unsuccessful message.
74          */
75         FAILED,
76         /**
77          * Abort message was send to all cohorts which responded with success previously.
78          */
79         ABORTED
80     }
81
82     protected static final Recover<Object> EXCEPTION_TO_MESSAGE = new Recover<Object>() {
83         @Override
84         public Failure recover(final Throwable error) throws Throwable {
85             return new Failure(error);
86         }
87     };
88
89
90     private final DataTreeCohortActorRegistry registry;
91     private final TransactionIdentifier txId;
92     private final SchemaContext schema;
93     private final Timeout timeout;
94     private Iterable<Success> successfulFromPrevious;
95     private State state = State.IDLE;
96
97     CompositeDataTreeCohort(final DataTreeCohortActorRegistry registry, final TransactionIdentifier transactionID,
98         final SchemaContext schema, final Timeout timeout) {
99         this.registry = Preconditions.checkNotNull(registry);
100         this.txId = Preconditions.checkNotNull(transactionID);
101         this.schema = Preconditions.checkNotNull(schema);
102         this.timeout = Preconditions.checkNotNull(timeout);
103     }
104
105     void reset() {
106         switch (state) {
107             case CAN_COMMIT_SENT:
108             case CAN_COMMIT_SUCCESSFUL:
109             case PRE_COMMIT_SENT:
110             case PRE_COMMIT_SUCCESSFUL:
111             case COMMIT_SENT:
112                 abort();
113                 break;
114             default :
115                 break;
116         }
117
118         successfulFromPrevious = null;
119         state = State.IDLE;
120     }
121
122     void canCommit(final DataTreeCandidate tip) throws ExecutionException, TimeoutException {
123         Collection<CanCommit> messages = registry.createCanCommitMessages(txId, tip, schema);
124         // FIXME: Optimize empty collection list with pre-created futures, containing success.
125         Future<Iterable<Object>> canCommitsFuture = Futures.traverse(messages,
126             input -> Patterns.ask(input.getCohort(), input, timeout).recover(EXCEPTION_TO_MESSAGE,
127                     ExecutionContexts.global()), ExecutionContexts.global());
128         changeStateFrom(State.IDLE, State.CAN_COMMIT_SENT);
129         processResponses(canCommitsFuture, State.CAN_COMMIT_SENT, State.CAN_COMMIT_SUCCESSFUL);
130     }
131
132     void preCommit() throws ExecutionException, TimeoutException {
133         Preconditions.checkState(successfulFromPrevious != null);
134         Future<Iterable<Object>> preCommitFutures = sendMesageToSuccessful(new DataTreeCohortActor.PreCommit(txId));
135         changeStateFrom(State.CAN_COMMIT_SUCCESSFUL, State.PRE_COMMIT_SENT);
136         processResponses(preCommitFutures, State.PRE_COMMIT_SENT, State.PRE_COMMIT_SUCCESSFUL);
137     }
138
139     void commit() throws ExecutionException, TimeoutException {
140         Preconditions.checkState(successfulFromPrevious != null);
141         Future<Iterable<Object>> commitsFuture = sendMesageToSuccessful(new DataTreeCohortActor.Commit(txId));
142         changeStateFrom(State.PRE_COMMIT_SUCCESSFUL, State.COMMIT_SENT);
143         processResponses(commitsFuture, State.COMMIT_SENT, State.COMMITED);
144     }
145
146     Optional<Future<Iterable<Object>>> abort() {
147         state = State.ABORTED;
148         if (successfulFromPrevious != null && !Iterables.isEmpty(successfulFromPrevious)) {
149             return Optional.of(sendMesageToSuccessful(new DataTreeCohortActor.Abort(txId)));
150         }
151
152         return Optional.empty();
153     }
154
155     private Future<Iterable<Object>> sendMesageToSuccessful(final Object message) {
156         return Futures.traverse(successfulFromPrevious, cohortResponse -> Patterns.ask(
157                 cohortResponse.getCohort(), message, timeout), ExecutionContexts.global());
158     }
159
160     @SuppressWarnings("checkstyle:IllegalCatch")
161     private void processResponses(final Future<Iterable<Object>> resultsFuture, final State currentState,
162             final State afterState) throws TimeoutException, ExecutionException {
163         final Iterable<Object> results;
164         try {
165             results = Await.result(resultsFuture, timeout.duration());
166         } catch (Exception e) {
167             successfulFromPrevious = null;
168             Throwables.propagateIfInstanceOf(e, TimeoutException.class);
169             throw Throwables.propagate(e);
170         }
171         Iterable<Failure> failed = Iterables.filter(results, Status.Failure.class);
172         Iterable<Success> successful = Iterables.filter(results, DataTreeCohortActor.Success.class);
173         successfulFromPrevious = successful;
174         if (!Iterables.isEmpty(failed)) {
175             changeStateFrom(currentState, State.FAILED);
176             Iterator<Failure> it = failed.iterator();
177             Throwable firstEx = it.next().cause();
178             while (it.hasNext()) {
179                 firstEx.addSuppressed(it.next().cause());
180             }
181             Throwables.propagateIfPossible(firstEx, ExecutionException.class);
182             Throwables.propagateIfPossible(firstEx, TimeoutException.class);
183             throw Throwables.propagate(firstEx);
184         }
185         changeStateFrom(currentState, afterState);
186     }
187
188     void changeStateFrom(final State expected, final State followup) {
189         Preconditions.checkState(state == expected);
190         state = followup;
191     }
192 }