BUG-8219: optimize empty CompositeDataTreeCohort case
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / CompositeDataTreeCohort.java
1 /*
2  * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.datastore;
10
11 import akka.actor.Status;
12 import akka.actor.Status.Failure;
13 import akka.dispatch.ExecutionContexts;
14 import akka.dispatch.Futures;
15 import akka.dispatch.Recover;
16 import akka.pattern.Patterns;
17 import akka.util.Timeout;
18 import com.google.common.base.Preconditions;
19 import com.google.common.base.Throwables;
20 import com.google.common.collect.ImmutableList;
21 import com.google.common.collect.Iterables;
22 import java.util.Collection;
23 import java.util.Iterator;
24 import java.util.Optional;
25 import java.util.concurrent.ExecutionException;
26 import java.util.concurrent.TimeoutException;
27 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
28 import org.opendaylight.controller.cluster.datastore.DataTreeCohortActor.CanCommit;
29 import org.opendaylight.controller.cluster.datastore.DataTreeCohortActor.Success;
30 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
31 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
32 import scala.concurrent.Await;
33 import scala.concurrent.Future;
34
35 /**
36  *
37  * Composite cohort, which coordinates multiple user-provided cohorts as if it was only one cohort.
38  *
39  * It tracks current operation and list of cohorts which successfuly finished previous phase in
40  * case, if abort is necessary to invoke it only on cohort steps which are still active.
41  *
42  */
43 class CompositeDataTreeCohort {
44
45     private enum State {
46         /**
47          * Cohorts are idle, no messages were sent.
48          */
49         IDLE,
50         /**
51          * CanCommit message was sent to all participating cohorts.
52          */
53         CAN_COMMIT_SENT,
54         /**
55          * Successful canCommit responses were received from every participating cohort.
56          */
57         CAN_COMMIT_SUCCESSFUL,
58         /**
59          * PreCommit message was sent to all participating cohorts.
60          */
61         PRE_COMMIT_SENT,
62         /**
63          * Successful preCommit responses were received from every participating cohort.
64          */
65         PRE_COMMIT_SUCCESSFUL,
66         /**
67          * Commit message was send to all participating cohorts.
68          */
69         COMMIT_SENT,
70         /**
71          * Successful commit responses were received from all participating cohorts.
72          */
73         COMMITED,
74         /**
75          * Some of cohorts responsed back with unsuccessful message.
76          *
77          */
78         FAILED,
79         /**
80          *
81          * Abort message was send to all cohorts which responded with success previously.
82          *
83          */
84         ABORTED
85     }
86
87     protected static final Recover<Object> EXCEPTION_TO_MESSAGE = new Recover<Object>() {
88         @Override
89         public Failure recover(final Throwable error) throws Throwable {
90             return new Failure(error);
91         }
92     };
93
94     private final DataTreeCohortActorRegistry registry;
95     private final TransactionIdentifier txId;
96     private final SchemaContext schema;
97     private final Timeout timeout;
98     private Iterable<Success> successfulFromPrevious;
99     private State state = State.IDLE;
100
101     CompositeDataTreeCohort(final DataTreeCohortActorRegistry registry, final TransactionIdentifier transactionID,
102         final SchemaContext schema, final Timeout timeout) {
103         this.registry = Preconditions.checkNotNull(registry);
104         this.txId = Preconditions.checkNotNull(transactionID);
105         this.schema = Preconditions.checkNotNull(schema);
106         this.timeout = Preconditions.checkNotNull(timeout);
107     }
108
109     void canCommit(final DataTreeCandidate tip) throws ExecutionException, TimeoutException {
110         Collection<CanCommit> messages = registry.createCanCommitMessages(txId, tip, schema);
111         if (messages.isEmpty()) {
112             successfulFromPrevious = ImmutableList.of();
113             changeStateFrom(State.IDLE, State.CAN_COMMIT_SUCCESSFUL);
114             return;
115         }
116
117         Future<Iterable<Object>> canCommitsFuture = Futures.traverse(messages,
118             input -> Patterns.ask(input.getCohort(), input, timeout).recover(EXCEPTION_TO_MESSAGE,
119                 ExecutionContexts.global()), ExecutionContexts.global());
120         changeStateFrom(State.IDLE, State.CAN_COMMIT_SENT);
121         processResponses(canCommitsFuture, State.CAN_COMMIT_SENT, State.CAN_COMMIT_SUCCESSFUL);
122     }
123
124     void preCommit() throws ExecutionException, TimeoutException {
125         Preconditions.checkState(successfulFromPrevious != null);
126         if (isEmpty()) {
127             changeStateFrom(State.CAN_COMMIT_SUCCESSFUL, State.PRE_COMMIT_SUCCESSFUL);
128             return;
129         }
130
131         Future<Iterable<Object>> preCommitFutures = sendMesageToSuccessful(new DataTreeCohortActor.PreCommit(txId));
132         changeStateFrom(State.CAN_COMMIT_SUCCESSFUL, State.PRE_COMMIT_SENT);
133         processResponses(preCommitFutures, State.PRE_COMMIT_SENT, State.PRE_COMMIT_SUCCESSFUL);
134     }
135
136     void commit() throws ExecutionException, TimeoutException {
137         Preconditions.checkState(successfulFromPrevious != null);
138         if (isEmpty()) {
139             changeStateFrom(State.PRE_COMMIT_SUCCESSFUL, State.COMMITED);
140             return;
141         }
142
143         Future<Iterable<Object>> commitsFuture = sendMesageToSuccessful(new DataTreeCohortActor.Commit(txId));
144         changeStateFrom(State.PRE_COMMIT_SUCCESSFUL, State.COMMIT_SENT);
145         processResponses(commitsFuture, State.COMMIT_SENT, State.COMMITED);
146     }
147
148     Optional<Future<Iterable<Object>>> abort() {
149         if (successfulFromPrevious != null) {
150             return Optional.of(sendMesageToSuccessful(new DataTreeCohortActor.Abort(txId)));
151         }
152
153         return Optional.empty();
154     }
155
156     private boolean isEmpty() {
157         return successfulFromPrevious instanceof Collection && ((Collection<?>) successfulFromPrevious).isEmpty();
158     }
159
160     private Future<Iterable<Object>> sendMesageToSuccessful(final Object message) {
161         return Futures.traverse(successfulFromPrevious, cohortResponse -> Patterns.ask(cohortResponse.getCohort(),
162             message, timeout), ExecutionContexts.global());
163     }
164
165     private void processResponses(final Future<Iterable<Object>> resultsFuture, final State currentState,
166             final State afterState) throws TimeoutException, ExecutionException {
167         final Iterable<Object> results;
168         try {
169             results = Await.result(resultsFuture, timeout.duration());
170         } catch (Exception e) {
171             successfulFromPrevious = null;
172             Throwables.propagateIfInstanceOf(e, TimeoutException.class);
173             throw Throwables.propagate(e);
174         }
175         Iterable<Failure> failed = Iterables.filter(results, Status.Failure.class);
176         Iterable<Success> successful = Iterables.filter(results, DataTreeCohortActor.Success.class);
177         successfulFromPrevious = successful;
178         if (!Iterables.isEmpty(failed)) {
179             changeStateFrom(currentState, State.FAILED);
180             Iterator<Failure> it = failed.iterator();
181             Throwable firstEx = it.next().cause();
182             while (it.hasNext()) {
183                 firstEx.addSuppressed(it.next().cause());
184             }
185             Throwables.propagateIfPossible(firstEx, ExecutionException.class);
186             Throwables.propagateIfPossible(firstEx, TimeoutException.class);
187             throw Throwables.propagate(firstEx);
188         }
189         changeStateFrom(currentState, afterState);
190     }
191
192     private void changeStateFrom(final State expected, final State followup) {
193         Preconditions.checkState(state == expected);
194         state = followup;
195     }
196 }