57749a1a736bc935da74222ca984d834083d6206
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / ThreePhaseCommitCohortProxy.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.datastore;
10
11 import akka.actor.ActorSelection;
12 import akka.dispatch.Futures;
13 import akka.dispatch.OnComplete;
14 import com.google.common.collect.Lists;
15 import com.google.common.util.concurrent.ListenableFuture;
16 import com.google.common.util.concurrent.SettableFuture;
17 import java.util.Collections;
18 import java.util.Iterator;
19 import java.util.List;
20 import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction;
21 import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply;
22 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction;
23 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply;
24 import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction;
25 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
26 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
27 import org.slf4j.Logger;
28 import org.slf4j.LoggerFactory;
29 import scala.concurrent.Future;
30 import scala.runtime.AbstractFunction1;
31
32 /**
33  * ThreePhaseCommitCohortProxy represents a set of remote cohort proxies
34  */
35 public class ThreePhaseCommitCohortProxy extends AbstractThreePhaseCommitCohort<ActorSelection> {
36
37     private static final Logger LOG = LoggerFactory.getLogger(ThreePhaseCommitCohortProxy.class);
38
39     private final ActorContext actorContext;
40     private final List<Future<ActorSelection>> cohortFutures;
41     private volatile List<ActorSelection> cohorts;
42     private final String transactionId;
43
44     public ThreePhaseCommitCohortProxy(ActorContext actorContext,
45             List<Future<ActorSelection>> cohortFutures, String transactionId) {
46         this.actorContext = actorContext;
47         this.cohortFutures = cohortFutures;
48         this.transactionId = transactionId;
49     }
50
51     private Future<Void> buildCohortList() {
52
53         Future<Iterable<ActorSelection>> combinedFutures = Futures.sequence(cohortFutures,
54                 actorContext.getClientDispatcher());
55
56         return combinedFutures.transform(new AbstractFunction1<Iterable<ActorSelection>, Void>() {
57             @Override
58             public Void apply(Iterable<ActorSelection> actorSelections) {
59                 cohorts = Lists.newArrayList(actorSelections);
60                 if(LOG.isDebugEnabled()) {
61                     LOG.debug("Tx {} successfully built cohort path list: {}",
62                         transactionId, cohorts);
63                 }
64                 return null;
65             }
66         }, TransactionProxy.SAME_FAILURE_TRANSFORMER, actorContext.getClientDispatcher());
67     }
68
69     @Override
70     public ListenableFuture<Boolean> canCommit() {
71         if(LOG.isDebugEnabled()) {
72             LOG.debug("Tx {} canCommit", transactionId);
73         }
74         final SettableFuture<Boolean> returnFuture = SettableFuture.create();
75
76         // The first phase of canCommit is to gather the list of cohort actor paths that will
77         // participate in the commit. buildCohortPathsList combines the cohort path Futures into
78         // one Future which we wait on asynchronously here. The cohort actor paths are
79         // extracted from ReadyTransactionReply messages by the Futures that were obtained earlier
80         // and passed to us from upstream processing. If any one fails then  we'll fail canCommit.
81
82         buildCohortList().onComplete(new OnComplete<Void>() {
83             @Override
84             public void onComplete(Throwable failure, Void notUsed) throws Throwable {
85                 if(failure != null) {
86                     if(LOG.isDebugEnabled()) {
87                         LOG.debug("Tx {}: a cohort Future failed: {}", transactionId, failure);
88                     }
89                     returnFuture.setException(failure);
90                 } else {
91                     finishCanCommit(returnFuture);
92                 }
93             }
94         }, actorContext.getClientDispatcher());
95
96         return returnFuture;
97     }
98
99     private void finishCanCommit(final SettableFuture<Boolean> returnFuture) {
100         if(LOG.isDebugEnabled()) {
101             LOG.debug("Tx {} finishCanCommit", transactionId);
102         }
103
104         // For empty transactions return immediately
105         if(cohorts.size() == 0){
106             if(LOG.isDebugEnabled()) {
107                 LOG.debug("Tx {}: canCommit returning result: {}", transactionId, true);
108             }
109             returnFuture.set(Boolean.TRUE);
110             return;
111         }
112
113         final Object message = new CanCommitTransaction(transactionId).toSerializable();
114
115         final Iterator<ActorSelection> iterator = cohorts.iterator();
116
117         final OnComplete<Object> onComplete = new OnComplete<Object>() {
118             @Override
119             public void onComplete(Throwable failure, Object response) throws Throwable {
120                 if (failure != null) {
121                     if (LOG.isDebugEnabled()) {
122                         LOG.debug("Tx {}: a canCommit cohort Future failed: {}", transactionId, failure);
123                     }
124                     returnFuture.setException(failure);
125                     return;
126                 }
127
128                 boolean result = true;
129                 if (response.getClass().equals(CanCommitTransactionReply.SERIALIZABLE_CLASS)) {
130                     CanCommitTransactionReply reply =
131                             CanCommitTransactionReply.fromSerializable(response);
132                     if (!reply.getCanCommit()) {
133                         result = false;
134                     }
135                 } else {
136                     LOG.error("Unexpected response type {}", response.getClass());
137                     returnFuture.setException(new IllegalArgumentException(
138                             String.format("Unexpected response type %s", response.getClass())));
139                     return;
140                 }
141
142                 if(iterator.hasNext() && result){
143                     Future<Object> future = actorContext.executeOperationAsync(iterator.next(), message,
144                             actorContext.getTransactionCommitOperationTimeout());
145                     future.onComplete(this, actorContext.getClientDispatcher());
146                 } else {
147                     if(LOG.isDebugEnabled()) {
148                         LOG.debug("Tx {}: canCommit returning result: {}", transactionId, result);
149                     }
150                     returnFuture.set(Boolean.valueOf(result));
151                 }
152
153             }
154         };
155
156         Future<Object> future = actorContext.executeOperationAsync(iterator.next(), message,
157                 actorContext.getTransactionCommitOperationTimeout());
158         future.onComplete(onComplete, actorContext.getClientDispatcher());
159     }
160
161     private Future<Iterable<Object>> invokeCohorts(Object message) {
162         List<Future<Object>> futureList = Lists.newArrayListWithCapacity(cohorts.size());
163         for(ActorSelection cohort : cohorts) {
164             if(LOG.isDebugEnabled()) {
165                 LOG.debug("Tx {}: Sending {} to cohort {}", transactionId, message, cohort);
166             }
167             futureList.add(actorContext.executeOperationAsync(cohort, message, actorContext.getTransactionCommitOperationTimeout()));
168         }
169
170         return Futures.sequence(futureList, actorContext.getClientDispatcher());
171     }
172
173     @Override
174     public ListenableFuture<Void> preCommit() {
175         // We don't need to do anything here - preCommit is done atomically with the commit phase
176         // by the shard.
177         return IMMEDIATE_VOID_SUCCESS;
178     }
179
180     @Override
181     public ListenableFuture<Void> abort() {
182         // Note - we pass false for propagateException. In the front-end data broker, this method
183         // is called when one of the 3 phases fails with an exception. We'd rather have that
184         // original exception propagated to the client. If our abort fails and we propagate the
185         // exception then that exception will supersede and suppress the original exception. But
186         // it's the original exception that is the root cause and of more interest to the client.
187
188         return voidOperation("abort", new AbortTransaction(transactionId).toSerializable(),
189                 AbortTransactionReply.SERIALIZABLE_CLASS, false);
190     }
191
192     @Override
193     public ListenableFuture<Void> commit() {
194         OperationCallback operationCallback = cohortFutures.isEmpty() ? OperationCallback.NO_OP_CALLBACK :
195                 new TransactionRateLimitingCallback(actorContext);
196
197         return voidOperation("commit", new CommitTransaction(transactionId).toSerializable(),
198                 CommitTransactionReply.SERIALIZABLE_CLASS, true, operationCallback);
199     }
200
201     private ListenableFuture<Void> voidOperation(final String operationName, final Object message,
202                                                  final Class<?> expectedResponseClass, final boolean propagateException) {
203         return voidOperation(operationName, message, expectedResponseClass, propagateException,
204                 OperationCallback.NO_OP_CALLBACK);
205     }
206
207     private ListenableFuture<Void> voidOperation(final String operationName, final Object message,
208                                                  final Class<?> expectedResponseClass, final boolean propagateException, final OperationCallback callback) {
209
210         if(LOG.isDebugEnabled()) {
211             LOG.debug("Tx {} {}", transactionId, operationName);
212         }
213         final SettableFuture<Void> returnFuture = SettableFuture.create();
214
215         // The cohort actor list should already be built at this point by the canCommit phase but,
216         // if not for some reason, we'll try to build it here.
217
218         if(cohorts != null) {
219             finishVoidOperation(operationName, message, expectedResponseClass, propagateException,
220                     returnFuture, callback);
221         } else {
222             buildCohortList().onComplete(new OnComplete<Void>() {
223                 @Override
224                 public void onComplete(Throwable failure, Void notUsed) throws Throwable {
225                     if(failure != null) {
226                         if(LOG.isDebugEnabled()) {
227                             LOG.debug("Tx {}: a {} cohort path Future failed: {}", transactionId,
228                                 operationName, failure);
229                         }
230                         if(propagateException) {
231                             returnFuture.setException(failure);
232                         } else {
233                             returnFuture.set(null);
234                         }
235                     } else {
236                         finishVoidOperation(operationName, message, expectedResponseClass,
237                                 propagateException, returnFuture, callback);
238                     }
239                 }
240             }, actorContext.getClientDispatcher());
241         }
242
243         return returnFuture;
244     }
245
246     private void finishVoidOperation(final String operationName, final Object message,
247                                      final Class<?> expectedResponseClass, final boolean propagateException,
248                                      final SettableFuture<Void> returnFuture, final OperationCallback callback) {
249         if(LOG.isDebugEnabled()) {
250             LOG.debug("Tx {} finish {}", transactionId, operationName);
251         }
252
253         callback.run();
254
255         Future<Iterable<Object>> combinedFuture = invokeCohorts(message);
256
257         combinedFuture.onComplete(new OnComplete<Iterable<Object>>() {
258             @Override
259             public void onComplete(Throwable failure, Iterable<Object> responses) throws Throwable {
260
261                 Throwable exceptionToPropagate = failure;
262                 if(exceptionToPropagate == null) {
263                     for(Object response: responses) {
264                         if(!response.getClass().equals(expectedResponseClass)) {
265                             exceptionToPropagate = new IllegalArgumentException(
266                                     String.format("Unexpected response type %s",
267                                             response.getClass()));
268                             break;
269                         }
270                     }
271                 }
272
273                 if(exceptionToPropagate != null) {
274
275                     if(LOG.isDebugEnabled()) {
276                         LOG.debug("Tx {}: a {} cohort Future failed: {}", transactionId,
277                             operationName, exceptionToPropagate);
278                     }
279                     if(propagateException) {
280                         // We don't log the exception here to avoid redundant logging since we're
281                         // propagating to the caller in MD-SAL core who will log it.
282                         returnFuture.setException(exceptionToPropagate);
283                     } else {
284                         // Since the caller doesn't want us to propagate the exception we'll also
285                         // not log it normally. But it's usually not good to totally silence
286                         // exceptions so we'll log it to debug level.
287                         if(LOG.isDebugEnabled()) {
288                             LOG.debug(String.format("%s failed", message.getClass().getSimpleName()),
289                                 exceptionToPropagate);
290                         }
291                         returnFuture.set(null);
292                     }
293
294                     callback.failure();
295                 } else {
296
297                     if(LOG.isDebugEnabled()) {
298                         LOG.debug("Tx {}: {} succeeded", transactionId, operationName);
299                     }
300                     returnFuture.set(null);
301
302                     callback.success();
303                 }
304             }
305         }, actorContext.getClientDispatcher());
306     }
307
308     @Override
309     List<Future<ActorSelection>> getCohortFutures() {
310         return Collections.unmodifiableList(cohortFutures);
311     }
312 }