BUG-5280: do not cache modify responses
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / ThreePhaseCommitCohortProxy.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.datastore;
10
11 import akka.actor.ActorSelection;
12 import akka.dispatch.OnComplete;
13 import com.google.common.base.Preconditions;
14 import com.google.common.base.Supplier;
15 import com.google.common.collect.Lists;
16 import com.google.common.util.concurrent.FutureCallback;
17 import com.google.common.util.concurrent.Futures;
18 import com.google.common.util.concurrent.ListenableFuture;
19 import com.google.common.util.concurrent.SettableFuture;
20 import java.util.ArrayList;
21 import java.util.Iterator;
22 import java.util.List;
23 import java.util.concurrent.atomic.AtomicInteger;
24 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
25 import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction;
26 import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply;
27 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction;
28 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply;
29 import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction;
30 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
31 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
32 import org.slf4j.Logger;
33 import org.slf4j.LoggerFactory;
34 import scala.concurrent.Future;
35
36 /**
37  * ThreePhaseCommitCohortProxy represents a set of remote cohort proxies.
38  */
39 public class ThreePhaseCommitCohortProxy extends AbstractThreePhaseCommitCohort<ActorSelection> {
40
41     private static final Logger LOG = LoggerFactory.getLogger(ThreePhaseCommitCohortProxy.class);
42
43     private static final MessageSupplier COMMIT_MESSAGE_SUPPLIER = new MessageSupplier() {
44         @Override
45         public Object newMessage(TransactionIdentifier transactionId, short version) {
46             return new CommitTransaction(transactionId, version).toSerializable();
47         }
48
49         @Override
50         public boolean isSerializedReplyType(Object reply) {
51             return CommitTransactionReply.isSerializedType(reply);
52         }
53     };
54
55     private static final MessageSupplier ABORT_MESSAGE_SUPPLIER = new MessageSupplier() {
56         @Override
57         public Object newMessage(TransactionIdentifier transactionId, short version) {
58             return new AbortTransaction(transactionId, version).toSerializable();
59         }
60
61         @Override
62         public boolean isSerializedReplyType(Object reply) {
63             return AbortTransactionReply.isSerializedType(reply);
64         }
65     };
66
67     private final ActorContext actorContext;
68     private final List<CohortInfo> cohorts;
69     private final SettableFuture<Void> cohortsResolvedFuture = SettableFuture.create();
70     private final TransactionIdentifier transactionId;
71     private volatile OperationCallback commitOperationCallback;
72
73     public ThreePhaseCommitCohortProxy(ActorContext actorContext, List<CohortInfo> cohorts,
74             TransactionIdentifier transactionId) {
75         this.actorContext = actorContext;
76         this.cohorts = cohorts;
77         this.transactionId = Preconditions.checkNotNull(transactionId);
78
79         if (cohorts.isEmpty()) {
80             cohortsResolvedFuture.set(null);
81         }
82     }
83
84     private ListenableFuture<Void> resolveCohorts() {
85         if (cohortsResolvedFuture.isDone()) {
86             return cohortsResolvedFuture;
87         }
88
89         final AtomicInteger completed = new AtomicInteger(cohorts.size());
90         final Object lock = new Object();
91         for (final CohortInfo info: cohorts) {
92             info.getActorFuture().onComplete(new OnComplete<ActorSelection>() {
93                 @Override
94                 public void onComplete(Throwable failure, ActorSelection actor)  {
95                     synchronized (lock) {
96                         boolean done = completed.decrementAndGet() == 0;
97                         if (failure != null) {
98                             LOG.debug("Tx {}: a cohort Future failed", transactionId, failure);
99                             cohortsResolvedFuture.setException(failure);
100                         } else if (!cohortsResolvedFuture.isDone()) {
101                             LOG.debug("Tx {}: cohort actor {} resolved", transactionId, actor);
102
103                             info.setResolvedActor(actor);
104                             if (done) {
105                                 LOG.debug("Tx {}: successfully resolved all cohort actors", transactionId);
106                                 cohortsResolvedFuture.set(null);
107                             }
108                         }
109                     }
110                 }
111             }, actorContext.getClientDispatcher());
112         }
113
114         return cohortsResolvedFuture;
115     }
116
117     @Override
118     public ListenableFuture<Boolean> canCommit() {
119         LOG.debug("Tx {} canCommit", transactionId);
120
121         final SettableFuture<Boolean> returnFuture = SettableFuture.create();
122
123         // The first phase of canCommit is to gather the list of cohort actor paths that will
124         // participate in the commit. buildCohortPathsList combines the cohort path Futures into
125         // one Future which we wait on asynchronously here. The cohort actor paths are
126         // extracted from ReadyTransactionReply messages by the Futures that were obtained earlier
127         // and passed to us from upstream processing. If any one fails then  we'll fail canCommit.
128
129         Futures.addCallback(resolveCohorts(), new FutureCallback<Void>() {
130             @Override
131             public void onSuccess(Void notUsed) {
132                 finishCanCommit(returnFuture);
133             }
134
135             @Override
136             public void onFailure(Throwable failure) {
137                 returnFuture.setException(failure);
138             }
139         });
140
141         return returnFuture;
142     }
143
144     private void finishCanCommit(final SettableFuture<Boolean> returnFuture) {
145         LOG.debug("Tx {} finishCanCommit", transactionId);
146
147         // For empty transactions return immediately
148         if (cohorts.size() == 0) {
149             LOG.debug("Tx {}: canCommit returning result true", transactionId);
150             returnFuture.set(Boolean.TRUE);
151             return;
152         }
153
154         commitOperationCallback = new TransactionRateLimitingCallback(actorContext);
155         commitOperationCallback.run();
156
157         final Iterator<CohortInfo> iterator = cohorts.iterator();
158
159         final OnComplete<Object> onComplete = new OnComplete<Object>() {
160             @Override
161             public void onComplete(Throwable failure, Object response) {
162                 if (failure != null) {
163                     LOG.debug("Tx {}: a canCommit cohort Future failed", transactionId, failure);
164
165                     returnFuture.setException(failure);
166                     commitOperationCallback.failure();
167                     return;
168                 }
169
170                 // Only the first call to pause takes effect - subsequent calls before resume are no-ops. So
171                 // this means we'll only time the first transaction canCommit which should be fine.
172                 commitOperationCallback.pause();
173
174                 boolean result = true;
175                 if (CanCommitTransactionReply.isSerializedType(response)) {
176                     CanCommitTransactionReply reply = CanCommitTransactionReply.fromSerializable(response);
177
178                     LOG.debug("Tx {}: received {}", transactionId, response);
179
180                     if (!reply.getCanCommit()) {
181                         result = false;
182                     }
183                 } else {
184                     LOG.error("Unexpected response type {}", response.getClass());
185                     returnFuture.setException(new IllegalArgumentException(
186                             String.format("Unexpected response type %s", response.getClass())));
187                     return;
188                 }
189
190                 if (iterator.hasNext() && result) {
191                     sendCanCommitTransaction(iterator.next(), this);
192                 } else {
193                     LOG.debug("Tx {}: canCommit returning result: {}", transactionId, result);
194                     returnFuture.set(Boolean.valueOf(result));
195                 }
196
197             }
198         };
199
200         sendCanCommitTransaction(iterator.next(), onComplete);
201     }
202
203     private void sendCanCommitTransaction(CohortInfo toCohortInfo, OnComplete<Object> onComplete) {
204         CanCommitTransaction message = new CanCommitTransaction(transactionId, toCohortInfo.getActorVersion());
205
206         LOG.debug("Tx {}: sending {} to {}", transactionId, message, toCohortInfo.getResolvedActor());
207
208         Future<Object> future = actorContext.executeOperationAsync(toCohortInfo.getResolvedActor(),
209                 message.toSerializable(), actorContext.getTransactionCommitOperationTimeout());
210         future.onComplete(onComplete, actorContext.getClientDispatcher());
211     }
212
213     private Future<Iterable<Object>> invokeCohorts(MessageSupplier messageSupplier) {
214         List<Future<Object>> futureList = Lists.newArrayListWithCapacity(cohorts.size());
215         for (CohortInfo cohort : cohorts) {
216             Object message = messageSupplier.newMessage(transactionId, cohort.getActorVersion());
217
218             LOG.debug("Tx {}: Sending {} to cohort {}", transactionId, message , cohort);
219
220             futureList.add(actorContext.executeOperationAsync(cohort.getResolvedActor(), message,
221                     actorContext.getTransactionCommitOperationTimeout()));
222         }
223
224         return akka.dispatch.Futures.sequence(futureList, actorContext.getClientDispatcher());
225     }
226
227     @Override
228     public ListenableFuture<Void> preCommit() {
229         // We don't need to do anything here - preCommit is done atomically with the commit phase
230         // by the shard.
231         return IMMEDIATE_VOID_SUCCESS;
232     }
233
234     @Override
235     public ListenableFuture<Void> abort() {
236         // Note - we pass false for propagateException. In the front-end data broker, this method
237         // is called when one of the 3 phases fails with an exception. We'd rather have that
238         // original exception propagated to the client. If our abort fails and we propagate the
239         // exception then that exception will supersede and suppress the original exception. But
240         // it's the original exception that is the root cause and of more interest to the client.
241
242         return voidOperation("abort", ABORT_MESSAGE_SUPPLIER,
243                 AbortTransactionReply.class, false, OperationCallback.NO_OP_CALLBACK);
244     }
245
246     @Override
247     public ListenableFuture<Void> commit() {
248         OperationCallback operationCallback = commitOperationCallback != null ? commitOperationCallback :
249             OperationCallback.NO_OP_CALLBACK;
250
251         return voidOperation("commit", COMMIT_MESSAGE_SUPPLIER,
252                 CommitTransactionReply.class, true, operationCallback);
253     }
254
255     @SuppressWarnings("checkstyle:IllegalCatch")
256     private static boolean successfulFuture(ListenableFuture<Void> future) {
257         if (!future.isDone()) {
258             return false;
259         }
260
261         try {
262             future.get();
263             return true;
264         } catch (Exception e) {
265             return false;
266         }
267     }
268
269     private ListenableFuture<Void> voidOperation(final String operationName,
270             final MessageSupplier messageSupplier, final Class<?> expectedResponseClass,
271             final boolean propagateException, final OperationCallback callback) {
272         LOG.debug("Tx {} {}", transactionId, operationName);
273
274         final SettableFuture<Void> returnFuture = SettableFuture.create();
275
276         // The cohort actor list should already be built at this point by the canCommit phase but,
277         // if not for some reason, we'll try to build it here.
278
279         ListenableFuture<Void> future = resolveCohorts();
280         if (successfulFuture(future)) {
281             finishVoidOperation(operationName, messageSupplier, expectedResponseClass, propagateException,
282                     returnFuture, callback);
283         } else {
284             Futures.addCallback(future, new FutureCallback<Void>() {
285                 @Override
286                 public void onSuccess(Void notUsed) {
287                     finishVoidOperation(operationName, messageSupplier, expectedResponseClass,
288                             propagateException, returnFuture, callback);
289                 }
290
291                 @Override
292                 public void onFailure(Throwable failure) {
293                     LOG.debug("Tx {}: a {} cohort path Future failed: {}", transactionId, operationName, failure);
294
295                     if (propagateException) {
296                         returnFuture.setException(failure);
297                     } else {
298                         returnFuture.set(null);
299                     }
300                 }
301             });
302         }
303
304         return returnFuture;
305     }
306
307     private void finishVoidOperation(final String operationName, MessageSupplier messageSupplier,
308                                      final Class<?> expectedResponseClass, final boolean propagateException,
309                                      final SettableFuture<Void> returnFuture, final OperationCallback callback) {
310         LOG.debug("Tx {} finish {}", transactionId, operationName);
311
312         callback.resume();
313
314         Future<Iterable<Object>> combinedFuture = invokeCohorts(messageSupplier);
315
316         combinedFuture.onComplete(new OnComplete<Iterable<Object>>() {
317             @Override
318             public void onComplete(Throwable failure, Iterable<Object> responses) throws Throwable {
319                 Throwable exceptionToPropagate = failure;
320                 if (exceptionToPropagate == null) {
321                     for (Object response: responses) {
322                         if (!response.getClass().equals(expectedResponseClass)) {
323                             exceptionToPropagate = new IllegalArgumentException(
324                                     String.format("Unexpected response type %s", response.getClass()));
325                             break;
326                         }
327                     }
328                 }
329
330                 if (exceptionToPropagate != null) {
331                     LOG.debug("Tx {}: a {} cohort Future failed", transactionId, operationName, exceptionToPropagate);
332                     if (propagateException) {
333                         // We don't log the exception here to avoid redundant logging since we're
334                         // propagating to the caller in MD-SAL core who will log it.
335                         returnFuture.setException(exceptionToPropagate);
336                     } else {
337                         // Since the caller doesn't want us to propagate the exception we'll also
338                         // not log it normally. But it's usually not good to totally silence
339                         // exceptions so we'll log it to debug level.
340                         returnFuture.set(null);
341                     }
342
343                     callback.failure();
344                 } else {
345                     LOG.debug("Tx {}: {} succeeded", transactionId, operationName);
346
347                     returnFuture.set(null);
348
349                     callback.success();
350                 }
351             }
352         }, actorContext.getClientDispatcher());
353     }
354
355     @Override
356     List<Future<ActorSelection>> getCohortFutures() {
357         List<Future<ActorSelection>> cohortFutures = new ArrayList<>(cohorts.size());
358         for (CohortInfo info: cohorts) {
359             cohortFutures.add(info.getActorFuture());
360         }
361
362         return cohortFutures;
363     }
364
365     static class CohortInfo {
366         private final Future<ActorSelection> actorFuture;
367         private volatile ActorSelection resolvedActor;
368         private final Supplier<Short> actorVersionSupplier;
369
370         CohortInfo(Future<ActorSelection> actorFuture, Supplier<Short> actorVersionSupplier) {
371             this.actorFuture = actorFuture;
372             this.actorVersionSupplier = actorVersionSupplier;
373         }
374
375         Future<ActorSelection> getActorFuture() {
376             return actorFuture;
377         }
378
379         ActorSelection getResolvedActor() {
380             return resolvedActor;
381         }
382
383         void setResolvedActor(ActorSelection resolvedActor) {
384             this.resolvedActor = resolvedActor;
385         }
386
387         short getActorVersion() {
388             Preconditions.checkState(resolvedActor != null,
389                     "getActorVersion cannot be called until the actor is resolved");
390             return actorVersionSupplier.get();
391         }
392     }
393
394     private interface MessageSupplier {
395         Object newMessage(TransactionIdentifier transactionId, short version);
396
397         boolean isSerializedReplyType(Object reply);
398     }
399 }