17017b9b668ea699d1dab0ecfa7deac2448c84b4
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / ThreePhaseCommitCohortProxy.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.datastore;
10
11 import akka.actor.ActorSelection;
12 import akka.dispatch.OnComplete;
13 import com.google.common.base.Preconditions;
14 import com.google.common.base.Supplier;
15 import com.google.common.collect.Lists;
16 import com.google.common.util.concurrent.FutureCallback;
17 import com.google.common.util.concurrent.Futures;
18 import com.google.common.util.concurrent.ListenableFuture;
19 import com.google.common.util.concurrent.SettableFuture;
20 import java.util.ArrayList;
21 import java.util.Iterator;
22 import java.util.List;
23 import java.util.concurrent.atomic.AtomicInteger;
24 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
25 import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction;
26 import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply;
27 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction;
28 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply;
29 import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction;
30 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
31 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
32 import org.slf4j.Logger;
33 import org.slf4j.LoggerFactory;
34 import scala.concurrent.Future;
35
36 /**
37  * ThreePhaseCommitCohortProxy represents a set of remote cohort proxies.
38  */
39 public class ThreePhaseCommitCohortProxy extends AbstractThreePhaseCommitCohort<ActorSelection> {
40
41     private static final Logger LOG = LoggerFactory.getLogger(ThreePhaseCommitCohortProxy.class);
42
43     private static final MessageSupplier COMMIT_MESSAGE_SUPPLIER = new MessageSupplier() {
44         @Override
45         public Object newMessage(TransactionIdentifier transactionId, short version) {
46             return new CommitTransaction(transactionId, version).toSerializable();
47         }
48
49         @Override
50         public boolean isSerializedReplyType(Object reply) {
51             return CommitTransactionReply.isSerializedType(reply);
52         }
53     };
54
55     private static final MessageSupplier ABORT_MESSAGE_SUPPLIER = new MessageSupplier() {
56         @Override
57         public Object newMessage(TransactionIdentifier transactionId, short version) {
58             return new AbortTransaction(transactionId, version).toSerializable();
59         }
60
61         @Override
62         public boolean isSerializedReplyType(Object reply) {
63             return AbortTransactionReply.isSerializedType(reply);
64         }
65     };
66
67     private final ActorContext actorContext;
68     private final List<CohortInfo> cohorts;
69     private final SettableFuture<Void> cohortsResolvedFuture = SettableFuture.create();
70     private final TransactionIdentifier transactionId;
71     private volatile OperationCallback commitOperationCallback;
72
73     public ThreePhaseCommitCohortProxy(ActorContext actorContext, List<CohortInfo> cohorts,
74             TransactionIdentifier transactionId) {
75         this.actorContext = actorContext;
76         this.cohorts = cohorts;
77         this.transactionId = Preconditions.checkNotNull(transactionId);
78
79         if (cohorts.isEmpty()) {
80             cohortsResolvedFuture.set(null);
81         }
82     }
83
84     private ListenableFuture<Void> resolveCohorts() {
85         if (cohortsResolvedFuture.isDone()) {
86             return cohortsResolvedFuture;
87         }
88
89         final AtomicInteger completed = new AtomicInteger(cohorts.size());
90         for (final CohortInfo info: cohorts) {
91             info.getActorFuture().onComplete(new OnComplete<ActorSelection>() {
92                 @Override
93                 public void onComplete(Throwable failure, ActorSelection actor)  {
94                     synchronized (completed) {
95                         boolean done = completed.decrementAndGet() == 0;
96                         if (failure != null) {
97                             LOG.debug("Tx {}: a cohort Future failed", transactionId, failure);
98                             cohortsResolvedFuture.setException(failure);
99                         } else if (!cohortsResolvedFuture.isDone()) {
100                             LOG.debug("Tx {}: cohort actor {} resolved", transactionId, actor);
101
102                             info.setResolvedActor(actor);
103                             if (done) {
104                                 LOG.debug("Tx {}: successfully resolved all cohort actors", transactionId);
105                                 cohortsResolvedFuture.set(null);
106                             }
107                         }
108                     }
109                 }
110             }, actorContext.getClientDispatcher());
111         }
112
113         return cohortsResolvedFuture;
114     }
115
116     @Override
117     public ListenableFuture<Boolean> canCommit() {
118         LOG.debug("Tx {} canCommit", transactionId);
119
120         final SettableFuture<Boolean> returnFuture = SettableFuture.create();
121
122         // The first phase of canCommit is to gather the list of cohort actor paths that will
123         // participate in the commit. buildCohortPathsList combines the cohort path Futures into
124         // one Future which we wait on asynchronously here. The cohort actor paths are
125         // extracted from ReadyTransactionReply messages by the Futures that were obtained earlier
126         // and passed to us from upstream processing. If any one fails then  we'll fail canCommit.
127
128         Futures.addCallback(resolveCohorts(), new FutureCallback<Void>() {
129             @Override
130             public void onSuccess(Void notUsed) {
131                 finishCanCommit(returnFuture);
132             }
133
134             @Override
135             public void onFailure(Throwable failure) {
136                 returnFuture.setException(failure);
137             }
138         });
139
140         return returnFuture;
141     }
142
143     private void finishCanCommit(final SettableFuture<Boolean> returnFuture) {
144         LOG.debug("Tx {} finishCanCommit", transactionId);
145
146         // For empty transactions return immediately
147         if (cohorts.size() == 0) {
148             LOG.debug("Tx {}: canCommit returning result true", transactionId);
149             returnFuture.set(Boolean.TRUE);
150             return;
151         }
152
153         commitOperationCallback = new TransactionRateLimitingCallback(actorContext);
154         commitOperationCallback.run();
155
156         final Iterator<CohortInfo> iterator = cohorts.iterator();
157
158         final OnComplete<Object> onComplete = new OnComplete<Object>() {
159             @Override
160             public void onComplete(Throwable failure, Object response) {
161                 if (failure != null) {
162                     LOG.debug("Tx {}: a canCommit cohort Future failed", transactionId, failure);
163
164                     returnFuture.setException(failure);
165                     commitOperationCallback.failure();
166                     return;
167                 }
168
169                 // Only the first call to pause takes effect - subsequent calls before resume are no-ops. So
170                 // this means we'll only time the first transaction canCommit which should be fine.
171                 commitOperationCallback.pause();
172
173                 boolean result = true;
174                 if (CanCommitTransactionReply.isSerializedType(response)) {
175                     CanCommitTransactionReply reply = CanCommitTransactionReply.fromSerializable(response);
176
177                     LOG.debug("Tx {}: received {}", transactionId, response);
178
179                     if (!reply.getCanCommit()) {
180                         result = false;
181                     }
182                 } else {
183                     LOG.error("Unexpected response type {}", response.getClass());
184                     returnFuture.setException(new IllegalArgumentException(
185                             String.format("Unexpected response type %s", response.getClass())));
186                     return;
187                 }
188
189                 if (iterator.hasNext() && result) {
190                     sendCanCommitTransaction(iterator.next(), this);
191                 } else {
192                     LOG.debug("Tx {}: canCommit returning result: {}", transactionId, result);
193                     returnFuture.set(Boolean.valueOf(result));
194                 }
195
196             }
197         };
198
199         sendCanCommitTransaction(iterator.next(), onComplete);
200     }
201
202     private void sendCanCommitTransaction(CohortInfo toCohortInfo, OnComplete<Object> onComplete) {
203         CanCommitTransaction message = new CanCommitTransaction(transactionId, toCohortInfo.getActorVersion());
204
205         LOG.debug("Tx {}: sending {} to {}", transactionId, message, toCohortInfo.getResolvedActor());
206
207         Future<Object> future = actorContext.executeOperationAsync(toCohortInfo.getResolvedActor(),
208                 message.toSerializable(), actorContext.getTransactionCommitOperationTimeout());
209         future.onComplete(onComplete, actorContext.getClientDispatcher());
210     }
211
212     private Future<Iterable<Object>> invokeCohorts(MessageSupplier messageSupplier) {
213         List<Future<Object>> futureList = Lists.newArrayListWithCapacity(cohorts.size());
214         for (CohortInfo cohort : cohorts) {
215             Object message = messageSupplier.newMessage(transactionId, cohort.getActorVersion());
216
217             LOG.debug("Tx {}: Sending {} to cohort {}", transactionId, message , cohort);
218
219             futureList.add(actorContext.executeOperationAsync(cohort.getResolvedActor(), message,
220                     actorContext.getTransactionCommitOperationTimeout()));
221         }
222
223         return akka.dispatch.Futures.sequence(futureList, actorContext.getClientDispatcher());
224     }
225
226     @Override
227     public ListenableFuture<Void> preCommit() {
228         // We don't need to do anything here - preCommit is done atomically with the commit phase
229         // by the shard.
230         return IMMEDIATE_VOID_SUCCESS;
231     }
232
233     @Override
234     public ListenableFuture<Void> abort() {
235         // Note - we pass false for propagateException. In the front-end data broker, this method
236         // is called when one of the 3 phases fails with an exception. We'd rather have that
237         // original exception propagated to the client. If our abort fails and we propagate the
238         // exception then that exception will supersede and suppress the original exception. But
239         // it's the original exception that is the root cause and of more interest to the client.
240
241         return voidOperation("abort", ABORT_MESSAGE_SUPPLIER,
242                 AbortTransactionReply.class, false, OperationCallback.NO_OP_CALLBACK);
243     }
244
245     @Override
246     public ListenableFuture<Void> commit() {
247         OperationCallback operationCallback = commitOperationCallback != null ? commitOperationCallback :
248             OperationCallback.NO_OP_CALLBACK;
249
250         return voidOperation("commit", COMMIT_MESSAGE_SUPPLIER,
251                 CommitTransactionReply.class, true, operationCallback);
252     }
253
254     @SuppressWarnings("checkstyle:IllegalCatch")
255     private static boolean successfulFuture(ListenableFuture<Void> future) {
256         if (!future.isDone()) {
257             return false;
258         }
259
260         try {
261             future.get();
262             return true;
263         } catch (Exception e) {
264             return false;
265         }
266     }
267
268     private ListenableFuture<Void> voidOperation(final String operationName,
269             final MessageSupplier messageSupplier, final Class<?> expectedResponseClass,
270             final boolean propagateException, final OperationCallback callback) {
271         LOG.debug("Tx {} {}", transactionId, operationName);
272
273         final SettableFuture<Void> returnFuture = SettableFuture.create();
274
275         // The cohort actor list should already be built at this point by the canCommit phase but,
276         // if not for some reason, we'll try to build it here.
277
278         ListenableFuture<Void> future = resolveCohorts();
279         if (successfulFuture(future)) {
280             finishVoidOperation(operationName, messageSupplier, expectedResponseClass, propagateException,
281                     returnFuture, callback);
282         } else {
283             Futures.addCallback(future, new FutureCallback<Void>() {
284                 @Override
285                 public void onSuccess(Void notUsed) {
286                     finishVoidOperation(operationName, messageSupplier, expectedResponseClass,
287                             propagateException, returnFuture, callback);
288                 }
289
290                 @Override
291                 public void onFailure(Throwable failure) {
292                     LOG.debug("Tx {}: a {} cohort path Future failed: {}", transactionId, operationName, failure);
293
294                     if (propagateException) {
295                         returnFuture.setException(failure);
296                     } else {
297                         returnFuture.set(null);
298                     }
299                 }
300             });
301         }
302
303         return returnFuture;
304     }
305
306     private void finishVoidOperation(final String operationName, MessageSupplier messageSupplier,
307                                      final Class<?> expectedResponseClass, final boolean propagateException,
308                                      final SettableFuture<Void> returnFuture, final OperationCallback callback) {
309         LOG.debug("Tx {} finish {}", transactionId, operationName);
310
311         callback.resume();
312
313         Future<Iterable<Object>> combinedFuture = invokeCohorts(messageSupplier);
314
315         combinedFuture.onComplete(new OnComplete<Iterable<Object>>() {
316             @Override
317             public void onComplete(Throwable failure, Iterable<Object> responses) throws Throwable {
318                 Throwable exceptionToPropagate = failure;
319                 if (exceptionToPropagate == null) {
320                     for (Object response: responses) {
321                         if (!response.getClass().equals(expectedResponseClass)) {
322                             exceptionToPropagate = new IllegalArgumentException(
323                                     String.format("Unexpected response type %s", response.getClass()));
324                             break;
325                         }
326                     }
327                 }
328
329                 if (exceptionToPropagate != null) {
330                     LOG.debug("Tx {}: a {} cohort Future failed", transactionId, operationName, exceptionToPropagate);
331                     if (propagateException) {
332                         // We don't log the exception here to avoid redundant logging since we're
333                         // propagating to the caller in MD-SAL core who will log it.
334                         returnFuture.setException(exceptionToPropagate);
335                     } else {
336                         // Since the caller doesn't want us to propagate the exception we'll also
337                         // not log it normally. But it's usually not good to totally silence
338                         // exceptions so we'll log it to debug level.
339                         returnFuture.set(null);
340                     }
341
342                     callback.failure();
343                 } else {
344                     LOG.debug("Tx {}: {} succeeded", transactionId, operationName);
345
346                     returnFuture.set(null);
347
348                     callback.success();
349                 }
350             }
351         }, actorContext.getClientDispatcher());
352     }
353
354     @Override
355     List<Future<ActorSelection>> getCohortFutures() {
356         List<Future<ActorSelection>> cohortFutures = new ArrayList<>(cohorts.size());
357         for (CohortInfo info: cohorts) {
358             cohortFutures.add(info.getActorFuture());
359         }
360
361         return cohortFutures;
362     }
363
364     static class CohortInfo {
365         private final Future<ActorSelection> actorFuture;
366         private volatile ActorSelection resolvedActor;
367         private final Supplier<Short> actorVersionSupplier;
368
369         CohortInfo(Future<ActorSelection> actorFuture, Supplier<Short> actorVersionSupplier) {
370             this.actorFuture = actorFuture;
371             this.actorVersionSupplier = actorVersionSupplier;
372         }
373
374         Future<ActorSelection> getActorFuture() {
375             return actorFuture;
376         }
377
378         ActorSelection getResolvedActor() {
379             return resolvedActor;
380         }
381
382         void setResolvedActor(ActorSelection resolvedActor) {
383             this.resolvedActor = resolvedActor;
384         }
385
386         short getActorVersion() {
387             Preconditions.checkState(resolvedActor != null,
388                     "getActorVersion cannot be called until the actor is resolved");
389             return actorVersionSupplier.get();
390         }
391     }
392
393     private interface MessageSupplier {
394         Object newMessage(TransactionIdentifier transactionId, short version);
395
396         boolean isSerializedReplyType(Object reply);
397     }
398 }