09b6568e1ade016accf0ab80788065950d469ec1
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / ThreePhaseCommitCohortProxy.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.datastore;
10
11 import akka.actor.ActorSelection;
12 import akka.dispatch.Futures;
13 import akka.dispatch.OnComplete;
14 import com.google.common.collect.Lists;
15 import com.google.common.util.concurrent.ListenableFuture;
16 import com.google.common.util.concurrent.SettableFuture;
17 import java.util.Collections;
18 import java.util.Iterator;
19 import java.util.List;
20 import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction;
21 import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply;
22 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction;
23 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply;
24 import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction;
25 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
26 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
27 import org.slf4j.Logger;
28 import org.slf4j.LoggerFactory;
29 import scala.concurrent.Future;
30 import scala.runtime.AbstractFunction1;
31
32 /**
33  * ThreePhaseCommitCohortProxy represents a set of remote cohort proxies
34  */
35 public class ThreePhaseCommitCohortProxy extends AbstractThreePhaseCommitCohort<ActorSelection> {
36
37     private static final Logger LOG = LoggerFactory.getLogger(ThreePhaseCommitCohortProxy.class);
38
39     private final ActorContext actorContext;
40     private final List<Future<ActorSelection>> cohortFutures;
41     private volatile List<ActorSelection> cohorts;
42     private final String transactionId;
43     private volatile OperationCallback commitOperationCallback;
44
45     public ThreePhaseCommitCohortProxy(ActorContext actorContext,
46             List<Future<ActorSelection>> cohortFutures, String transactionId) {
47         this.actorContext = actorContext;
48         this.cohortFutures = cohortFutures;
49         this.transactionId = transactionId;
50     }
51
52     private Future<Void> buildCohortList() {
53
54         Future<Iterable<ActorSelection>> combinedFutures = Futures.sequence(cohortFutures,
55                 actorContext.getClientDispatcher());
56
57         return combinedFutures.transform(new AbstractFunction1<Iterable<ActorSelection>, Void>() {
58             @Override
59             public Void apply(Iterable<ActorSelection> actorSelections) {
60                 cohorts = Lists.newArrayList(actorSelections);
61                 if(LOG.isDebugEnabled()) {
62                     LOG.debug("Tx {} successfully built cohort path list: {}",
63                         transactionId, cohorts);
64                 }
65                 return null;
66             }
67         }, TransactionReadyReplyMapper.SAME_FAILURE_TRANSFORMER, actorContext.getClientDispatcher());
68     }
69
70     @Override
71     public ListenableFuture<Boolean> canCommit() {
72         if(LOG.isDebugEnabled()) {
73             LOG.debug("Tx {} canCommit", transactionId);
74         }
75         final SettableFuture<Boolean> returnFuture = SettableFuture.create();
76
77         // The first phase of canCommit is to gather the list of cohort actor paths that will
78         // participate in the commit. buildCohortPathsList combines the cohort path Futures into
79         // one Future which we wait on asynchronously here. The cohort actor paths are
80         // extracted from ReadyTransactionReply messages by the Futures that were obtained earlier
81         // and passed to us from upstream processing. If any one fails then  we'll fail canCommit.
82
83         buildCohortList().onComplete(new OnComplete<Void>() {
84             @Override
85             public void onComplete(Throwable failure, Void notUsed) throws Throwable {
86                 if(failure != null) {
87                     if(LOG.isDebugEnabled()) {
88                         LOG.debug("Tx {}: a cohort Future failed: {}", transactionId, failure);
89                     }
90                     returnFuture.setException(failure);
91                 } else {
92                     finishCanCommit(returnFuture);
93                 }
94             }
95         }, actorContext.getClientDispatcher());
96
97         return returnFuture;
98     }
99
100     private void finishCanCommit(final SettableFuture<Boolean> returnFuture) {
101         if(LOG.isDebugEnabled()) {
102             LOG.debug("Tx {} finishCanCommit", transactionId);
103         }
104
105         // For empty transactions return immediately
106         if(cohorts.size() == 0){
107             if(LOG.isDebugEnabled()) {
108                 LOG.debug("Tx {}: canCommit returning result: {}", transactionId, true);
109             }
110             returnFuture.set(Boolean.TRUE);
111             return;
112         }
113
114         commitOperationCallback = cohortFutures.isEmpty() ? OperationCallback.NO_OP_CALLBACK :
115             new TransactionRateLimitingCallback(actorContext);
116
117         commitOperationCallback.run();
118
119         final Object message = new CanCommitTransaction(transactionId).toSerializable();
120
121         final Iterator<ActorSelection> iterator = cohorts.iterator();
122
123         final OnComplete<Object> onComplete = new OnComplete<Object>() {
124             @Override
125             public void onComplete(Throwable failure, Object response) throws Throwable {
126                 if (failure != null) {
127                     if (LOG.isDebugEnabled()) {
128                         LOG.debug("Tx {}: a canCommit cohort Future failed: {}", transactionId, failure);
129                     }
130                     returnFuture.setException(failure);
131                     commitOperationCallback.failure();
132                     return;
133                 }
134
135                 // Only the first call to pause takes effect - subsequent calls before resume are no-ops. So
136                 // this means we'll only time the first transaction canCommit which should be fine.
137                 commitOperationCallback.pause();
138
139                 boolean result = true;
140                 if (response.getClass().equals(CanCommitTransactionReply.SERIALIZABLE_CLASS)) {
141                     CanCommitTransactionReply reply =
142                             CanCommitTransactionReply.fromSerializable(response);
143                     if (!reply.getCanCommit()) {
144                         result = false;
145                     }
146                 } else {
147                     LOG.error("Unexpected response type {}", response.getClass());
148                     returnFuture.setException(new IllegalArgumentException(
149                             String.format("Unexpected response type %s", response.getClass())));
150                     return;
151                 }
152
153                 if(iterator.hasNext() && result){
154                     Future<Object> future = actorContext.executeOperationAsync(iterator.next(), message,
155                             actorContext.getTransactionCommitOperationTimeout());
156                     future.onComplete(this, actorContext.getClientDispatcher());
157                 } else {
158                     if(LOG.isDebugEnabled()) {
159                         LOG.debug("Tx {}: canCommit returning result: {}", transactionId, result);
160                     }
161                     returnFuture.set(Boolean.valueOf(result));
162                 }
163
164             }
165         };
166
167         Future<Object> future = actorContext.executeOperationAsync(iterator.next(), message,
168                 actorContext.getTransactionCommitOperationTimeout());
169         future.onComplete(onComplete, actorContext.getClientDispatcher());
170     }
171
172     private Future<Iterable<Object>> invokeCohorts(Object message) {
173         List<Future<Object>> futureList = Lists.newArrayListWithCapacity(cohorts.size());
174         for(ActorSelection cohort : cohorts) {
175             if(LOG.isDebugEnabled()) {
176                 LOG.debug("Tx {}: Sending {} to cohort {}", transactionId, message, cohort);
177             }
178             futureList.add(actorContext.executeOperationAsync(cohort, message, actorContext.getTransactionCommitOperationTimeout()));
179         }
180
181         return Futures.sequence(futureList, actorContext.getClientDispatcher());
182     }
183
184     @Override
185     public ListenableFuture<Void> preCommit() {
186         // We don't need to do anything here - preCommit is done atomically with the commit phase
187         // by the shard.
188         return IMMEDIATE_VOID_SUCCESS;
189     }
190
191     @Override
192     public ListenableFuture<Void> abort() {
193         // Note - we pass false for propagateException. In the front-end data broker, this method
194         // is called when one of the 3 phases fails with an exception. We'd rather have that
195         // original exception propagated to the client. If our abort fails and we propagate the
196         // exception then that exception will supersede and suppress the original exception. But
197         // it's the original exception that is the root cause and of more interest to the client.
198
199         return voidOperation("abort", new AbortTransaction(transactionId).toSerializable(),
200                 AbortTransactionReply.SERIALIZABLE_CLASS, false);
201     }
202
203     @Override
204     public ListenableFuture<Void> commit() {
205         OperationCallback operationCallback = commitOperationCallback != null ? commitOperationCallback :
206             OperationCallback.NO_OP_CALLBACK;
207
208         return voidOperation("commit", new CommitTransaction(transactionId).toSerializable(),
209                 CommitTransactionReply.SERIALIZABLE_CLASS, true, operationCallback);
210     }
211
212     private ListenableFuture<Void> voidOperation(final String operationName, final Object message,
213                                                  final Class<?> expectedResponseClass, final boolean propagateException) {
214         return voidOperation(operationName, message, expectedResponseClass, propagateException,
215                 OperationCallback.NO_OP_CALLBACK);
216     }
217
218     private ListenableFuture<Void> voidOperation(final String operationName, final Object message,
219                                                  final Class<?> expectedResponseClass, final boolean propagateException, final OperationCallback callback) {
220
221         if(LOG.isDebugEnabled()) {
222             LOG.debug("Tx {} {}", transactionId, operationName);
223         }
224         final SettableFuture<Void> returnFuture = SettableFuture.create();
225
226         // The cohort actor list should already be built at this point by the canCommit phase but,
227         // if not for some reason, we'll try to build it here.
228
229         if(cohorts != null) {
230             finishVoidOperation(operationName, message, expectedResponseClass, propagateException,
231                     returnFuture, callback);
232         } else {
233             buildCohortList().onComplete(new OnComplete<Void>() {
234                 @Override
235                 public void onComplete(Throwable failure, Void notUsed) throws Throwable {
236                     if(failure != null) {
237                         if(LOG.isDebugEnabled()) {
238                             LOG.debug("Tx {}: a {} cohort path Future failed: {}", transactionId,
239                                 operationName, failure);
240                         }
241                         if(propagateException) {
242                             returnFuture.setException(failure);
243                         } else {
244                             returnFuture.set(null);
245                         }
246                     } else {
247                         finishVoidOperation(operationName, message, expectedResponseClass,
248                                 propagateException, returnFuture, callback);
249                     }
250                 }
251             }, actorContext.getClientDispatcher());
252         }
253
254         return returnFuture;
255     }
256
257     private void finishVoidOperation(final String operationName, final Object message,
258                                      final Class<?> expectedResponseClass, final boolean propagateException,
259                                      final SettableFuture<Void> returnFuture, final OperationCallback callback) {
260         if(LOG.isDebugEnabled()) {
261             LOG.debug("Tx {} finish {}", transactionId, operationName);
262         }
263
264         callback.resume();
265
266         Future<Iterable<Object>> combinedFuture = invokeCohorts(message);
267
268         combinedFuture.onComplete(new OnComplete<Iterable<Object>>() {
269             @Override
270             public void onComplete(Throwable failure, Iterable<Object> responses) throws Throwable {
271
272                 Throwable exceptionToPropagate = failure;
273                 if(exceptionToPropagate == null) {
274                     for(Object response: responses) {
275                         if(!response.getClass().equals(expectedResponseClass)) {
276                             exceptionToPropagate = new IllegalArgumentException(
277                                     String.format("Unexpected response type %s",
278                                             response.getClass()));
279                             break;
280                         }
281                     }
282                 }
283
284                 if(exceptionToPropagate != null) {
285
286                     if(LOG.isDebugEnabled()) {
287                         LOG.debug("Tx {}: a {} cohort Future failed: {}", transactionId,
288                             operationName, exceptionToPropagate);
289                     }
290                     if(propagateException) {
291                         // We don't log the exception here to avoid redundant logging since we're
292                         // propagating to the caller in MD-SAL core who will log it.
293                         returnFuture.setException(exceptionToPropagate);
294                     } else {
295                         // Since the caller doesn't want us to propagate the exception we'll also
296                         // not log it normally. But it's usually not good to totally silence
297                         // exceptions so we'll log it to debug level.
298                         if(LOG.isDebugEnabled()) {
299                             LOG.debug(String.format("%s failed", message.getClass().getSimpleName()),
300                                 exceptionToPropagate);
301                         }
302                         returnFuture.set(null);
303                     }
304
305                     callback.failure();
306                 } else {
307
308                     if(LOG.isDebugEnabled()) {
309                         LOG.debug("Tx {}: {} succeeded", transactionId, operationName);
310                     }
311                     returnFuture.set(null);
312
313                     callback.success();
314                 }
315             }
316         }, actorContext.getClientDispatcher());
317     }
318
319     @Override
320     List<Future<ActorSelection>> getCohortFutures() {
321         return Collections.unmodifiableList(cohortFutures);
322     }
323 }