BUG-5280: Remove PersistentMessages
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / ThreePhaseCommitCohortProxy.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.datastore;
10
11 import akka.actor.ActorSelection;
12 import akka.dispatch.OnComplete;
13 import com.google.common.base.Preconditions;
14 import com.google.common.base.Supplier;
15 import com.google.common.collect.Lists;
16 import com.google.common.util.concurrent.FutureCallback;
17 import com.google.common.util.concurrent.Futures;
18 import com.google.common.util.concurrent.ListenableFuture;
19 import com.google.common.util.concurrent.SettableFuture;
20 import java.util.ArrayList;
21 import java.util.Iterator;
22 import java.util.List;
23 import java.util.concurrent.atomic.AtomicInteger;
24 import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction;
25 import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply;
26 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction;
27 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply;
28 import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction;
29 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
30 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
31 import org.slf4j.Logger;
32 import org.slf4j.LoggerFactory;
33 import scala.concurrent.Future;
34
35 /**
36  * ThreePhaseCommitCohortProxy represents a set of remote cohort proxies
37  */
38 public class ThreePhaseCommitCohortProxy extends AbstractThreePhaseCommitCohort<ActorSelection> {
39
40     private static final Logger LOG = LoggerFactory.getLogger(ThreePhaseCommitCohortProxy.class);
41
42     private static final MessageSupplier COMMIT_MESSAGE_SUPPLIER = new MessageSupplier() {
43         @Override
44         public Object newMessage(String transactionId, short version) {
45             return new CommitTransaction(transactionId, version).toSerializable();
46         }
47
48         @Override
49         public boolean isSerializedReplyType(Object reply) {
50             return CommitTransactionReply.isSerializedType(reply);
51         }
52     };
53
54     private static final MessageSupplier ABORT_MESSAGE_SUPPLIER = new MessageSupplier() {
55         @Override
56         public Object newMessage(String transactionId, short version) {
57             return new AbortTransaction(transactionId, version).toSerializable();
58         }
59
60         @Override
61         public boolean isSerializedReplyType(Object reply) {
62             return AbortTransactionReply.isSerializedType(reply);
63         }
64     };
65
66     private final ActorContext actorContext;
67     private final List<CohortInfo> cohorts;
68     private final SettableFuture<Void> cohortsResolvedFuture = SettableFuture.create();
69     private final String transactionId;
70     private volatile OperationCallback commitOperationCallback;
71
72     public ThreePhaseCommitCohortProxy(ActorContext actorContext, List<CohortInfo> cohorts, String transactionId) {
73         this.actorContext = actorContext;
74         this.cohorts = cohorts;
75         this.transactionId = transactionId;
76
77         if(cohorts.isEmpty()) {
78             cohortsResolvedFuture.set(null);
79         }
80     }
81
82     private ListenableFuture<Void> resolveCohorts() {
83         if(cohortsResolvedFuture.isDone()) {
84             return cohortsResolvedFuture;
85         }
86
87         final AtomicInteger completed = new AtomicInteger(cohorts.size());
88         for(final CohortInfo info: cohorts) {
89             info.getActorFuture().onComplete(new OnComplete<ActorSelection>() {
90                 @Override
91                 public void onComplete(Throwable failure, ActorSelection actor)  {
92                     synchronized(completed) {
93                         boolean done = completed.decrementAndGet() == 0;
94                         if(failure != null) {
95                             LOG.debug("Tx {}: a cohort Future failed", transactionId, failure);
96                             cohortsResolvedFuture.setException(failure);
97                         } else if(!cohortsResolvedFuture.isDone()) {
98                             LOG.debug("Tx {}: cohort actor {} resolved", transactionId, actor);
99
100                             info.setResolvedActor(actor);
101                             if(done) {
102                                 LOG.debug("Tx {}: successfully resolved all cohort actors", transactionId);
103                                 cohortsResolvedFuture.set(null);
104                             }
105                         }
106                     }
107                 }
108             }, actorContext.getClientDispatcher());
109         }
110
111         return cohortsResolvedFuture;
112     }
113
114     @Override
115     public ListenableFuture<Boolean> canCommit() {
116         LOG.debug("Tx {} canCommit", transactionId);
117
118         final SettableFuture<Boolean> returnFuture = SettableFuture.create();
119
120         // The first phase of canCommit is to gather the list of cohort actor paths that will
121         // participate in the commit. buildCohortPathsList combines the cohort path Futures into
122         // one Future which we wait on asynchronously here. The cohort actor paths are
123         // extracted from ReadyTransactionReply messages by the Futures that were obtained earlier
124         // and passed to us from upstream processing. If any one fails then  we'll fail canCommit.
125
126         Futures.addCallback(resolveCohorts(), new FutureCallback<Void>() {
127             @Override
128             public void onSuccess(Void notUsed) {
129                 finishCanCommit(returnFuture);
130             }
131
132             @Override
133             public void onFailure(Throwable failure) {
134                 returnFuture.setException(failure);
135             }
136         });
137
138         return returnFuture;
139     }
140
141     private void finishCanCommit(final SettableFuture<Boolean> returnFuture) {
142         LOG.debug("Tx {} finishCanCommit", transactionId);
143
144         // For empty transactions return immediately
145         if(cohorts.size() == 0){
146             LOG.debug("Tx {}: canCommit returning result true", transactionId);
147             returnFuture.set(Boolean.TRUE);
148             return;
149         }
150
151         commitOperationCallback = new TransactionRateLimitingCallback(actorContext);
152         commitOperationCallback.run();
153
154         final Iterator<CohortInfo> iterator = cohorts.iterator();
155
156         final OnComplete<Object> onComplete = new OnComplete<Object>() {
157             @Override
158             public void onComplete(Throwable failure, Object response) {
159                 if (failure != null) {
160                     LOG.debug("Tx {}: a canCommit cohort Future failed", transactionId, failure);
161
162                     returnFuture.setException(failure);
163                     commitOperationCallback.failure();
164                     return;
165                 }
166
167                 // Only the first call to pause takes effect - subsequent calls before resume are no-ops. So
168                 // this means we'll only time the first transaction canCommit which should be fine.
169                 commitOperationCallback.pause();
170
171                 boolean result = true;
172                 if (CanCommitTransactionReply.isSerializedType(response)) {
173                     CanCommitTransactionReply reply = CanCommitTransactionReply.fromSerializable(response);
174
175                     LOG.debug("Tx {}: received {}", transactionId, response);
176
177                     if (!reply.getCanCommit()) {
178                         result = false;
179                     }
180                 } else {
181                     LOG.error("Unexpected response type {}", response.getClass());
182                     returnFuture.setException(new IllegalArgumentException(
183                             String.format("Unexpected response type %s", response.getClass())));
184                     return;
185                 }
186
187                 if(iterator.hasNext() && result) {
188                     sendCanCommitTransaction(iterator.next(), this);
189                 } else {
190                     LOG.debug("Tx {}: canCommit returning result: {}", transactionId, result);
191                     returnFuture.set(Boolean.valueOf(result));
192                 }
193
194             }
195         };
196
197         sendCanCommitTransaction(iterator.next(), onComplete);
198     }
199
200     private void sendCanCommitTransaction(CohortInfo toCohortInfo, OnComplete<Object> onComplete) {
201         CanCommitTransaction message = new CanCommitTransaction(transactionId, toCohortInfo.getActorVersion());
202
203         if(LOG.isDebugEnabled()) {
204             LOG.debug("Tx {}: sending {} to {}", transactionId, message, toCohortInfo.getResolvedActor());
205         }
206
207         Future<Object> future = actorContext.executeOperationAsync(toCohortInfo.getResolvedActor(),
208                 message.toSerializable(), actorContext.getTransactionCommitOperationTimeout());
209         future.onComplete(onComplete, actorContext.getClientDispatcher());
210     }
211
212     private Future<Iterable<Object>> invokeCohorts(MessageSupplier messageSupplier) {
213         List<Future<Object>> futureList = Lists.newArrayListWithCapacity(cohorts.size());
214         for(CohortInfo cohort : cohorts) {
215             Object message = messageSupplier.newMessage(transactionId, cohort.getActorVersion());
216
217             if(LOG.isDebugEnabled()) {
218                 LOG.debug("Tx {}: Sending {} to cohort {}", transactionId, message , cohort);
219             }
220
221             futureList.add(actorContext.executeOperationAsync(cohort.getResolvedActor(), message,
222                     actorContext.getTransactionCommitOperationTimeout()));
223         }
224
225         return akka.dispatch.Futures.sequence(futureList, actorContext.getClientDispatcher());
226     }
227
228     @Override
229     public ListenableFuture<Void> preCommit() {
230         // We don't need to do anything here - preCommit is done atomically with the commit phase
231         // by the shard.
232         return IMMEDIATE_VOID_SUCCESS;
233     }
234
235     @Override
236     public ListenableFuture<Void> abort() {
237         // Note - we pass false for propagateException. In the front-end data broker, this method
238         // is called when one of the 3 phases fails with an exception. We'd rather have that
239         // original exception propagated to the client. If our abort fails and we propagate the
240         // exception then that exception will supersede and suppress the original exception. But
241         // it's the original exception that is the root cause and of more interest to the client.
242
243         return voidOperation("abort", ABORT_MESSAGE_SUPPLIER,
244                 AbortTransactionReply.class, false, OperationCallback.NO_OP_CALLBACK);
245     }
246
247     @Override
248     public ListenableFuture<Void> commit() {
249         OperationCallback operationCallback = commitOperationCallback != null ? commitOperationCallback :
250             OperationCallback.NO_OP_CALLBACK;
251
252         return voidOperation("commit", COMMIT_MESSAGE_SUPPLIER,
253                 CommitTransactionReply.class, true, operationCallback);
254     }
255
256     private static boolean successfulFuture(ListenableFuture<Void> future) {
257         if(!future.isDone()) {
258             return false;
259         }
260
261         try {
262             future.get();
263             return true;
264         } catch(Exception e) {
265             return false;
266         }
267     }
268
269     private ListenableFuture<Void> voidOperation(final String operationName,
270             final MessageSupplier messageSupplier, final Class<?> expectedResponseClass,
271             final boolean propagateException, final OperationCallback callback) {
272         LOG.debug("Tx {} {}", transactionId, operationName);
273
274         final SettableFuture<Void> returnFuture = SettableFuture.create();
275
276         // The cohort actor list should already be built at this point by the canCommit phase but,
277         // if not for some reason, we'll try to build it here.
278
279         ListenableFuture<Void> future = resolveCohorts();
280         if(successfulFuture(future)) {
281             finishVoidOperation(operationName, messageSupplier, expectedResponseClass, propagateException,
282                     returnFuture, callback);
283         } else {
284             Futures.addCallback(future, new FutureCallback<Void>() {
285                 @Override
286                 public void onSuccess(Void notUsed) {
287                     finishVoidOperation(operationName, messageSupplier, expectedResponseClass,
288                             propagateException, returnFuture, callback);
289                 }
290
291                 @Override
292                 public void onFailure(Throwable failure) {
293                     LOG.debug("Tx {}: a {} cohort path Future failed: {}", transactionId, operationName, failure);
294
295                     if(propagateException) {
296                         returnFuture.setException(failure);
297                     } else {
298                         returnFuture.set(null);
299                     }
300                 }
301             });
302         }
303
304         return returnFuture;
305     }
306
307     private void finishVoidOperation(final String operationName, MessageSupplier messageSupplier,
308                                      final Class<?> expectedResponseClass, final boolean propagateException,
309                                      final SettableFuture<Void> returnFuture, final OperationCallback callback) {
310         LOG.debug("Tx {} finish {}", transactionId, operationName);
311
312         callback.resume();
313
314         Future<Iterable<Object>> combinedFuture = invokeCohorts(messageSupplier);
315
316         combinedFuture.onComplete(new OnComplete<Iterable<Object>>() {
317             @Override
318             public void onComplete(Throwable failure, Iterable<Object> responses) throws Throwable {
319                 Throwable exceptionToPropagate = failure;
320                 if(exceptionToPropagate == null) {
321                     for(Object response: responses) {
322                         if(!response.getClass().equals(expectedResponseClass)) {
323                             exceptionToPropagate = new IllegalArgumentException(
324                                     String.format("Unexpected response type %s", response.getClass()));
325                             break;
326                         }
327                     }
328                 }
329
330                 if(exceptionToPropagate != null) {
331                     LOG.debug("Tx {}: a {} cohort Future failed", transactionId, operationName, exceptionToPropagate);
332                     if(propagateException) {
333                         // We don't log the exception here to avoid redundant logging since we're
334                         // propagating to the caller in MD-SAL core who will log it.
335                         returnFuture.setException(exceptionToPropagate);
336                     } else {
337                         // Since the caller doesn't want us to propagate the exception we'll also
338                         // not log it normally. But it's usually not good to totally silence
339                         // exceptions so we'll log it to debug level.
340                         returnFuture.set(null);
341                     }
342
343                     callback.failure();
344                 } else {
345                     LOG.debug("Tx {}: {} succeeded", transactionId, operationName);
346
347                     returnFuture.set(null);
348
349                     callback.success();
350                 }
351             }
352         }, actorContext.getClientDispatcher());
353     }
354
355     @Override
356     List<Future<ActorSelection>> getCohortFutures() {
357         List<Future<ActorSelection>> cohortFutures = new ArrayList<>(cohorts.size());
358         for(CohortInfo info: cohorts) {
359             cohortFutures.add(info.getActorFuture());
360         }
361
362         return cohortFutures;
363     }
364
365     static class CohortInfo {
366         private final Future<ActorSelection> actorFuture;
367         private volatile ActorSelection resolvedActor;
368         private final Supplier<Short> actorVersionSupplier;
369
370         CohortInfo(Future<ActorSelection> actorFuture, Supplier<Short> actorVersionSupplier) {
371             this.actorFuture = actorFuture;
372             this.actorVersionSupplier = actorVersionSupplier;
373         }
374
375         Future<ActorSelection> getActorFuture() {
376             return actorFuture;
377         }
378
379         ActorSelection getResolvedActor() {
380             return resolvedActor;
381         }
382
383         void setResolvedActor(ActorSelection resolvedActor) {
384             this.resolvedActor = resolvedActor;
385         }
386
387         short getActorVersion() {
388             Preconditions.checkState(resolvedActor != null,
389                     "getActorVersion cannot be called until the actor is resolved");
390             return actorVersionSupplier.get();
391         }
392     }
393
394     private interface MessageSupplier {
395         Object newMessage(String transactionId, short version);
396         boolean isSerializedReplyType(Object reply);
397     }
398 }