BUG-5280: switch transaction IDs from String to TransactionIdentifier
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / ThreePhaseCommitCohortProxy.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.datastore;
10
11 import akka.actor.ActorSelection;
12 import akka.dispatch.OnComplete;
13 import com.google.common.base.Preconditions;
14 import com.google.common.base.Supplier;
15 import com.google.common.collect.Lists;
16 import com.google.common.util.concurrent.FutureCallback;
17 import com.google.common.util.concurrent.Futures;
18 import com.google.common.util.concurrent.ListenableFuture;
19 import com.google.common.util.concurrent.SettableFuture;
20 import java.util.ArrayList;
21 import java.util.Iterator;
22 import java.util.List;
23 import java.util.concurrent.atomic.AtomicInteger;
24 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
25 import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction;
26 import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply;
27 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction;
28 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply;
29 import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction;
30 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
31 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
32 import org.slf4j.Logger;
33 import org.slf4j.LoggerFactory;
34 import scala.concurrent.Future;
35
36 /**
37  * ThreePhaseCommitCohortProxy represents a set of remote cohort proxies
38  */
39 public class ThreePhaseCommitCohortProxy extends AbstractThreePhaseCommitCohort<ActorSelection> {
40
41     private static final Logger LOG = LoggerFactory.getLogger(ThreePhaseCommitCohortProxy.class);
42
43     private static final MessageSupplier COMMIT_MESSAGE_SUPPLIER = new MessageSupplier() {
44         @Override
45         public Object newMessage(TransactionIdentifier transactionId, short version) {
46             return new CommitTransaction(transactionId, version).toSerializable();
47         }
48
49         @Override
50         public boolean isSerializedReplyType(Object reply) {
51             return CommitTransactionReply.isSerializedType(reply);
52         }
53     };
54
55     private static final MessageSupplier ABORT_MESSAGE_SUPPLIER = new MessageSupplier() {
56         @Override
57         public Object newMessage(TransactionIdentifier transactionId, short version) {
58             return new AbortTransaction(transactionId, version).toSerializable();
59         }
60
61         @Override
62         public boolean isSerializedReplyType(Object reply) {
63             return AbortTransactionReply.isSerializedType(reply);
64         }
65     };
66
67     private final ActorContext actorContext;
68     private final List<CohortInfo> cohorts;
69     private final SettableFuture<Void> cohortsResolvedFuture = SettableFuture.create();
70     private final TransactionIdentifier transactionId;
71     private volatile OperationCallback commitOperationCallback;
72
73     public ThreePhaseCommitCohortProxy(ActorContext actorContext, List<CohortInfo> cohorts,
74             TransactionIdentifier transactionId) {
75         this.actorContext = actorContext;
76         this.cohorts = cohorts;
77         this.transactionId = Preconditions.checkNotNull(transactionId);
78
79         if(cohorts.isEmpty()) {
80             cohortsResolvedFuture.set(null);
81         }
82     }
83
84     private ListenableFuture<Void> resolveCohorts() {
85         if(cohortsResolvedFuture.isDone()) {
86             return cohortsResolvedFuture;
87         }
88
89         final AtomicInteger completed = new AtomicInteger(cohorts.size());
90         for(final CohortInfo info: cohorts) {
91             info.getActorFuture().onComplete(new OnComplete<ActorSelection>() {
92                 @Override
93                 public void onComplete(Throwable failure, ActorSelection actor)  {
94                     synchronized(completed) {
95                         boolean done = completed.decrementAndGet() == 0;
96                         if(failure != null) {
97                             LOG.debug("Tx {}: a cohort Future failed", transactionId, failure);
98                             cohortsResolvedFuture.setException(failure);
99                         } else if(!cohortsResolvedFuture.isDone()) {
100                             LOG.debug("Tx {}: cohort actor {} resolved", transactionId, actor);
101
102                             info.setResolvedActor(actor);
103                             if(done) {
104                                 LOG.debug("Tx {}: successfully resolved all cohort actors", transactionId);
105                                 cohortsResolvedFuture.set(null);
106                             }
107                         }
108                     }
109                 }
110             }, actorContext.getClientDispatcher());
111         }
112
113         return cohortsResolvedFuture;
114     }
115
116     @Override
117     public ListenableFuture<Boolean> canCommit() {
118         LOG.debug("Tx {} canCommit", transactionId);
119
120         final SettableFuture<Boolean> returnFuture = SettableFuture.create();
121
122         // The first phase of canCommit is to gather the list of cohort actor paths that will
123         // participate in the commit. buildCohortPathsList combines the cohort path Futures into
124         // one Future which we wait on asynchronously here. The cohort actor paths are
125         // extracted from ReadyTransactionReply messages by the Futures that were obtained earlier
126         // and passed to us from upstream processing. If any one fails then  we'll fail canCommit.
127
128         Futures.addCallback(resolveCohorts(), new FutureCallback<Void>() {
129             @Override
130             public void onSuccess(Void notUsed) {
131                 finishCanCommit(returnFuture);
132             }
133
134             @Override
135             public void onFailure(Throwable failure) {
136                 returnFuture.setException(failure);
137             }
138         });
139
140         return returnFuture;
141     }
142
143     private void finishCanCommit(final SettableFuture<Boolean> returnFuture) {
144         LOG.debug("Tx {} finishCanCommit", transactionId);
145
146         // For empty transactions return immediately
147         if(cohorts.size() == 0){
148             LOG.debug("Tx {}: canCommit returning result true", transactionId);
149             returnFuture.set(Boolean.TRUE);
150             return;
151         }
152
153         commitOperationCallback = new TransactionRateLimitingCallback(actorContext);
154         commitOperationCallback.run();
155
156         final Iterator<CohortInfo> iterator = cohorts.iterator();
157
158         final OnComplete<Object> onComplete = new OnComplete<Object>() {
159             @Override
160             public void onComplete(Throwable failure, Object response) {
161                 if (failure != null) {
162                     LOG.debug("Tx {}: a canCommit cohort Future failed", transactionId, failure);
163
164                     returnFuture.setException(failure);
165                     commitOperationCallback.failure();
166                     return;
167                 }
168
169                 // Only the first call to pause takes effect - subsequent calls before resume are no-ops. So
170                 // this means we'll only time the first transaction canCommit which should be fine.
171                 commitOperationCallback.pause();
172
173                 boolean result = true;
174                 if (CanCommitTransactionReply.isSerializedType(response)) {
175                     CanCommitTransactionReply reply = CanCommitTransactionReply.fromSerializable(response);
176
177                     LOG.debug("Tx {}: received {}", transactionId, response);
178
179                     if (!reply.getCanCommit()) {
180                         result = false;
181                     }
182                 } else {
183                     LOG.error("Unexpected response type {}", response.getClass());
184                     returnFuture.setException(new IllegalArgumentException(
185                             String.format("Unexpected response type %s", response.getClass())));
186                     return;
187                 }
188
189                 if(iterator.hasNext() && result) {
190                     sendCanCommitTransaction(iterator.next(), this);
191                 } else {
192                     LOG.debug("Tx {}: canCommit returning result: {}", transactionId, result);
193                     returnFuture.set(Boolean.valueOf(result));
194                 }
195
196             }
197         };
198
199         sendCanCommitTransaction(iterator.next(), onComplete);
200     }
201
202     private void sendCanCommitTransaction(CohortInfo toCohortInfo, OnComplete<Object> onComplete) {
203         CanCommitTransaction message = new CanCommitTransaction(transactionId, toCohortInfo.getActorVersion());
204
205         if(LOG.isDebugEnabled()) {
206             LOG.debug("Tx {}: sending {} to {}", transactionId, message, toCohortInfo.getResolvedActor());
207         }
208
209         Future<Object> future = actorContext.executeOperationAsync(toCohortInfo.getResolvedActor(),
210                 message.toSerializable(), actorContext.getTransactionCommitOperationTimeout());
211         future.onComplete(onComplete, actorContext.getClientDispatcher());
212     }
213
214     private Future<Iterable<Object>> invokeCohorts(MessageSupplier messageSupplier) {
215         List<Future<Object>> futureList = Lists.newArrayListWithCapacity(cohorts.size());
216         for(CohortInfo cohort : cohorts) {
217             Object message = messageSupplier.newMessage(transactionId, cohort.getActorVersion());
218
219             if(LOG.isDebugEnabled()) {
220                 LOG.debug("Tx {}: Sending {} to cohort {}", transactionId, message , cohort);
221             }
222
223             futureList.add(actorContext.executeOperationAsync(cohort.getResolvedActor(), message,
224                     actorContext.getTransactionCommitOperationTimeout()));
225         }
226
227         return akka.dispatch.Futures.sequence(futureList, actorContext.getClientDispatcher());
228     }
229
230     @Override
231     public ListenableFuture<Void> preCommit() {
232         // We don't need to do anything here - preCommit is done atomically with the commit phase
233         // by the shard.
234         return IMMEDIATE_VOID_SUCCESS;
235     }
236
237     @Override
238     public ListenableFuture<Void> abort() {
239         // Note - we pass false for propagateException. In the front-end data broker, this method
240         // is called when one of the 3 phases fails with an exception. We'd rather have that
241         // original exception propagated to the client. If our abort fails and we propagate the
242         // exception then that exception will supersede and suppress the original exception. But
243         // it's the original exception that is the root cause and of more interest to the client.
244
245         return voidOperation("abort", ABORT_MESSAGE_SUPPLIER,
246                 AbortTransactionReply.class, false, OperationCallback.NO_OP_CALLBACK);
247     }
248
249     @Override
250     public ListenableFuture<Void> commit() {
251         OperationCallback operationCallback = commitOperationCallback != null ? commitOperationCallback :
252             OperationCallback.NO_OP_CALLBACK;
253
254         return voidOperation("commit", COMMIT_MESSAGE_SUPPLIER,
255                 CommitTransactionReply.class, true, operationCallback);
256     }
257
258     private static boolean successfulFuture(ListenableFuture<Void> future) {
259         if(!future.isDone()) {
260             return false;
261         }
262
263         try {
264             future.get();
265             return true;
266         } catch(Exception e) {
267             return false;
268         }
269     }
270
271     private ListenableFuture<Void> voidOperation(final String operationName,
272             final MessageSupplier messageSupplier, final Class<?> expectedResponseClass,
273             final boolean propagateException, final OperationCallback callback) {
274         LOG.debug("Tx {} {}", transactionId, operationName);
275
276         final SettableFuture<Void> returnFuture = SettableFuture.create();
277
278         // The cohort actor list should already be built at this point by the canCommit phase but,
279         // if not for some reason, we'll try to build it here.
280
281         ListenableFuture<Void> future = resolveCohorts();
282         if(successfulFuture(future)) {
283             finishVoidOperation(operationName, messageSupplier, expectedResponseClass, propagateException,
284                     returnFuture, callback);
285         } else {
286             Futures.addCallback(future, new FutureCallback<Void>() {
287                 @Override
288                 public void onSuccess(Void notUsed) {
289                     finishVoidOperation(operationName, messageSupplier, expectedResponseClass,
290                             propagateException, returnFuture, callback);
291                 }
292
293                 @Override
294                 public void onFailure(Throwable failure) {
295                     LOG.debug("Tx {}: a {} cohort path Future failed: {}", transactionId, operationName, failure);
296
297                     if(propagateException) {
298                         returnFuture.setException(failure);
299                     } else {
300                         returnFuture.set(null);
301                     }
302                 }
303             });
304         }
305
306         return returnFuture;
307     }
308
309     private void finishVoidOperation(final String operationName, MessageSupplier messageSupplier,
310                                      final Class<?> expectedResponseClass, final boolean propagateException,
311                                      final SettableFuture<Void> returnFuture, final OperationCallback callback) {
312         LOG.debug("Tx {} finish {}", transactionId, operationName);
313
314         callback.resume();
315
316         Future<Iterable<Object>> combinedFuture = invokeCohorts(messageSupplier);
317
318         combinedFuture.onComplete(new OnComplete<Iterable<Object>>() {
319             @Override
320             public void onComplete(Throwable failure, Iterable<Object> responses) throws Throwable {
321                 Throwable exceptionToPropagate = failure;
322                 if(exceptionToPropagate == null) {
323                     for(Object response: responses) {
324                         if(!response.getClass().equals(expectedResponseClass)) {
325                             exceptionToPropagate = new IllegalArgumentException(
326                                     String.format("Unexpected response type %s", response.getClass()));
327                             break;
328                         }
329                     }
330                 }
331
332                 if(exceptionToPropagate != null) {
333                     LOG.debug("Tx {}: a {} cohort Future failed", transactionId, operationName, exceptionToPropagate);
334                     if(propagateException) {
335                         // We don't log the exception here to avoid redundant logging since we're
336                         // propagating to the caller in MD-SAL core who will log it.
337                         returnFuture.setException(exceptionToPropagate);
338                     } else {
339                         // Since the caller doesn't want us to propagate the exception we'll also
340                         // not log it normally. But it's usually not good to totally silence
341                         // exceptions so we'll log it to debug level.
342                         returnFuture.set(null);
343                     }
344
345                     callback.failure();
346                 } else {
347                     LOG.debug("Tx {}: {} succeeded", transactionId, operationName);
348
349                     returnFuture.set(null);
350
351                     callback.success();
352                 }
353             }
354         }, actorContext.getClientDispatcher());
355     }
356
357     @Override
358     List<Future<ActorSelection>> getCohortFutures() {
359         List<Future<ActorSelection>> cohortFutures = new ArrayList<>(cohorts.size());
360         for(CohortInfo info: cohorts) {
361             cohortFutures.add(info.getActorFuture());
362         }
363
364         return cohortFutures;
365     }
366
367     static class CohortInfo {
368         private final Future<ActorSelection> actorFuture;
369         private volatile ActorSelection resolvedActor;
370         private final Supplier<Short> actorVersionSupplier;
371
372         CohortInfo(Future<ActorSelection> actorFuture, Supplier<Short> actorVersionSupplier) {
373             this.actorFuture = actorFuture;
374             this.actorVersionSupplier = actorVersionSupplier;
375         }
376
377         Future<ActorSelection> getActorFuture() {
378             return actorFuture;
379         }
380
381         ActorSelection getResolvedActor() {
382             return resolvedActor;
383         }
384
385         void setResolvedActor(ActorSelection resolvedActor) {
386             this.resolvedActor = resolvedActor;
387         }
388
389         short getActorVersion() {
390             Preconditions.checkState(resolvedActor != null,
391                     "getActorVersion cannot be called until the actor is resolved");
392             return actorVersionSupplier.get();
393         }
394     }
395
396     private interface MessageSupplier {
397         Object newMessage(TransactionIdentifier transactionId, short version);
398         boolean isSerializedReplyType(Object reply);
399     }
400 }