BUG 2762 : Compute rate limit based on a more lenient algorithm
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / ThreePhaseCommitCohortProxy.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.datastore;
10
11 import akka.actor.ActorSelection;
12 import akka.dispatch.Futures;
13 import akka.dispatch.OnComplete;
14 import com.google.common.annotations.VisibleForTesting;
15 import com.google.common.collect.Lists;
16 import com.google.common.util.concurrent.ListenableFuture;
17 import com.google.common.util.concurrent.SettableFuture;
18 import java.util.Collections;
19 import java.util.List;
20 import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction;
21 import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply;
22 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction;
23 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply;
24 import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction;
25 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
26 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
27 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
28 import org.slf4j.Logger;
29 import org.slf4j.LoggerFactory;
30 import scala.concurrent.Future;
31 import scala.runtime.AbstractFunction1;
32
33 /**
34  * ThreePhaseCommitCohortProxy represents a set of remote cohort proxies
35  */
36 public class ThreePhaseCommitCohortProxy implements DOMStoreThreePhaseCommitCohort{
37
38     private static final Logger LOG = LoggerFactory.getLogger(ThreePhaseCommitCohortProxy.class);
39
40     private static final ListenableFuture<Void> IMMEDIATE_SUCCESS =
41             com.google.common.util.concurrent.Futures.immediateFuture(null);
42
43     private final ActorContext actorContext;
44     private final List<Future<ActorSelection>> cohortFutures;
45     private volatile List<ActorSelection> cohorts;
46     private final String transactionId;
47     private static final OperationCallback NO_OP_CALLBACK = new OperationCallback() {
48         @Override
49         public void run() {
50         }
51
52         @Override
53         public void success() {
54         }
55
56         @Override
57         public void failure() {
58         }
59     };
60
61     public ThreePhaseCommitCohortProxy(ActorContext actorContext,
62             List<Future<ActorSelection>> cohortFutures, String transactionId) {
63         this.actorContext = actorContext;
64         this.cohortFutures = cohortFutures;
65         this.transactionId = transactionId;
66     }
67
68     private Future<Void> buildCohortList() {
69
70         Future<Iterable<ActorSelection>> combinedFutures = Futures.sequence(cohortFutures,
71                 actorContext.getClientDispatcher());
72
73         return combinedFutures.transform(new AbstractFunction1<Iterable<ActorSelection>, Void>() {
74             @Override
75             public Void apply(Iterable<ActorSelection> actorSelections) {
76                 cohorts = Lists.newArrayList(actorSelections);
77                 if(LOG.isDebugEnabled()) {
78                     LOG.debug("Tx {} successfully built cohort path list: {}",
79                         transactionId, cohorts);
80                 }
81                 return null;
82             }
83         }, TransactionProxy.SAME_FAILURE_TRANSFORMER, actorContext.getClientDispatcher());
84     }
85
86     @Override
87     public ListenableFuture<Boolean> canCommit() {
88         if(LOG.isDebugEnabled()) {
89             LOG.debug("Tx {} canCommit", transactionId);
90         }
91         final SettableFuture<Boolean> returnFuture = SettableFuture.create();
92
93         // The first phase of canCommit is to gather the list of cohort actor paths that will
94         // participate in the commit. buildCohortPathsList combines the cohort path Futures into
95         // one Future which we wait on asynchronously here. The cohort actor paths are
96         // extracted from ReadyTransactionReply messages by the Futures that were obtained earlier
97         // and passed to us from upstream processing. If any one fails then  we'll fail canCommit.
98
99         buildCohortList().onComplete(new OnComplete<Void>() {
100             @Override
101             public void onComplete(Throwable failure, Void notUsed) throws Throwable {
102                 if(failure != null) {
103                     if(LOG.isDebugEnabled()) {
104                         LOG.debug("Tx {}: a cohort Future failed: {}", transactionId, failure);
105                     }
106                     returnFuture.setException(failure);
107                 } else {
108                     finishCanCommit(returnFuture);
109                 }
110             }
111         }, actorContext.getClientDispatcher());
112
113         return returnFuture;
114     }
115
116     private void finishCanCommit(final SettableFuture<Boolean> returnFuture) {
117         if(LOG.isDebugEnabled()) {
118             LOG.debug("Tx {} finishCanCommit", transactionId);
119         }
120         // The last phase of canCommit is to invoke all the cohort actors asynchronously to perform
121         // their canCommit processing. If any one fails then we'll fail canCommit.
122
123         Future<Iterable<Object>> combinedFuture =
124                 invokeCohorts(new CanCommitTransaction(transactionId).toSerializable());
125
126         combinedFuture.onComplete(new OnComplete<Iterable<Object>>() {
127             @Override
128             public void onComplete(Throwable failure, Iterable<Object> responses) throws Throwable {
129                 if(failure != null) {
130                     if(LOG.isDebugEnabled()) {
131                         LOG.debug("Tx {}: a canCommit cohort Future failed: {}", transactionId, failure);
132                     }
133                     returnFuture.setException(failure);
134                     return;
135                 }
136
137                 boolean result = true;
138                 for(Object response: responses) {
139                     if (response.getClass().equals(CanCommitTransactionReply.SERIALIZABLE_CLASS)) {
140                         CanCommitTransactionReply reply =
141                                 CanCommitTransactionReply.fromSerializable(response);
142                         if (!reply.getCanCommit()) {
143                             result = false;
144                             break;
145                         }
146                     } else {
147                         LOG.error("Unexpected response type {}", response.getClass());
148                         returnFuture.setException(new IllegalArgumentException(
149                                 String.format("Unexpected response type %s", response.getClass())));
150                         return;
151                     }
152                 }
153                 if(LOG.isDebugEnabled()) {
154                     LOG.debug("Tx {}: canCommit returning result: {}", transactionId, result);
155                 }
156                 returnFuture.set(Boolean.valueOf(result));
157             }
158         }, actorContext.getClientDispatcher());
159     }
160
161     private Future<Iterable<Object>> invokeCohorts(Object message) {
162         List<Future<Object>> futureList = Lists.newArrayListWithCapacity(cohorts.size());
163         for(ActorSelection cohort : cohorts) {
164             if(LOG.isDebugEnabled()) {
165                 LOG.debug("Tx {}: Sending {} to cohort {}", transactionId, message, cohort);
166             }
167             futureList.add(actorContext.executeOperationAsync(cohort, message, actorContext.getTransactionCommitOperationTimeout()));
168         }
169
170         return Futures.sequence(futureList, actorContext.getClientDispatcher());
171     }
172
173     @Override
174     public ListenableFuture<Void> preCommit() {
175         // We don't need to do anything here - preCommit is done atomically with the commit phase
176         // by the shard.
177         return IMMEDIATE_SUCCESS;
178     }
179
180     @Override
181     public ListenableFuture<Void> abort() {
182         // Note - we pass false for propagateException. In the front-end data broker, this method
183         // is called when one of the 3 phases fails with an exception. We'd rather have that
184         // original exception propagated to the client. If our abort fails and we propagate the
185         // exception then that exception will supersede and suppress the original exception. But
186         // it's the original exception that is the root cause and of more interest to the client.
187
188         return voidOperation("abort", new AbortTransaction(transactionId).toSerializable(),
189                 AbortTransactionReply.SERIALIZABLE_CLASS, false);
190     }
191
192     @Override
193     public ListenableFuture<Void> commit() {
194         OperationCallback operationCallback = (cohortFutures.size() == 0) ? NO_OP_CALLBACK :
195                 new TransactionRateLimitingCallback(actorContext);
196
197         return voidOperation("commit", new CommitTransaction(transactionId).toSerializable(),
198                 CommitTransactionReply.SERIALIZABLE_CLASS, true, operationCallback);
199     }
200
201     private ListenableFuture<Void> voidOperation(final String operationName, final Object message,
202                                                  final Class<?> expectedResponseClass, final boolean propagateException) {
203         return voidOperation(operationName, message, expectedResponseClass, propagateException, NO_OP_CALLBACK);
204     }
205
206     private ListenableFuture<Void> voidOperation(final String operationName, final Object message,
207                                                  final Class<?> expectedResponseClass, final boolean propagateException, final OperationCallback callback) {
208
209         if(LOG.isDebugEnabled()) {
210             LOG.debug("Tx {} {}", transactionId, operationName);
211         }
212         final SettableFuture<Void> returnFuture = SettableFuture.create();
213
214         // The cohort actor list should already be built at this point by the canCommit phase but,
215         // if not for some reason, we'll try to build it here.
216
217         if(cohorts != null) {
218             finishVoidOperation(operationName, message, expectedResponseClass, propagateException,
219                     returnFuture, callback);
220         } else {
221             buildCohortList().onComplete(new OnComplete<Void>() {
222                 @Override
223                 public void onComplete(Throwable failure, Void notUsed) throws Throwable {
224                     if(failure != null) {
225                         if(LOG.isDebugEnabled()) {
226                             LOG.debug("Tx {}: a {} cohort path Future failed: {}", transactionId,
227                                 operationName, failure);
228                         }
229                         if(propagateException) {
230                             returnFuture.setException(failure);
231                         } else {
232                             returnFuture.set(null);
233                         }
234                     } else {
235                         finishVoidOperation(operationName, message, expectedResponseClass,
236                                 propagateException, returnFuture, callback);
237                     }
238                 }
239             }, actorContext.getClientDispatcher());
240         }
241
242         return returnFuture;
243     }
244
245     private void finishVoidOperation(final String operationName, final Object message,
246                                      final Class<?> expectedResponseClass, final boolean propagateException,
247                                      final SettableFuture<Void> returnFuture, final OperationCallback callback) {
248         if(LOG.isDebugEnabled()) {
249             LOG.debug("Tx {} finish {}", transactionId, operationName);
250         }
251
252         callback.run();
253
254         Future<Iterable<Object>> combinedFuture = invokeCohorts(message);
255
256         combinedFuture.onComplete(new OnComplete<Iterable<Object>>() {
257             @Override
258             public void onComplete(Throwable failure, Iterable<Object> responses) throws Throwable {
259
260                 Throwable exceptionToPropagate = failure;
261                 if(exceptionToPropagate == null) {
262                     for(Object response: responses) {
263                         if(!response.getClass().equals(expectedResponseClass)) {
264                             exceptionToPropagate = new IllegalArgumentException(
265                                     String.format("Unexpected response type %s",
266                                             response.getClass()));
267                             break;
268                         }
269                     }
270                 }
271
272                 if(exceptionToPropagate != null) {
273
274                     if(LOG.isDebugEnabled()) {
275                         LOG.debug("Tx {}: a {} cohort Future failed: {}", transactionId,
276                             operationName, exceptionToPropagate);
277                     }
278                     if(propagateException) {
279                         // We don't log the exception here to avoid redundant logging since we're
280                         // propagating to the caller in MD-SAL core who will log it.
281                         returnFuture.setException(exceptionToPropagate);
282                     } else {
283                         // Since the caller doesn't want us to propagate the exception we'll also
284                         // not log it normally. But it's usually not good to totally silence
285                         // exceptions so we'll log it to debug level.
286                         if(LOG.isDebugEnabled()) {
287                             LOG.debug(String.format("%s failed", message.getClass().getSimpleName()),
288                                 exceptionToPropagate);
289                         }
290                         returnFuture.set(null);
291                     }
292
293                     callback.failure();
294                 } else {
295
296                     if(LOG.isDebugEnabled()) {
297                         LOG.debug("Tx {}: {} succeeded", transactionId, operationName);
298                     }
299                     returnFuture.set(null);
300
301                     callback.success();
302                 }
303             }
304         }, actorContext.getClientDispatcher());
305     }
306
307     @VisibleForTesting
308     List<Future<ActorSelection>> getCohortFutures() {
309         return Collections.unmodifiableList(cohortFutures);
310     }
311 }