BUG 2799: SPI for EventSources
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / ThreePhaseCommitCohortProxy.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.datastore;
10
11 import akka.actor.ActorSelection;
12 import akka.dispatch.Futures;
13 import akka.dispatch.OnComplete;
14 import com.google.common.annotations.VisibleForTesting;
15 import com.google.common.collect.Lists;
16 import com.google.common.util.concurrent.ListenableFuture;
17 import com.google.common.util.concurrent.SettableFuture;
18 import java.util.Collections;
19 import java.util.Iterator;
20 import java.util.List;
21 import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction;
22 import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply;
23 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction;
24 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply;
25 import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction;
26 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
27 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
28 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
29 import org.slf4j.Logger;
30 import org.slf4j.LoggerFactory;
31 import scala.concurrent.Future;
32 import scala.runtime.AbstractFunction1;
33
34 /**
35  * ThreePhaseCommitCohortProxy represents a set of remote cohort proxies
36  */
37 public class ThreePhaseCommitCohortProxy implements DOMStoreThreePhaseCommitCohort{
38
39     private static final Logger LOG = LoggerFactory.getLogger(ThreePhaseCommitCohortProxy.class);
40
41     private static final ListenableFuture<Void> IMMEDIATE_SUCCESS =
42             com.google.common.util.concurrent.Futures.immediateFuture(null);
43
44     private final ActorContext actorContext;
45     private final List<Future<ActorSelection>> cohortFutures;
46     private volatile List<ActorSelection> cohorts;
47     private final String transactionId;
48     private static final OperationCallback NO_OP_CALLBACK = new OperationCallback() {
49         @Override
50         public void run() {
51         }
52
53         @Override
54         public void success() {
55         }
56
57         @Override
58         public void failure() {
59         }
60     };
61
62     public ThreePhaseCommitCohortProxy(ActorContext actorContext,
63             List<Future<ActorSelection>> cohortFutures, String transactionId) {
64         this.actorContext = actorContext;
65         this.cohortFutures = cohortFutures;
66         this.transactionId = transactionId;
67     }
68
69     private Future<Void> buildCohortList() {
70
71         Future<Iterable<ActorSelection>> combinedFutures = Futures.sequence(cohortFutures,
72                 actorContext.getClientDispatcher());
73
74         return combinedFutures.transform(new AbstractFunction1<Iterable<ActorSelection>, Void>() {
75             @Override
76             public Void apply(Iterable<ActorSelection> actorSelections) {
77                 cohorts = Lists.newArrayList(actorSelections);
78                 if(LOG.isDebugEnabled()) {
79                     LOG.debug("Tx {} successfully built cohort path list: {}",
80                         transactionId, cohorts);
81                 }
82                 return null;
83             }
84         }, TransactionProxy.SAME_FAILURE_TRANSFORMER, actorContext.getClientDispatcher());
85     }
86
87     @Override
88     public ListenableFuture<Boolean> canCommit() {
89         if(LOG.isDebugEnabled()) {
90             LOG.debug("Tx {} canCommit", transactionId);
91         }
92         final SettableFuture<Boolean> returnFuture = SettableFuture.create();
93
94         // The first phase of canCommit is to gather the list of cohort actor paths that will
95         // participate in the commit. buildCohortPathsList combines the cohort path Futures into
96         // one Future which we wait on asynchronously here. The cohort actor paths are
97         // extracted from ReadyTransactionReply messages by the Futures that were obtained earlier
98         // and passed to us from upstream processing. If any one fails then  we'll fail canCommit.
99
100         buildCohortList().onComplete(new OnComplete<Void>() {
101             @Override
102             public void onComplete(Throwable failure, Void notUsed) throws Throwable {
103                 if(failure != null) {
104                     if(LOG.isDebugEnabled()) {
105                         LOG.debug("Tx {}: a cohort Future failed: {}", transactionId, failure);
106                     }
107                     returnFuture.setException(failure);
108                 } else {
109                     finishCanCommit(returnFuture);
110                 }
111             }
112         }, actorContext.getClientDispatcher());
113
114         return returnFuture;
115     }
116
117     private void finishCanCommit(final SettableFuture<Boolean> returnFuture) {
118         if(LOG.isDebugEnabled()) {
119             LOG.debug("Tx {} finishCanCommit", transactionId);
120         }
121
122         // For empty transactions return immediately
123         if(cohorts.size() == 0){
124             if(LOG.isDebugEnabled()) {
125                 LOG.debug("Tx {}: canCommit returning result: {}", transactionId, true);
126             }
127             returnFuture.set(Boolean.TRUE);
128             return;
129         }
130
131         final Object message = new CanCommitTransaction(transactionId).toSerializable();
132
133         final Iterator<ActorSelection> iterator = cohorts.iterator();
134
135         final OnComplete<Object> onComplete = new OnComplete<Object>() {
136             @Override
137             public void onComplete(Throwable failure, Object response) throws Throwable {
138                 if (failure != null) {
139                     if (LOG.isDebugEnabled()) {
140                         LOG.debug("Tx {}: a canCommit cohort Future failed: {}", transactionId, failure);
141                     }
142                     returnFuture.setException(failure);
143                     return;
144                 }
145
146                 boolean result = true;
147                 if (response.getClass().equals(CanCommitTransactionReply.SERIALIZABLE_CLASS)) {
148                     CanCommitTransactionReply reply =
149                             CanCommitTransactionReply.fromSerializable(response);
150                     if (!reply.getCanCommit()) {
151                         result = false;
152                     }
153                 } else {
154                     LOG.error("Unexpected response type {}", response.getClass());
155                     returnFuture.setException(new IllegalArgumentException(
156                             String.format("Unexpected response type %s", response.getClass())));
157                     return;
158                 }
159
160                 if(iterator.hasNext() && result){
161                     Future<Object> future = actorContext.executeOperationAsync(iterator.next(), message,
162                             actorContext.getTransactionCommitOperationTimeout());
163                     future.onComplete(this, actorContext.getClientDispatcher());
164                 } else {
165                     if(LOG.isDebugEnabled()) {
166                         LOG.debug("Tx {}: canCommit returning result: {}", transactionId, result);
167                     }
168                     returnFuture.set(Boolean.valueOf(result));
169                 }
170
171             }
172         };
173
174         Future<Object> future = actorContext.executeOperationAsync(iterator.next(), message,
175                 actorContext.getTransactionCommitOperationTimeout());
176         future.onComplete(onComplete, actorContext.getClientDispatcher());
177     }
178
179     private Future<Iterable<Object>> invokeCohorts(Object message) {
180         List<Future<Object>> futureList = Lists.newArrayListWithCapacity(cohorts.size());
181         for(ActorSelection cohort : cohorts) {
182             if(LOG.isDebugEnabled()) {
183                 LOG.debug("Tx {}: Sending {} to cohort {}", transactionId, message, cohort);
184             }
185             futureList.add(actorContext.executeOperationAsync(cohort, message, actorContext.getTransactionCommitOperationTimeout()));
186         }
187
188         return Futures.sequence(futureList, actorContext.getClientDispatcher());
189     }
190
191     @Override
192     public ListenableFuture<Void> preCommit() {
193         // We don't need to do anything here - preCommit is done atomically with the commit phase
194         // by the shard.
195         return IMMEDIATE_SUCCESS;
196     }
197
198     @Override
199     public ListenableFuture<Void> abort() {
200         // Note - we pass false for propagateException. In the front-end data broker, this method
201         // is called when one of the 3 phases fails with an exception. We'd rather have that
202         // original exception propagated to the client. If our abort fails and we propagate the
203         // exception then that exception will supersede and suppress the original exception. But
204         // it's the original exception that is the root cause and of more interest to the client.
205
206         return voidOperation("abort", new AbortTransaction(transactionId).toSerializable(),
207                 AbortTransactionReply.SERIALIZABLE_CLASS, false);
208     }
209
210     @Override
211     public ListenableFuture<Void> commit() {
212         OperationCallback operationCallback = (cohortFutures.size() == 0) ? NO_OP_CALLBACK :
213                 new TransactionRateLimitingCallback(actorContext);
214
215         return voidOperation("commit", new CommitTransaction(transactionId).toSerializable(),
216                 CommitTransactionReply.SERIALIZABLE_CLASS, true, operationCallback);
217     }
218
219     private ListenableFuture<Void> voidOperation(final String operationName, final Object message,
220                                                  final Class<?> expectedResponseClass, final boolean propagateException) {
221         return voidOperation(operationName, message, expectedResponseClass, propagateException, NO_OP_CALLBACK);
222     }
223
224     private ListenableFuture<Void> voidOperation(final String operationName, final Object message,
225                                                  final Class<?> expectedResponseClass, final boolean propagateException, final OperationCallback callback) {
226
227         if(LOG.isDebugEnabled()) {
228             LOG.debug("Tx {} {}", transactionId, operationName);
229         }
230         final SettableFuture<Void> returnFuture = SettableFuture.create();
231
232         // The cohort actor list should already be built at this point by the canCommit phase but,
233         // if not for some reason, we'll try to build it here.
234
235         if(cohorts != null) {
236             finishVoidOperation(operationName, message, expectedResponseClass, propagateException,
237                     returnFuture, callback);
238         } else {
239             buildCohortList().onComplete(new OnComplete<Void>() {
240                 @Override
241                 public void onComplete(Throwable failure, Void notUsed) throws Throwable {
242                     if(failure != null) {
243                         if(LOG.isDebugEnabled()) {
244                             LOG.debug("Tx {}: a {} cohort path Future failed: {}", transactionId,
245                                 operationName, failure);
246                         }
247                         if(propagateException) {
248                             returnFuture.setException(failure);
249                         } else {
250                             returnFuture.set(null);
251                         }
252                     } else {
253                         finishVoidOperation(operationName, message, expectedResponseClass,
254                                 propagateException, returnFuture, callback);
255                     }
256                 }
257             }, actorContext.getClientDispatcher());
258         }
259
260         return returnFuture;
261     }
262
263     private void finishVoidOperation(final String operationName, final Object message,
264                                      final Class<?> expectedResponseClass, final boolean propagateException,
265                                      final SettableFuture<Void> returnFuture, final OperationCallback callback) {
266         if(LOG.isDebugEnabled()) {
267             LOG.debug("Tx {} finish {}", transactionId, operationName);
268         }
269
270         callback.run();
271
272         Future<Iterable<Object>> combinedFuture = invokeCohorts(message);
273
274         combinedFuture.onComplete(new OnComplete<Iterable<Object>>() {
275             @Override
276             public void onComplete(Throwable failure, Iterable<Object> responses) throws Throwable {
277
278                 Throwable exceptionToPropagate = failure;
279                 if(exceptionToPropagate == null) {
280                     for(Object response: responses) {
281                         if(!response.getClass().equals(expectedResponseClass)) {
282                             exceptionToPropagate = new IllegalArgumentException(
283                                     String.format("Unexpected response type %s",
284                                             response.getClass()));
285                             break;
286                         }
287                     }
288                 }
289
290                 if(exceptionToPropagate != null) {
291
292                     if(LOG.isDebugEnabled()) {
293                         LOG.debug("Tx {}: a {} cohort Future failed: {}", transactionId,
294                             operationName, exceptionToPropagate);
295                     }
296                     if(propagateException) {
297                         // We don't log the exception here to avoid redundant logging since we're
298                         // propagating to the caller in MD-SAL core who will log it.
299                         returnFuture.setException(exceptionToPropagate);
300                     } else {
301                         // Since the caller doesn't want us to propagate the exception we'll also
302                         // not log it normally. But it's usually not good to totally silence
303                         // exceptions so we'll log it to debug level.
304                         if(LOG.isDebugEnabled()) {
305                             LOG.debug(String.format("%s failed", message.getClass().getSimpleName()),
306                                 exceptionToPropagate);
307                         }
308                         returnFuture.set(null);
309                     }
310
311                     callback.failure();
312                 } else {
313
314                     if(LOG.isDebugEnabled()) {
315                         LOG.debug("Tx {}: {} succeeded", transactionId, operationName);
316                     }
317                     returnFuture.set(null);
318
319                     callback.success();
320                 }
321             }
322         }, actorContext.getClientDispatcher());
323     }
324
325     @VisibleForTesting
326     List<Future<ActorSelection>> getCohortFutures() {
327         return Collections.unmodifiableList(cohortFutures);
328     }
329 }