a78b9a2ef6294dbd04d896b4270e69c02fbd13ce
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / ThreePhaseCommitCohortProxy.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.datastore;
10
11 import akka.actor.ActorSelection;
12 import akka.dispatch.OnComplete;
13 import com.google.common.base.Preconditions;
14 import com.google.common.base.Supplier;
15 import com.google.common.collect.Lists;
16 import com.google.common.util.concurrent.FutureCallback;
17 import com.google.common.util.concurrent.Futures;
18 import com.google.common.util.concurrent.ListenableFuture;
19 import com.google.common.util.concurrent.MoreExecutors;
20 import com.google.common.util.concurrent.SettableFuture;
21 import java.util.ArrayList;
22 import java.util.Iterator;
23 import java.util.List;
24 import java.util.concurrent.atomic.AtomicInteger;
25 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
26 import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction;
27 import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply;
28 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction;
29 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply;
30 import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction;
31 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
32 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
33 import org.slf4j.Logger;
34 import org.slf4j.LoggerFactory;
35 import scala.concurrent.Future;
36
37 /**
38  * ThreePhaseCommitCohortProxy represents a set of remote cohort proxies.
39  */
40 public class ThreePhaseCommitCohortProxy extends AbstractThreePhaseCommitCohort<ActorSelection> {
41
42     private static final Logger LOG = LoggerFactory.getLogger(ThreePhaseCommitCohortProxy.class);
43
44     private static final MessageSupplier COMMIT_MESSAGE_SUPPLIER = new MessageSupplier() {
45         @Override
46         public Object newMessage(final TransactionIdentifier transactionId, final short version) {
47             return new CommitTransaction(transactionId, version).toSerializable();
48         }
49
50         @Override
51         public boolean isSerializedReplyType(final Object reply) {
52             return CommitTransactionReply.isSerializedType(reply);
53         }
54     };
55
56     private static final MessageSupplier ABORT_MESSAGE_SUPPLIER = new MessageSupplier() {
57         @Override
58         public Object newMessage(final TransactionIdentifier transactionId, final short version) {
59             return new AbortTransaction(transactionId, version).toSerializable();
60         }
61
62         @Override
63         public boolean isSerializedReplyType(final Object reply) {
64             return AbortTransactionReply.isSerializedType(reply);
65         }
66     };
67
68     private final ActorContext actorContext;
69     private final List<CohortInfo> cohorts;
70     private final SettableFuture<Void> cohortsResolvedFuture = SettableFuture.create();
71     private final TransactionIdentifier transactionId;
72     private volatile OperationCallback commitOperationCallback;
73
74     public ThreePhaseCommitCohortProxy(final ActorContext actorContext, final List<CohortInfo> cohorts,
75             final TransactionIdentifier transactionId) {
76         this.actorContext = actorContext;
77         this.cohorts = cohorts;
78         this.transactionId = Preconditions.checkNotNull(transactionId);
79
80         if (cohorts.isEmpty()) {
81             cohortsResolvedFuture.set(null);
82         }
83     }
84
85     private ListenableFuture<Void> resolveCohorts() {
86         if (cohortsResolvedFuture.isDone()) {
87             return cohortsResolvedFuture;
88         }
89
90         final AtomicInteger completed = new AtomicInteger(cohorts.size());
91         final Object lock = new Object();
92         for (final CohortInfo info: cohorts) {
93             info.getActorFuture().onComplete(new OnComplete<ActorSelection>() {
94                 @Override
95                 public void onComplete(final Throwable failure, final ActorSelection actor)  {
96                     synchronized (lock) {
97                         boolean done = completed.decrementAndGet() == 0;
98                         if (failure != null) {
99                             LOG.debug("Tx {}: a cohort Future failed", transactionId, failure);
100                             cohortsResolvedFuture.setException(failure);
101                         } else if (!cohortsResolvedFuture.isDone()) {
102                             LOG.debug("Tx {}: cohort actor {} resolved", transactionId, actor);
103
104                             info.setResolvedActor(actor);
105                             if (done) {
106                                 LOG.debug("Tx {}: successfully resolved all cohort actors", transactionId);
107                                 cohortsResolvedFuture.set(null);
108                             }
109                         }
110                     }
111                 }
112             }, actorContext.getClientDispatcher());
113         }
114
115         return cohortsResolvedFuture;
116     }
117
118     @Override
119     public ListenableFuture<Boolean> canCommit() {
120         LOG.debug("Tx {} canCommit", transactionId);
121
122         final SettableFuture<Boolean> returnFuture = SettableFuture.create();
123
124         // The first phase of canCommit is to gather the list of cohort actor paths that will
125         // participate in the commit. buildCohortPathsList combines the cohort path Futures into
126         // one Future which we wait on asynchronously here. The cohort actor paths are
127         // extracted from ReadyTransactionReply messages by the Futures that were obtained earlier
128         // and passed to us from upstream processing. If any one fails then  we'll fail canCommit.
129
130         Futures.addCallback(resolveCohorts(), new FutureCallback<Void>() {
131             @Override
132             public void onSuccess(final Void notUsed) {
133                 finishCanCommit(returnFuture);
134             }
135
136             @Override
137             public void onFailure(final Throwable failure) {
138                 returnFuture.setException(failure);
139             }
140         }, MoreExecutors.directExecutor());
141
142         return returnFuture;
143     }
144
145     private void finishCanCommit(final SettableFuture<Boolean> returnFuture) {
146         LOG.debug("Tx {} finishCanCommit", transactionId);
147
148         // For empty transactions return immediately
149         if (cohorts.size() == 0) {
150             LOG.debug("Tx {}: canCommit returning result true", transactionId);
151             returnFuture.set(Boolean.TRUE);
152             return;
153         }
154
155         commitOperationCallback = new TransactionRateLimitingCallback(actorContext);
156         commitOperationCallback.run();
157
158         final Iterator<CohortInfo> iterator = cohorts.iterator();
159
160         final OnComplete<Object> onComplete = new OnComplete<Object>() {
161             @Override
162             public void onComplete(final Throwable failure, final Object response) {
163                 if (failure != null) {
164                     LOG.debug("Tx {}: a canCommit cohort Future failed", transactionId, failure);
165
166                     returnFuture.setException(failure);
167                     commitOperationCallback.failure();
168                     return;
169                 }
170
171                 // Only the first call to pause takes effect - subsequent calls before resume are no-ops. So
172                 // this means we'll only time the first transaction canCommit which should be fine.
173                 commitOperationCallback.pause();
174
175                 boolean result = true;
176                 if (CanCommitTransactionReply.isSerializedType(response)) {
177                     CanCommitTransactionReply reply = CanCommitTransactionReply.fromSerializable(response);
178
179                     LOG.debug("Tx {}: received {}", transactionId, response);
180
181                     if (!reply.getCanCommit()) {
182                         result = false;
183                     }
184                 } else {
185                     LOG.error("Unexpected response type {}", response.getClass());
186                     returnFuture.setException(new IllegalArgumentException(
187                             String.format("Unexpected response type %s", response.getClass())));
188                     return;
189                 }
190
191                 if (iterator.hasNext() && result) {
192                     sendCanCommitTransaction(iterator.next(), this);
193                 } else {
194                     LOG.debug("Tx {}: canCommit returning result: {}", transactionId, result);
195                     returnFuture.set(Boolean.valueOf(result));
196                 }
197
198             }
199         };
200
201         sendCanCommitTransaction(iterator.next(), onComplete);
202     }
203
204     private void sendCanCommitTransaction(final CohortInfo toCohortInfo, final OnComplete<Object> onComplete) {
205         CanCommitTransaction message = new CanCommitTransaction(transactionId, toCohortInfo.getActorVersion());
206
207         LOG.debug("Tx {}: sending {} to {}", transactionId, message, toCohortInfo.getResolvedActor());
208
209         Future<Object> future = actorContext.executeOperationAsync(toCohortInfo.getResolvedActor(),
210                 message.toSerializable(), actorContext.getTransactionCommitOperationTimeout());
211         future.onComplete(onComplete, actorContext.getClientDispatcher());
212     }
213
214     private Future<Iterable<Object>> invokeCohorts(final MessageSupplier messageSupplier) {
215         List<Future<Object>> futureList = Lists.newArrayListWithCapacity(cohorts.size());
216         for (CohortInfo cohort : cohorts) {
217             Object message = messageSupplier.newMessage(transactionId, cohort.getActorVersion());
218
219             LOG.debug("Tx {}: Sending {} to cohort {}", transactionId, message , cohort.getResolvedActor());
220
221             futureList.add(actorContext.executeOperationAsync(cohort.getResolvedActor(), message,
222                     actorContext.getTransactionCommitOperationTimeout()));
223         }
224
225         return akka.dispatch.Futures.sequence(futureList, actorContext.getClientDispatcher());
226     }
227
228     @Override
229     public ListenableFuture<Void> preCommit() {
230         // We don't need to do anything here - preCommit is done atomically with the commit phase
231         // by the shard.
232         return IMMEDIATE_VOID_SUCCESS;
233     }
234
235     @Override
236     public ListenableFuture<Void> abort() {
237         // Note - we pass false for propagateException. In the front-end data broker, this method
238         // is called when one of the 3 phases fails with an exception. We'd rather have that
239         // original exception propagated to the client. If our abort fails and we propagate the
240         // exception then that exception will supersede and suppress the original exception. But
241         // it's the original exception that is the root cause and of more interest to the client.
242
243         return voidOperation("abort", ABORT_MESSAGE_SUPPLIER,
244                 AbortTransactionReply.class, false, OperationCallback.NO_OP_CALLBACK);
245     }
246
247     @Override
248     public ListenableFuture<Void> commit() {
249         OperationCallback operationCallback = commitOperationCallback != null ? commitOperationCallback :
250             OperationCallback.NO_OP_CALLBACK;
251
252         return voidOperation("commit", COMMIT_MESSAGE_SUPPLIER,
253                 CommitTransactionReply.class, true, operationCallback);
254     }
255
256     @SuppressWarnings("checkstyle:IllegalCatch")
257     private static boolean successfulFuture(final ListenableFuture<Void> future) {
258         if (!future.isDone()) {
259             return false;
260         }
261
262         try {
263             future.get();
264             return true;
265         } catch (Exception e) {
266             return false;
267         }
268     }
269
270     private ListenableFuture<Void> voidOperation(final String operationName,
271             final MessageSupplier messageSupplier, final Class<?> expectedResponseClass,
272             final boolean propagateException, final OperationCallback callback) {
273         LOG.debug("Tx {} {}", transactionId, operationName);
274
275         final SettableFuture<Void> returnFuture = SettableFuture.create();
276
277         // The cohort actor list should already be built at this point by the canCommit phase but,
278         // if not for some reason, we'll try to build it here.
279
280         ListenableFuture<Void> future = resolveCohorts();
281         if (successfulFuture(future)) {
282             finishVoidOperation(operationName, messageSupplier, expectedResponseClass, propagateException,
283                     returnFuture, callback);
284         } else {
285             Futures.addCallback(future, new FutureCallback<Void>() {
286                 @Override
287                 public void onSuccess(final Void notUsed) {
288                     finishVoidOperation(operationName, messageSupplier, expectedResponseClass,
289                             propagateException, returnFuture, callback);
290                 }
291
292                 @Override
293                 public void onFailure(final Throwable failure) {
294                     LOG.debug("Tx {}: a {} cohort path Future failed: {}", transactionId, operationName, failure);
295
296                     if (propagateException) {
297                         returnFuture.setException(failure);
298                     } else {
299                         returnFuture.set(null);
300                     }
301                 }
302             }, MoreExecutors.directExecutor());
303         }
304
305         return returnFuture;
306     }
307
308     private void finishVoidOperation(final String operationName, final MessageSupplier messageSupplier,
309                                      final Class<?> expectedResponseClass, final boolean propagateException,
310                                      final SettableFuture<Void> returnFuture, final OperationCallback callback) {
311         LOG.debug("Tx {} finish {}", transactionId, operationName);
312
313         callback.resume();
314
315         Future<Iterable<Object>> combinedFuture = invokeCohorts(messageSupplier);
316
317         combinedFuture.onComplete(new OnComplete<Iterable<Object>>() {
318             @Override
319             public void onComplete(final Throwable failure, final Iterable<Object> responses) {
320                 Throwable exceptionToPropagate = failure;
321                 if (exceptionToPropagate == null) {
322                     for (Object response: responses) {
323                         if (!response.getClass().equals(expectedResponseClass)) {
324                             exceptionToPropagate = new IllegalArgumentException(
325                                     String.format("Unexpected response type %s", response.getClass()));
326                             break;
327                         }
328                     }
329                 }
330
331                 if (exceptionToPropagate != null) {
332                     LOG.debug("Tx {}: a {} cohort Future failed", transactionId, operationName, exceptionToPropagate);
333                     if (propagateException) {
334                         // We don't log the exception here to avoid redundant logging since we're
335                         // propagating to the caller in MD-SAL core who will log it.
336                         returnFuture.setException(exceptionToPropagate);
337                     } else {
338                         // Since the caller doesn't want us to propagate the exception we'll also
339                         // not log it normally. But it's usually not good to totally silence
340                         // exceptions so we'll log it to debug level.
341                         returnFuture.set(null);
342                     }
343
344                     callback.failure();
345                 } else {
346                     LOG.debug("Tx {}: {} succeeded", transactionId, operationName);
347
348                     returnFuture.set(null);
349
350                     callback.success();
351                 }
352             }
353         }, actorContext.getClientDispatcher());
354     }
355
356     @Override
357     List<Future<ActorSelection>> getCohortFutures() {
358         List<Future<ActorSelection>> cohortFutures = new ArrayList<>(cohorts.size());
359         for (CohortInfo info: cohorts) {
360             cohortFutures.add(info.getActorFuture());
361         }
362
363         return cohortFutures;
364     }
365
366     static class CohortInfo {
367         private final Future<ActorSelection> actorFuture;
368         private volatile ActorSelection resolvedActor;
369         private final Supplier<Short> actorVersionSupplier;
370
371         CohortInfo(final Future<ActorSelection> actorFuture, final Supplier<Short> actorVersionSupplier) {
372             this.actorFuture = actorFuture;
373             this.actorVersionSupplier = actorVersionSupplier;
374         }
375
376         Future<ActorSelection> getActorFuture() {
377             return actorFuture;
378         }
379
380         ActorSelection getResolvedActor() {
381             return resolvedActor;
382         }
383
384         void setResolvedActor(final ActorSelection resolvedActor) {
385             this.resolvedActor = resolvedActor;
386         }
387
388         short getActorVersion() {
389             Preconditions.checkState(resolvedActor != null,
390                     "getActorVersion cannot be called until the actor is resolved");
391             return actorVersionSupplier.get();
392         }
393     }
394
395     private interface MessageSupplier {
396         Object newMessage(TransactionIdentifier transactionId, short version);
397
398         boolean isSerializedReplyType(Object reply);
399     }
400 }