Merge "Bug 2933: BidningDOMDataBrokerAdaper implements DataTreeChangeService"
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / ThreePhaseCommitCohortProxy.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.datastore;
10
11 import akka.actor.ActorSelection;
12 import akka.dispatch.Futures;
13 import akka.dispatch.OnComplete;
14 import com.google.common.collect.Lists;
15 import com.google.common.util.concurrent.ListenableFuture;
16 import com.google.common.util.concurrent.SettableFuture;
17 import java.util.Collections;
18 import java.util.Iterator;
19 import java.util.List;
20 import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction;
21 import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply;
22 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction;
23 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply;
24 import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction;
25 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
26 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
27 import org.slf4j.Logger;
28 import org.slf4j.LoggerFactory;
29 import scala.concurrent.Future;
30 import scala.runtime.AbstractFunction1;
31
32 /**
33  * ThreePhaseCommitCohortProxy represents a set of remote cohort proxies
34  */
35 public class ThreePhaseCommitCohortProxy extends AbstractThreePhaseCommitCohort {
36
37     private static final Logger LOG = LoggerFactory.getLogger(ThreePhaseCommitCohortProxy.class);
38
39     private static final ListenableFuture<Void> IMMEDIATE_SUCCESS =
40             com.google.common.util.concurrent.Futures.immediateFuture(null);
41
42     private final ActorContext actorContext;
43     private final List<Future<ActorSelection>> cohortFutures;
44     private volatile List<ActorSelection> cohorts;
45     private final String transactionId;
46     private static final OperationCallback NO_OP_CALLBACK = new OperationCallback() {
47         @Override
48         public void run() {
49         }
50
51         @Override
52         public void success() {
53         }
54
55         @Override
56         public void failure() {
57         }
58     };
59
60     public ThreePhaseCommitCohortProxy(ActorContext actorContext,
61             List<Future<ActorSelection>> cohortFutures, String transactionId) {
62         this.actorContext = actorContext;
63         this.cohortFutures = cohortFutures;
64         this.transactionId = transactionId;
65     }
66
67     private Future<Void> buildCohortList() {
68
69         Future<Iterable<ActorSelection>> combinedFutures = Futures.sequence(cohortFutures,
70                 actorContext.getClientDispatcher());
71
72         return combinedFutures.transform(new AbstractFunction1<Iterable<ActorSelection>, Void>() {
73             @Override
74             public Void apply(Iterable<ActorSelection> actorSelections) {
75                 cohorts = Lists.newArrayList(actorSelections);
76                 if(LOG.isDebugEnabled()) {
77                     LOG.debug("Tx {} successfully built cohort path list: {}",
78                         transactionId, cohorts);
79                 }
80                 return null;
81             }
82         }, TransactionProxy.SAME_FAILURE_TRANSFORMER, actorContext.getClientDispatcher());
83     }
84
85     @Override
86     public ListenableFuture<Boolean> canCommit() {
87         if(LOG.isDebugEnabled()) {
88             LOG.debug("Tx {} canCommit", transactionId);
89         }
90         final SettableFuture<Boolean> returnFuture = SettableFuture.create();
91
92         // The first phase of canCommit is to gather the list of cohort actor paths that will
93         // participate in the commit. buildCohortPathsList combines the cohort path Futures into
94         // one Future which we wait on asynchronously here. The cohort actor paths are
95         // extracted from ReadyTransactionReply messages by the Futures that were obtained earlier
96         // and passed to us from upstream processing. If any one fails then  we'll fail canCommit.
97
98         buildCohortList().onComplete(new OnComplete<Void>() {
99             @Override
100             public void onComplete(Throwable failure, Void notUsed) throws Throwable {
101                 if(failure != null) {
102                     if(LOG.isDebugEnabled()) {
103                         LOG.debug("Tx {}: a cohort Future failed: {}", transactionId, failure);
104                     }
105                     returnFuture.setException(failure);
106                 } else {
107                     finishCanCommit(returnFuture);
108                 }
109             }
110         }, actorContext.getClientDispatcher());
111
112         return returnFuture;
113     }
114
115     private void finishCanCommit(final SettableFuture<Boolean> returnFuture) {
116         if(LOG.isDebugEnabled()) {
117             LOG.debug("Tx {} finishCanCommit", transactionId);
118         }
119
120         // For empty transactions return immediately
121         if(cohorts.size() == 0){
122             if(LOG.isDebugEnabled()) {
123                 LOG.debug("Tx {}: canCommit returning result: {}", transactionId, true);
124             }
125             returnFuture.set(Boolean.TRUE);
126             return;
127         }
128
129         final Object message = new CanCommitTransaction(transactionId).toSerializable();
130
131         final Iterator<ActorSelection> iterator = cohorts.iterator();
132
133         final OnComplete<Object> onComplete = new OnComplete<Object>() {
134             @Override
135             public void onComplete(Throwable failure, Object response) throws Throwable {
136                 if (failure != null) {
137                     if (LOG.isDebugEnabled()) {
138                         LOG.debug("Tx {}: a canCommit cohort Future failed: {}", transactionId, failure);
139                     }
140                     returnFuture.setException(failure);
141                     return;
142                 }
143
144                 boolean result = true;
145                 if (response.getClass().equals(CanCommitTransactionReply.SERIALIZABLE_CLASS)) {
146                     CanCommitTransactionReply reply =
147                             CanCommitTransactionReply.fromSerializable(response);
148                     if (!reply.getCanCommit()) {
149                         result = false;
150                     }
151                 } else {
152                     LOG.error("Unexpected response type {}", response.getClass());
153                     returnFuture.setException(new IllegalArgumentException(
154                             String.format("Unexpected response type %s", response.getClass())));
155                     return;
156                 }
157
158                 if(iterator.hasNext() && result){
159                     Future<Object> future = actorContext.executeOperationAsync(iterator.next(), message,
160                             actorContext.getTransactionCommitOperationTimeout());
161                     future.onComplete(this, actorContext.getClientDispatcher());
162                 } else {
163                     if(LOG.isDebugEnabled()) {
164                         LOG.debug("Tx {}: canCommit returning result: {}", transactionId, result);
165                     }
166                     returnFuture.set(Boolean.valueOf(result));
167                 }
168
169             }
170         };
171
172         Future<Object> future = actorContext.executeOperationAsync(iterator.next(), message,
173                 actorContext.getTransactionCommitOperationTimeout());
174         future.onComplete(onComplete, actorContext.getClientDispatcher());
175     }
176
177     private Future<Iterable<Object>> invokeCohorts(Object message) {
178         List<Future<Object>> futureList = Lists.newArrayListWithCapacity(cohorts.size());
179         for(ActorSelection cohort : cohorts) {
180             if(LOG.isDebugEnabled()) {
181                 LOG.debug("Tx {}: Sending {} to cohort {}", transactionId, message, cohort);
182             }
183             futureList.add(actorContext.executeOperationAsync(cohort, message, actorContext.getTransactionCommitOperationTimeout()));
184         }
185
186         return Futures.sequence(futureList, actorContext.getClientDispatcher());
187     }
188
189     @Override
190     public ListenableFuture<Void> preCommit() {
191         // We don't need to do anything here - preCommit is done atomically with the commit phase
192         // by the shard.
193         return IMMEDIATE_SUCCESS;
194     }
195
196     @Override
197     public ListenableFuture<Void> abort() {
198         // Note - we pass false for propagateException. In the front-end data broker, this method
199         // is called when one of the 3 phases fails with an exception. We'd rather have that
200         // original exception propagated to the client. If our abort fails and we propagate the
201         // exception then that exception will supersede and suppress the original exception. But
202         // it's the original exception that is the root cause and of more interest to the client.
203
204         return voidOperation("abort", new AbortTransaction(transactionId).toSerializable(),
205                 AbortTransactionReply.SERIALIZABLE_CLASS, false);
206     }
207
208     @Override
209     public ListenableFuture<Void> commit() {
210         OperationCallback operationCallback = cohortFutures.isEmpty() ? NO_OP_CALLBACK :
211                 new TransactionRateLimitingCallback(actorContext);
212
213         return voidOperation("commit", new CommitTransaction(transactionId).toSerializable(),
214                 CommitTransactionReply.SERIALIZABLE_CLASS, true, operationCallback);
215     }
216
217     private ListenableFuture<Void> voidOperation(final String operationName, final Object message,
218                                                  final Class<?> expectedResponseClass, final boolean propagateException) {
219         return voidOperation(operationName, message, expectedResponseClass, propagateException, NO_OP_CALLBACK);
220     }
221
222     private ListenableFuture<Void> voidOperation(final String operationName, final Object message,
223                                                  final Class<?> expectedResponseClass, final boolean propagateException, final OperationCallback callback) {
224
225         if(LOG.isDebugEnabled()) {
226             LOG.debug("Tx {} {}", transactionId, operationName);
227         }
228         final SettableFuture<Void> returnFuture = SettableFuture.create();
229
230         // The cohort actor list should already be built at this point by the canCommit phase but,
231         // if not for some reason, we'll try to build it here.
232
233         if(cohorts != null) {
234             finishVoidOperation(operationName, message, expectedResponseClass, propagateException,
235                     returnFuture, callback);
236         } else {
237             buildCohortList().onComplete(new OnComplete<Void>() {
238                 @Override
239                 public void onComplete(Throwable failure, Void notUsed) throws Throwable {
240                     if(failure != null) {
241                         if(LOG.isDebugEnabled()) {
242                             LOG.debug("Tx {}: a {} cohort path Future failed: {}", transactionId,
243                                 operationName, failure);
244                         }
245                         if(propagateException) {
246                             returnFuture.setException(failure);
247                         } else {
248                             returnFuture.set(null);
249                         }
250                     } else {
251                         finishVoidOperation(operationName, message, expectedResponseClass,
252                                 propagateException, returnFuture, callback);
253                     }
254                 }
255             }, actorContext.getClientDispatcher());
256         }
257
258         return returnFuture;
259     }
260
261     private void finishVoidOperation(final String operationName, final Object message,
262                                      final Class<?> expectedResponseClass, final boolean propagateException,
263                                      final SettableFuture<Void> returnFuture, final OperationCallback callback) {
264         if(LOG.isDebugEnabled()) {
265             LOG.debug("Tx {} finish {}", transactionId, operationName);
266         }
267
268         callback.run();
269
270         Future<Iterable<Object>> combinedFuture = invokeCohorts(message);
271
272         combinedFuture.onComplete(new OnComplete<Iterable<Object>>() {
273             @Override
274             public void onComplete(Throwable failure, Iterable<Object> responses) throws Throwable {
275
276                 Throwable exceptionToPropagate = failure;
277                 if(exceptionToPropagate == null) {
278                     for(Object response: responses) {
279                         if(!response.getClass().equals(expectedResponseClass)) {
280                             exceptionToPropagate = new IllegalArgumentException(
281                                     String.format("Unexpected response type %s",
282                                             response.getClass()));
283                             break;
284                         }
285                     }
286                 }
287
288                 if(exceptionToPropagate != null) {
289
290                     if(LOG.isDebugEnabled()) {
291                         LOG.debug("Tx {}: a {} cohort Future failed: {}", transactionId,
292                             operationName, exceptionToPropagate);
293                     }
294                     if(propagateException) {
295                         // We don't log the exception here to avoid redundant logging since we're
296                         // propagating to the caller in MD-SAL core who will log it.
297                         returnFuture.setException(exceptionToPropagate);
298                     } else {
299                         // Since the caller doesn't want us to propagate the exception we'll also
300                         // not log it normally. But it's usually not good to totally silence
301                         // exceptions so we'll log it to debug level.
302                         if(LOG.isDebugEnabled()) {
303                             LOG.debug(String.format("%s failed", message.getClass().getSimpleName()),
304                                 exceptionToPropagate);
305                         }
306                         returnFuture.set(null);
307                     }
308
309                     callback.failure();
310                 } else {
311
312                     if(LOG.isDebugEnabled()) {
313                         LOG.debug("Tx {}: {} succeeded", transactionId, operationName);
314                     }
315                     returnFuture.set(null);
316
317                     callback.success();
318                 }
319             }
320         }, actorContext.getClientDispatcher());
321     }
322
323     @Override
324     List<Future<ActorSelection>> getCohortFutures() {
325         return Collections.unmodifiableList(cohortFutures);
326     }
327 }