Rework CDS commit cohort impl to handle yang lists
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / CompositeDataTreeCohort.java
1 /*
2  * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.datastore;
10
11 import akka.actor.Status;
12 import akka.actor.Status.Failure;
13 import akka.dispatch.ExecutionContexts;
14 import akka.dispatch.Futures;
15 import akka.dispatch.Recover;
16 import akka.pattern.Patterns;
17 import akka.util.Timeout;
18 import com.google.common.base.Preconditions;
19 import com.google.common.base.Throwables;
20 import com.google.common.collect.Iterables;
21 import java.util.Collection;
22 import java.util.Iterator;
23 import java.util.Optional;
24 import java.util.concurrent.ExecutionException;
25 import java.util.concurrent.TimeoutException;
26 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
27 import org.opendaylight.controller.cluster.datastore.DataTreeCohortActor.CanCommit;
28 import org.opendaylight.controller.cluster.datastore.DataTreeCohortActor.Success;
29 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
30 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
31 import org.slf4j.Logger;
32 import org.slf4j.LoggerFactory;
33 import scala.concurrent.Await;
34 import scala.concurrent.Future;
35
/**
 * Composite cohort, which coordinates multiple user-provided cohorts as if they were a single cohort.
 *
 * <p>It tracks the current operation and the list of cohorts which successfully finished the previous
 * phase, so that, if an abort becomes necessary, it is invoked only on the cohorts which are still active.
 */
43 class CompositeDataTreeCohort {
44     private static final Logger LOG = LoggerFactory.getLogger(CompositeDataTreeCohort.class);
45
46     private enum State {
47         /**
48          * Cohorts are idle, no messages were sent.
49          */
50         IDLE,
51         /**
52          * CanCommit message was sent to all participating cohorts.
53          */
54         CAN_COMMIT_SENT,
55         /**
56          * Successful canCommit responses were received from every participating cohort.
57          */
58         CAN_COMMIT_SUCCESSFUL,
59         /**
60          * PreCommit message was sent to all participating cohorts.
61          */
62         PRE_COMMIT_SENT,
63         /**
64          * Successful preCommit responses were received from every participating cohort.
65          */
66         PRE_COMMIT_SUCCESSFUL,
67         /**
68          * Commit message was send to all participating cohorts.
69          */
70         COMMIT_SENT,
71         /**
72          * Successful commit responses were received from all participating cohorts.
73          */
74         COMMITED,
75         /**
76          * Some of cohorts responded back with unsuccessful message.
77          */
78         FAILED,
79         /**
80          * Abort message was send to all cohorts which responded with success previously.
81          */
82         ABORTED
83     }
84
85     protected static final Recover<Object> EXCEPTION_TO_MESSAGE = new Recover<Object>() {
86         @Override
87         public Failure recover(final Throwable error) throws Throwable {
88             return new Failure(error);
89         }
90     };
91
92
93     private final DataTreeCohortActorRegistry registry;
94     private final TransactionIdentifier txId;
95     private final SchemaContext schema;
96     private final Timeout timeout;
97     private Iterable<Success> successfulFromPrevious;
98     private State state = State.IDLE;
99
100     CompositeDataTreeCohort(final DataTreeCohortActorRegistry registry, final TransactionIdentifier transactionID,
101         final SchemaContext schema, final Timeout timeout) {
102         this.registry = Preconditions.checkNotNull(registry);
103         this.txId = Preconditions.checkNotNull(transactionID);
104         this.schema = Preconditions.checkNotNull(schema);
105         this.timeout = Preconditions.checkNotNull(timeout);
106     }
107
108     void reset() {
109         switch (state) {
110             case CAN_COMMIT_SENT:
111             case CAN_COMMIT_SUCCESSFUL:
112             case PRE_COMMIT_SENT:
113             case PRE_COMMIT_SUCCESSFUL:
114             case COMMIT_SENT:
115                 abort();
116                 break;
117             default :
118                 break;
119         }
120
121         successfulFromPrevious = null;
122         state = State.IDLE;
123     }
124
125     void canCommit(final DataTreeCandidate tip) throws ExecutionException, TimeoutException {
126         LOG.debug("{}: canCommit -  candidate: {}", txId, tip);
127
128         Collection<CanCommit> messages = registry.createCanCommitMessages(txId, tip, schema);
129
130         LOG.debug("{}: canCommit - messages: {}", txId, messages);
131
132         // FIXME: Optimize empty collection list with pre-created futures, containing success.
133         Future<Iterable<Object>> canCommitsFuture = Futures.traverse(messages,
134             input -> Patterns.ask(input.getCohort(), input, timeout).recover(EXCEPTION_TO_MESSAGE,
135                     ExecutionContexts.global()), ExecutionContexts.global());
136         changeStateFrom(State.IDLE, State.CAN_COMMIT_SENT);
137         processResponses(canCommitsFuture, State.CAN_COMMIT_SENT, State.CAN_COMMIT_SUCCESSFUL);
138     }
139
140     void preCommit() throws ExecutionException, TimeoutException {
141         LOG.debug("{}: preCommit - successfulFromPrevious: {}", txId, successfulFromPrevious);
142
143         Preconditions.checkState(successfulFromPrevious != null);
144         Future<Iterable<Object>> preCommitFutures = sendMesageToSuccessful(new DataTreeCohortActor.PreCommit(txId));
145         changeStateFrom(State.CAN_COMMIT_SUCCESSFUL, State.PRE_COMMIT_SENT);
146         processResponses(preCommitFutures, State.PRE_COMMIT_SENT, State.PRE_COMMIT_SUCCESSFUL);
147     }
148
149     void commit() throws ExecutionException, TimeoutException {
150         LOG.debug("{}: commit - successfulFromPrevious: {}", txId, successfulFromPrevious);
151
152         Preconditions.checkState(successfulFromPrevious != null);
153         Future<Iterable<Object>> commitsFuture = sendMesageToSuccessful(new DataTreeCohortActor.Commit(txId));
154         changeStateFrom(State.PRE_COMMIT_SUCCESSFUL, State.COMMIT_SENT);
155         processResponses(commitsFuture, State.COMMIT_SENT, State.COMMITED);
156     }
157
158     Optional<Future<Iterable<Object>>> abort() {
159         LOG.debug("{}: abort - successfulFromPrevious: {}", txId, successfulFromPrevious);
160
161         state = State.ABORTED;
162         if (successfulFromPrevious != null && !Iterables.isEmpty(successfulFromPrevious)) {
163             return Optional.of(sendMesageToSuccessful(new DataTreeCohortActor.Abort(txId)));
164         }
165
166         return Optional.empty();
167     }
168
169     private Future<Iterable<Object>> sendMesageToSuccessful(final Object message) {
170         LOG.debug("{}: sendMesageToSuccessful: {}", txId, message);
171
172         return Futures.traverse(successfulFromPrevious, cohortResponse -> Patterns.ask(
173                 cohortResponse.getCohort(), message, timeout), ExecutionContexts.global());
174     }
175
176     @SuppressWarnings("checkstyle:IllegalCatch")
177     private void processResponses(final Future<Iterable<Object>> resultsFuture, final State currentState,
178             final State afterState) throws TimeoutException, ExecutionException {
179         LOG.debug("{}: processResponses - currentState: {}, afterState: {}", txId, currentState, afterState);
180
181         final Iterable<Object> results;
182         try {
183             results = Await.result(resultsFuture, timeout.duration());
184         } catch (Exception e) {
185             successfulFromPrevious = null;
186             LOG.debug("{}: processResponses - error from Future", txId, e);
187             Throwables.propagateIfInstanceOf(e, TimeoutException.class);
188             throw Throwables.propagate(e);
189         }
190         Iterable<Failure> failed = Iterables.filter(results, Status.Failure.class);
191         Iterable<Success> successful = Iterables.filter(results, DataTreeCohortActor.Success.class);
192
193         LOG.debug("{}: processResponses - successful: {}, failed: {}", txId, successful, failed);
194
195         successfulFromPrevious = successful;
196         if (!Iterables.isEmpty(failed)) {
197             changeStateFrom(currentState, State.FAILED);
198             Iterator<Failure> it = failed.iterator();
199             Throwable firstEx = it.next().cause();
200             while (it.hasNext()) {
201                 firstEx.addSuppressed(it.next().cause());
202             }
203             Throwables.propagateIfPossible(firstEx, ExecutionException.class);
204             Throwables.propagateIfPossible(firstEx, TimeoutException.class);
205             throw Throwables.propagate(firstEx);
206         }
207         changeStateFrom(currentState, afterState);
208     }
209
210     void changeStateFrom(final State expected, final State followup) {
211         Preconditions.checkState(state == expected);
212         state = followup;
213     }
214 }