BUG-5280: refactor CohortEntry
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / CompositeDataTreeCohort.java
1 /*
2  * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.datastore;
10
11 import akka.actor.Status;
12 import akka.actor.Status.Failure;
13 import akka.dispatch.ExecutionContexts;
14 import akka.dispatch.Futures;
15 import akka.dispatch.Recover;
16 import akka.japi.Function;
17 import akka.pattern.Patterns;
18 import akka.util.Timeout;
19 import com.google.common.base.Preconditions;
20 import com.google.common.base.Throwables;
21 import com.google.common.collect.Iterables;
22 import java.util.Collection;
23 import java.util.Iterator;
24 import java.util.concurrent.ExecutionException;
25 import java.util.concurrent.TimeoutException;
26 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
27 import org.opendaylight.controller.cluster.datastore.DataTreeCohortActor.CanCommit;
28 import org.opendaylight.controller.cluster.datastore.DataTreeCohortActor.Success;
29 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateTip;
30 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
31 import scala.concurrent.Await;
32 import scala.concurrent.Future;
33
34 /**
35  *
36  * Composite cohort, which coordinates multiple user-provided cohorts as if it was only one cohort.
37  *
38  * It tracks current operation and list of cohorts which successfuly finished previous phase in
39  * case, if abort is necessary to invoke it only on cohort steps which are still active.
40  *
41  */
42 class CompositeDataTreeCohort {
43
44     private enum State {
45         /**
46          * Cohorts are idle, no messages were sent.
47          */
48         IDLE,
49         /**
50          * CanCommit message was sent to all participating cohorts.
51          */
52         CAN_COMMIT_SENT,
53         /**
54          * Successful canCommit responses were received from every participating cohort.
55          */
56         CAN_COMMIT_SUCCESSFUL,
57         /**
58          * PreCommit message was sent to all participating cohorts.
59          */
60         PRE_COMMIT_SENT,
61         /**
62          * Successful preCommit responses were received from every participating cohort.
63          */
64         PRE_COMMIT_SUCCESSFUL,
65         /**
66          * Commit message was send to all participating cohorts.
67          */
68         COMMIT_SENT,
69         /**
70          * Successful commit responses were received from all participating cohorts.
71          */
72         COMMITED,
73         /**
74          * Some of cohorts responsed back with unsuccessful message.
75          *
76          */
77         FAILED,
78         /**
79          *
80          * Abort message was send to all cohorts which responded with success previously.
81          *
82          */
83         ABORTED
84     }
85
86     protected static final Recover<Object> EXCEPTION_TO_MESSAGE = new Recover<Object>() {
87         @Override
88         public Failure recover(Throwable error) throws Throwable {
89             return new Failure(error);
90         }
91     };
92
93
94     private final DataTreeCohortActorRegistry registry;
95     private final TransactionIdentifier txId;
96     private final SchemaContext schema;
97     private final Timeout timeout;
98     private Iterable<Success> successfulFromPrevious;
99     private State state = State.IDLE;
100
101     CompositeDataTreeCohort(DataTreeCohortActorRegistry registry, TransactionIdentifier transactionID,
102         SchemaContext schema, Timeout timeout) {
103         this.registry = Preconditions.checkNotNull(registry);
104         this.txId = Preconditions.checkNotNull(transactionID);
105         this.schema = Preconditions.checkNotNull(schema);
106         this.timeout = Preconditions.checkNotNull(timeout);
107     }
108
109     void canCommit(DataTreeCandidateTip tip) throws ExecutionException, TimeoutException {
110         Collection<CanCommit> messages = registry.createCanCommitMessages(txId, tip, schema);
111         // FIXME: Optimize empty collection list with pre-created futures, containing success.
112         Future<Iterable<Object>> canCommitsFuture =
113                 Futures.traverse(messages, new Function<CanCommit, Future<Object>>() {
114                     @Override
115                     public Future<Object> apply(CanCommit input) {
116                         return Patterns.ask(input.getCohort(), input, timeout).recover(EXCEPTION_TO_MESSAGE,
117                                 ExecutionContexts.global());
118                     }
119                 }, ExecutionContexts.global());
120         changeStateFrom(State.IDLE, State.CAN_COMMIT_SENT);
121         processResponses(canCommitsFuture, State.CAN_COMMIT_SENT, State.CAN_COMMIT_SUCCESSFUL);
122     }
123
124     void preCommit() throws ExecutionException, TimeoutException {
125         Preconditions.checkState(successfulFromPrevious != null);
126         Future<Iterable<Object>> preCommitFutures = sendMesageToSuccessful(new DataTreeCohortActor.PreCommit(txId));
127         changeStateFrom(State.CAN_COMMIT_SUCCESSFUL, State.PRE_COMMIT_SENT);
128         processResponses(preCommitFutures, State.PRE_COMMIT_SENT, State.PRE_COMMIT_SUCCESSFUL);
129     }
130
131     void commit() throws ExecutionException, TimeoutException {
132         Preconditions.checkState(successfulFromPrevious != null);
133         Future<Iterable<Object>> commitsFuture = sendMesageToSuccessful(new DataTreeCohortActor.Commit(txId));
134         changeStateFrom(State.PRE_COMMIT_SUCCESSFUL, State.COMMIT_SENT);
135         processResponses(commitsFuture, State.COMMIT_SENT, State.COMMITED);
136     }
137
138     void abort() throws TimeoutException {
139         if (successfulFromPrevious != null) {
140             sendMesageToSuccessful(new DataTreeCohortActor.Abort(txId));
141         }
142     }
143
144     private Future<Iterable<Object>> sendMesageToSuccessful(final Object message) {
145         return Futures.traverse(successfulFromPrevious, new Function<DataTreeCohortActor.Success, Future<Object>>() {
146
147             @Override
148             public Future<Object> apply(DataTreeCohortActor.Success cohortResponse) throws Exception {
149                 return Patterns.ask(cohortResponse.getCohort(), message, timeout);
150             }
151
152         }, ExecutionContexts.global());
153     }
154
155     private void processResponses(Future<Iterable<Object>> resultsFuture, State currentState, State afterState)
156             throws TimeoutException, ExecutionException {
157         final Iterable<Object> results;
158         try {
159             results = Await.result(resultsFuture, timeout.duration());
160         } catch (Exception e) {
161             successfulFromPrevious = null;
162             Throwables.propagateIfInstanceOf(e, TimeoutException.class);
163             throw Throwables.propagate(e);
164         }
165         Iterable<Failure> failed = Iterables.filter(results, Status.Failure.class);
166         Iterable<Success> successful = Iterables.filter(results, DataTreeCohortActor.Success.class);
167         successfulFromPrevious = successful;
168         if (!Iterables.isEmpty(failed)) {
169             changeStateFrom(currentState, State.FAILED);
170             Iterator<Failure> it = failed.iterator();
171             Throwable firstEx = it.next().cause();
172             while (it.hasNext()) {
173                 firstEx.addSuppressed(it.next().cause());
174             }
175             Throwables.propagateIfPossible(firstEx, ExecutionException.class);
176             Throwables.propagateIfPossible(firstEx, TimeoutException.class);
177             throw Throwables.propagate(firstEx);
178         }
179         changeStateFrom(currentState, afterState);
180     }
181
182     void changeStateFrom(State expected, State followup) {
183         Preconditions.checkState(state == expected);
184         state = followup;
185     }
186 }