Merge "Startup archetype: Add basic unit tests for impl."
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / ThreePhaseCommitCohortProxy.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.datastore;
10
11 import akka.actor.ActorSelection;
12 import akka.dispatch.Futures;
13 import akka.dispatch.OnComplete;
14 import com.codahale.metrics.Snapshot;
15 import com.codahale.metrics.Timer;
16 import com.google.common.annotations.VisibleForTesting;
17 import com.google.common.collect.Lists;
18 import com.google.common.util.concurrent.ListenableFuture;
19 import com.google.common.util.concurrent.SettableFuture;
20 import java.util.Collections;
21 import java.util.List;
22 import java.util.concurrent.TimeUnit;
23 import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction;
24 import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply;
25 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction;
26 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply;
27 import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction;
28 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
29 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
30 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
31 import org.slf4j.Logger;
32 import org.slf4j.LoggerFactory;
33 import scala.concurrent.Future;
34 import scala.runtime.AbstractFunction1;
35
36 /**
37  * ThreePhaseCommitCohortProxy represents a set of remote cohort proxies
38  */
39 public class ThreePhaseCommitCohortProxy implements DOMStoreThreePhaseCommitCohort{
40
41     private static final Logger LOG = LoggerFactory.getLogger(ThreePhaseCommitCohortProxy.class);
42
43     private static final ListenableFuture<Void> IMMEDIATE_SUCCESS =
44             com.google.common.util.concurrent.Futures.immediateFuture(null);
45
46     private final ActorContext actorContext;
47     private final List<Future<ActorSelection>> cohortFutures;
48     private volatile List<ActorSelection> cohorts;
49     private final String transactionId;
50     private static final OperationCallback NO_OP_CALLBACK = new OperationCallback() {
51         @Override
52         public void run() {
53         }
54
55         @Override
56         public void success() {
57         }
58
59         @Override
60         public void failure() {
61         }
62     };
63
64     public ThreePhaseCommitCohortProxy(ActorContext actorContext,
65             List<Future<ActorSelection>> cohortFutures, String transactionId) {
66         this.actorContext = actorContext;
67         this.cohortFutures = cohortFutures;
68         this.transactionId = transactionId;
69     }
70
71     private Future<Void> buildCohortList() {
72
73         Future<Iterable<ActorSelection>> combinedFutures = Futures.sequence(cohortFutures,
74                 actorContext.getActorSystem().dispatcher());
75
76         return combinedFutures.transform(new AbstractFunction1<Iterable<ActorSelection>, Void>() {
77             @Override
78             public Void apply(Iterable<ActorSelection> actorSelections) {
79                 cohorts = Lists.newArrayList(actorSelections);
80                 if(LOG.isDebugEnabled()) {
81                     LOG.debug("Tx {} successfully built cohort path list: {}",
82                         transactionId, cohorts);
83                 }
84                 return null;
85             }
86         }, TransactionProxy.SAME_FAILURE_TRANSFORMER, actorContext.getActorSystem().dispatcher());
87     }
88
89     @Override
90     public ListenableFuture<Boolean> canCommit() {
91         if(LOG.isDebugEnabled()) {
92             LOG.debug("Tx {} canCommit", transactionId);
93         }
94         final SettableFuture<Boolean> returnFuture = SettableFuture.create();
95
96         // The first phase of canCommit is to gather the list of cohort actor paths that will
97         // participate in the commit. buildCohortPathsList combines the cohort path Futures into
98         // one Future which we wait on asynchronously here. The cohort actor paths are
99         // extracted from ReadyTransactionReply messages by the Futures that were obtained earlier
100         // and passed to us from upstream processing. If any one fails then  we'll fail canCommit.
101
102         buildCohortList().onComplete(new OnComplete<Void>() {
103             @Override
104             public void onComplete(Throwable failure, Void notUsed) throws Throwable {
105                 if(failure != null) {
106                     if(LOG.isDebugEnabled()) {
107                         LOG.debug("Tx {}: a cohort Future failed: {}", transactionId, failure);
108                     }
109                     returnFuture.setException(failure);
110                 } else {
111                     finishCanCommit(returnFuture);
112                 }
113             }
114         }, actorContext.getActorSystem().dispatcher());
115
116         return returnFuture;
117     }
118
119     private void finishCanCommit(final SettableFuture<Boolean> returnFuture) {
120         if(LOG.isDebugEnabled()) {
121             LOG.debug("Tx {} finishCanCommit", transactionId);
122         }
123         // The last phase of canCommit is to invoke all the cohort actors asynchronously to perform
124         // their canCommit processing. If any one fails then we'll fail canCommit.
125
126         Future<Iterable<Object>> combinedFuture =
127                 invokeCohorts(new CanCommitTransaction(transactionId).toSerializable());
128
129         combinedFuture.onComplete(new OnComplete<Iterable<Object>>() {
130             @Override
131             public void onComplete(Throwable failure, Iterable<Object> responses) throws Throwable {
132                 if(failure != null) {
133                     if(LOG.isDebugEnabled()) {
134                         LOG.debug("Tx {}: a canCommit cohort Future failed: {}", transactionId, failure);
135                     }
136                     returnFuture.setException(failure);
137                     return;
138                 }
139
140                 boolean result = true;
141                 for(Object response: responses) {
142                     if (response.getClass().equals(CanCommitTransactionReply.SERIALIZABLE_CLASS)) {
143                         CanCommitTransactionReply reply =
144                                 CanCommitTransactionReply.fromSerializable(response);
145                         if (!reply.getCanCommit()) {
146                             result = false;
147                             break;
148                         }
149                     } else {
150                         LOG.error("Unexpected response type {}", response.getClass());
151                         returnFuture.setException(new IllegalArgumentException(
152                                 String.format("Unexpected response type %s", response.getClass())));
153                         return;
154                     }
155                 }
156                 if(LOG.isDebugEnabled()) {
157                     LOG.debug("Tx {}: canCommit returning result: {}", transactionId, result);
158                 }
159                 returnFuture.set(Boolean.valueOf(result));
160             }
161         }, actorContext.getActorSystem().dispatcher());
162     }
163
164     private Future<Iterable<Object>> invokeCohorts(Object message) {
165         List<Future<Object>> futureList = Lists.newArrayListWithCapacity(cohorts.size());
166         for(ActorSelection cohort : cohorts) {
167             if(LOG.isDebugEnabled()) {
168                 LOG.debug("Tx {}: Sending {} to cohort {}", transactionId, message, cohort);
169             }
170             futureList.add(actorContext.executeOperationAsync(cohort, message, actorContext.getTransactionCommitOperationTimeout()));
171         }
172
173         return Futures.sequence(futureList, actorContext.getActorSystem().dispatcher());
174     }
175
176     @Override
177     public ListenableFuture<Void> preCommit() {
178         // We don't need to do anything here - preCommit is done atomically with the commit phase
179         // by the shard.
180         return IMMEDIATE_SUCCESS;
181     }
182
183     @Override
184     public ListenableFuture<Void> abort() {
185         // Note - we pass false for propagateException. In the front-end data broker, this method
186         // is called when one of the 3 phases fails with an exception. We'd rather have that
187         // original exception propagated to the client. If our abort fails and we propagate the
188         // exception then that exception will supersede and suppress the original exception. But
189         // it's the original exception that is the root cause and of more interest to the client.
190
191         return voidOperation("abort", new AbortTransaction(transactionId).toSerializable(),
192                 AbortTransactionReply.SERIALIZABLE_CLASS, false);
193     }
194
195     @Override
196     public ListenableFuture<Void> commit() {
197         OperationCallback operationCallback = (cohortFutures.size() == 0) ? NO_OP_CALLBACK :
198                 new CommitCallback(actorContext);
199
200         return voidOperation("commit", new CommitTransaction(transactionId).toSerializable(),
201                 CommitTransactionReply.SERIALIZABLE_CLASS, true, operationCallback);
202     }
203
204     private ListenableFuture<Void> voidOperation(final String operationName, final Object message,
205                                                  final Class<?> expectedResponseClass, final boolean propagateException) {
206         return voidOperation(operationName, message, expectedResponseClass, propagateException, NO_OP_CALLBACK);
207     }
208
209     private ListenableFuture<Void> voidOperation(final String operationName, final Object message,
210                                                  final Class<?> expectedResponseClass, final boolean propagateException, final OperationCallback callback) {
211
212         if(LOG.isDebugEnabled()) {
213             LOG.debug("Tx {} {}", transactionId, operationName);
214         }
215         final SettableFuture<Void> returnFuture = SettableFuture.create();
216
217         // The cohort actor list should already be built at this point by the canCommit phase but,
218         // if not for some reason, we'll try to build it here.
219
220         if(cohorts != null) {
221             finishVoidOperation(operationName, message, expectedResponseClass, propagateException,
222                     returnFuture, callback);
223         } else {
224             buildCohortList().onComplete(new OnComplete<Void>() {
225                 @Override
226                 public void onComplete(Throwable failure, Void notUsed) throws Throwable {
227                     if(failure != null) {
228                         if(LOG.isDebugEnabled()) {
229                             LOG.debug("Tx {}: a {} cohort path Future failed: {}", transactionId,
230                                 operationName, failure);
231                         }
232                         if(propagateException) {
233                             returnFuture.setException(failure);
234                         } else {
235                             returnFuture.set(null);
236                         }
237                     } else {
238                         finishVoidOperation(operationName, message, expectedResponseClass,
239                                 propagateException, returnFuture, callback);
240                     }
241                 }
242             }, actorContext.getActorSystem().dispatcher());
243         }
244
245         return returnFuture;
246     }
247
248     private void finishVoidOperation(final String operationName, final Object message,
249                                      final Class<?> expectedResponseClass, final boolean propagateException,
250                                      final SettableFuture<Void> returnFuture, final OperationCallback callback) {
251         if(LOG.isDebugEnabled()) {
252             LOG.debug("Tx {} finish {}", transactionId, operationName);
253         }
254
255         callback.run();
256
257         Future<Iterable<Object>> combinedFuture = invokeCohorts(message);
258
259         combinedFuture.onComplete(new OnComplete<Iterable<Object>>() {
260             @Override
261             public void onComplete(Throwable failure, Iterable<Object> responses) throws Throwable {
262
263                 Throwable exceptionToPropagate = failure;
264                 if(exceptionToPropagate == null) {
265                     for(Object response: responses) {
266                         if(!response.getClass().equals(expectedResponseClass)) {
267                             exceptionToPropagate = new IllegalArgumentException(
268                                     String.format("Unexpected response type %s",
269                                             response.getClass()));
270                             break;
271                         }
272                     }
273                 }
274
275                 if(exceptionToPropagate != null) {
276
277                     if(LOG.isDebugEnabled()) {
278                         LOG.debug("Tx {}: a {} cohort Future failed: {}", transactionId,
279                             operationName, exceptionToPropagate);
280                     }
281                     if(propagateException) {
282                         // We don't log the exception here to avoid redundant logging since we're
283                         // propagating to the caller in MD-SAL core who will log it.
284                         returnFuture.setException(exceptionToPropagate);
285                     } else {
286                         // Since the caller doesn't want us to propagate the exception we'll also
287                         // not log it normally. But it's usually not good to totally silence
288                         // exceptions so we'll log it to debug level.
289                         if(LOG.isDebugEnabled()) {
290                             LOG.debug(String.format("%s failed", message.getClass().getSimpleName()),
291                                 exceptionToPropagate);
292                         }
293                         returnFuture.set(null);
294                     }
295
296                     callback.failure();
297                 } else {
298
299                     if(LOG.isDebugEnabled()) {
300                         LOG.debug("Tx {}: {} succeeded", transactionId, operationName);
301                     }
302                     returnFuture.set(null);
303
304                     callback.success();
305                 }
306             }
307         }, actorContext.getActorSystem().dispatcher());
308     }
309
310     @VisibleForTesting
311     List<Future<ActorSelection>> getCohortFutures() {
312         return Collections.unmodifiableList(cohortFutures);
313     }
314
315     private static interface OperationCallback {
316         void run();
317         void success();
318         void failure();
319     }
320
321     private static class CommitCallback implements OperationCallback{
322
323         private static final Logger LOG = LoggerFactory.getLogger(CommitCallback.class);
324         private static final String COMMIT = "commit";
325
326         private final Timer commitTimer;
327         private final ActorContext actorContext;
328         private Timer.Context timerContext;
329
330         CommitCallback(ActorContext actorContext){
331             this.actorContext = actorContext;
332             commitTimer = actorContext.getOperationTimer(COMMIT);
333         }
334
335         @Override
336         public void run() {
337             timerContext = commitTimer.time();
338         }
339
340         @Override
341         public void success() {
342             timerContext.stop();
343
344             Snapshot timerSnapshot = commitTimer.getSnapshot();
345             double allowedLatencyInNanos = timerSnapshot.get95thPercentile();
346
347             long commitTimeoutInSeconds = actorContext.getDatastoreContext()
348                     .getShardTransactionCommitTimeoutInSeconds();
349             long commitTimeoutInNanos = TimeUnit.SECONDS.toNanos(commitTimeoutInSeconds);
350
351             // Here we are trying to find out how many transactions per second are allowed
352             double newRateLimit = ((double) commitTimeoutInNanos / allowedLatencyInNanos) / commitTimeoutInSeconds;
353
354             LOG.debug("Data Store {} commit rateLimit adjusted to {} allowedLatencyInNanos = {}",
355                     actorContext.getDataStoreType(), newRateLimit, allowedLatencyInNanos);
356
357             actorContext.setTxCreationLimit(newRateLimit);
358         }
359
360         @Override
361         public void failure() {
362             // This would mean we couldn't get a transaction completed in 30 seconds which is
363             // the default transaction commit timeout. Using the timeout information to figure out the rate limit is
364             // not going to be useful - so we leave it as it is
365         }
366     }
367
368 }