Bump upstream versions
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / ThreePhaseCommitCohortProxy.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.datastore;
9
10 import static com.google.common.base.Preconditions.checkState;
11 import static java.util.Objects.requireNonNull;
12
13 import akka.actor.ActorSelection;
14 import akka.dispatch.OnComplete;
15 import com.google.common.util.concurrent.FutureCallback;
16 import com.google.common.util.concurrent.Futures;
17 import com.google.common.util.concurrent.ListenableFuture;
18 import com.google.common.util.concurrent.MoreExecutors;
19 import com.google.common.util.concurrent.SettableFuture;
20 import java.util.ArrayList;
21 import java.util.Iterator;
22 import java.util.List;
23 import java.util.concurrent.atomic.AtomicInteger;
24 import java.util.function.Supplier;
25 import org.eclipse.jdt.annotation.NonNull;
26 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
27 import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction;
28 import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply;
29 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction;
30 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply;
31 import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction;
32 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
33 import org.opendaylight.controller.cluster.datastore.utils.ActorUtils;
34 import org.opendaylight.mdsal.common.api.CommitInfo;
35 import org.opendaylight.mdsal.dom.spi.store.DOMStoreThreePhaseCommitCohort;
36 import org.opendaylight.yangtools.yang.common.Empty;
37 import org.slf4j.Logger;
38 import org.slf4j.LoggerFactory;
39 import scala.concurrent.Future;
40
41 /**
42  * ThreePhaseCommitCohortProxy represents a set of remote cohort proxies.
43  */
44 @Deprecated(since = "9.0.0", forRemoval = true)
45 final class ThreePhaseCommitCohortProxy implements DOMStoreThreePhaseCommitCohort {
46     private static final Logger LOG = LoggerFactory.getLogger(ThreePhaseCommitCohortProxy.class);
47     private static final @NonNull ListenableFuture<Empty> IMMEDIATE_EMPTY_SUCCESS =
48         Futures.immediateFuture(Empty.value());
49
50     private static final MessageSupplier COMMIT_MESSAGE_SUPPLIER = new MessageSupplier() {
51         @Override
52         public Object newMessage(final TransactionIdentifier transactionId, final short version) {
53             return new CommitTransaction(transactionId, version).toSerializable();
54         }
55
56         @Override
57         public boolean isSerializedReplyType(final Object reply) {
58             return CommitTransactionReply.isSerializedType(reply);
59         }
60     };
61
62     private static final MessageSupplier ABORT_MESSAGE_SUPPLIER = new MessageSupplier() {
63         @Override
64         public Object newMessage(final TransactionIdentifier transactionId, final short version) {
65             return new AbortTransaction(transactionId, version).toSerializable();
66         }
67
68         @Override
69         public boolean isSerializedReplyType(final Object reply) {
70             return AbortTransactionReply.isSerializedType(reply);
71         }
72     };
73
74     private static final OperationCallback NO_OP_CALLBACK = new OperationCallback() {
75         @Override
76         public void run() {
77         }
78
79         @Override
80         public void success() {
81         }
82
83         @Override
84         public void failure() {
85         }
86
87         @Override
88         public void pause() {
89         }
90
91         @Override
92         public void resume() {
93         }
94     };
95
96
97     private final ActorUtils actorUtils;
98     private final List<CohortInfo> cohorts;
99     private final SettableFuture<Empty> cohortsResolvedFuture = SettableFuture.create();
100     private final TransactionIdentifier transactionId;
101     private volatile OperationCallback commitOperationCallback;
102
103     ThreePhaseCommitCohortProxy(final ActorUtils actorUtils, final List<CohortInfo> cohorts,
104             final TransactionIdentifier transactionId) {
105         this.actorUtils = actorUtils;
106         this.cohorts = cohorts;
107         this.transactionId = requireNonNull(transactionId);
108
109         if (cohorts.isEmpty()) {
110             cohortsResolvedFuture.set(Empty.value());
111         }
112     }
113
114     private ListenableFuture<Empty> resolveCohorts() {
115         if (cohortsResolvedFuture.isDone()) {
116             return cohortsResolvedFuture;
117         }
118
119         final AtomicInteger completed = new AtomicInteger(cohorts.size());
120         final Object lock = new Object();
121         for (final CohortInfo info: cohorts) {
122             info.getActorFuture().onComplete(new OnComplete<ActorSelection>() {
123                 @Override
124                 public void onComplete(final Throwable failure, final ActorSelection actor)  {
125                     synchronized (lock) {
126                         boolean done = completed.decrementAndGet() == 0;
127                         if (failure != null) {
128                             LOG.debug("Tx {}: a cohort Future failed", transactionId, failure);
129                             cohortsResolvedFuture.setException(failure);
130                         } else if (!cohortsResolvedFuture.isDone()) {
131                             LOG.debug("Tx {}: cohort actor {} resolved", transactionId, actor);
132
133                             info.setResolvedActor(actor);
134                             if (done) {
135                                 LOG.debug("Tx {}: successfully resolved all cohort actors", transactionId);
136                                 cohortsResolvedFuture.set(Empty.value());
137                             }
138                         }
139                     }
140                 }
141             }, actorUtils.getClientDispatcher());
142         }
143
144         return cohortsResolvedFuture;
145     }
146
147     @Override
148     public ListenableFuture<Boolean> canCommit() {
149         LOG.debug("Tx {} canCommit", transactionId);
150
151         final SettableFuture<Boolean> returnFuture = SettableFuture.create();
152
153         // The first phase of canCommit is to gather the list of cohort actor paths that will
154         // participate in the commit. buildCohortPathsList combines the cohort path Futures into
155         // one Future which we wait on asynchronously here. The cohort actor paths are
156         // extracted from ReadyTransactionReply messages by the Futures that were obtained earlier
157         // and passed to us from upstream processing. If any one fails then  we'll fail canCommit.
158
159         Futures.addCallback(resolveCohorts(), new FutureCallback<>() {
160             @Override
161             public void onSuccess(final Empty result) {
162                 finishCanCommit(returnFuture);
163             }
164
165             @Override
166             public void onFailure(final Throwable failure) {
167                 returnFuture.setException(failure);
168             }
169         }, MoreExecutors.directExecutor());
170
171         return returnFuture;
172     }
173
174     private void finishCanCommit(final SettableFuture<Boolean> returnFuture) {
175         LOG.debug("Tx {} finishCanCommit", transactionId);
176
177         // For empty transactions return immediately
178         if (cohorts.size() == 0) {
179             LOG.debug("Tx {}: canCommit returning result true", transactionId);
180             returnFuture.set(Boolean.TRUE);
181             return;
182         }
183
184         commitOperationCallback = new TransactionRateLimitingCallback(actorUtils);
185         commitOperationCallback.run();
186
187         final Iterator<CohortInfo> iterator = cohorts.iterator();
188
189         final OnComplete<Object> onComplete = new OnComplete<>() {
190             @Override
191             public void onComplete(final Throwable failure, final Object response) {
192                 if (failure != null) {
193                     LOG.debug("Tx {}: a canCommit cohort Future failed", transactionId, failure);
194
195                     returnFuture.setException(failure);
196                     commitOperationCallback.failure();
197                     return;
198                 }
199
200                 // Only the first call to pause takes effect - subsequent calls before resume are no-ops. So
201                 // this means we'll only time the first transaction canCommit which should be fine.
202                 commitOperationCallback.pause();
203
204                 boolean result = true;
205                 if (CanCommitTransactionReply.isSerializedType(response)) {
206                     CanCommitTransactionReply reply = CanCommitTransactionReply.fromSerializable(response);
207
208                     LOG.debug("Tx {}: received {}", transactionId, response);
209
210                     if (!reply.getCanCommit()) {
211                         result = false;
212                     }
213                 } else {
214                     LOG.error("Unexpected response type {}", response.getClass());
215                     returnFuture.setException(new IllegalArgumentException(
216                             String.format("Unexpected response type %s", response.getClass())));
217                     return;
218                 }
219
220                 if (iterator.hasNext() && result) {
221                     sendCanCommitTransaction(iterator.next(), this);
222                 } else {
223                     LOG.debug("Tx {}: canCommit returning result: {}", transactionId, result);
224                     returnFuture.set(result);
225                 }
226
227             }
228         };
229
230         sendCanCommitTransaction(iterator.next(), onComplete);
231     }
232
233     private void sendCanCommitTransaction(final CohortInfo toCohortInfo, final OnComplete<Object> onComplete) {
234         CanCommitTransaction message = new CanCommitTransaction(transactionId, toCohortInfo.getActorVersion());
235
236         LOG.debug("Tx {}: sending {} to {}", transactionId, message, toCohortInfo.getResolvedActor());
237
238         Future<Object> future = actorUtils.executeOperationAsync(toCohortInfo.getResolvedActor(),
239                 message.toSerializable(), actorUtils.getTransactionCommitOperationTimeout());
240         future.onComplete(onComplete, actorUtils.getClientDispatcher());
241     }
242
243     private Future<Iterable<Object>> invokeCohorts(final MessageSupplier messageSupplier) {
244         List<Future<Object>> futureList = new ArrayList<>(cohorts.size());
245         for (CohortInfo cohort : cohorts) {
246             Object message = messageSupplier.newMessage(transactionId, cohort.getActorVersion());
247
248             LOG.debug("Tx {}: Sending {} to cohort {}", transactionId, message , cohort.getResolvedActor());
249
250             futureList.add(actorUtils.executeOperationAsync(cohort.getResolvedActor(), message,
251                     actorUtils.getTransactionCommitOperationTimeout()));
252         }
253
254         return akka.dispatch.Futures.sequence(futureList, actorUtils.getClientDispatcher());
255     }
256
257     @Override
258     public ListenableFuture<Empty> preCommit() {
259         // We don't need to do anything here - preCommit is done atomically with the commit phase by the shard.
260         return IMMEDIATE_EMPTY_SUCCESS;
261     }
262
263     @Override
264     public ListenableFuture<Empty> abort() {
265         // Note - we pass false for propagateException. In the front-end data broker, this method
266         // is called when one of the 3 phases fails with an exception. We'd rather have that
267         // original exception propagated to the client. If our abort fails and we propagate the
268         // exception then that exception will supersede and suppress the original exception. But
269         // it's the original exception that is the root cause and of more interest to the client.
270
271         return operation("abort", Empty.value(), ABORT_MESSAGE_SUPPLIER, AbortTransactionReply.class, false,
272             NO_OP_CALLBACK);
273     }
274
275     @Override
276     public ListenableFuture<? extends CommitInfo> commit() {
277         OperationCallback operationCallback = commitOperationCallback != null ? commitOperationCallback :
278             NO_OP_CALLBACK;
279
280         return operation("commit", CommitInfo.empty(), COMMIT_MESSAGE_SUPPLIER, CommitTransactionReply.class, true,
281             operationCallback);
282     }
283
284     @SuppressWarnings("checkstyle:IllegalCatch")
285     private static boolean successfulFuture(final ListenableFuture<?> future) {
286         if (!future.isDone()) {
287             return false;
288         }
289
290         try {
291             future.get();
292             return true;
293         } catch (Exception e) {
294             return false;
295         }
296     }
297
298     private <T> ListenableFuture<T> operation(final String operationName, final T futureValue,
299             final MessageSupplier messageSupplier, final Class<?> expectedResponseClass,
300             final boolean propagateException, final OperationCallback callback) {
301         LOG.debug("Tx {} {}", transactionId, operationName);
302
303         final SettableFuture<T> returnFuture = SettableFuture.create();
304
305         // The cohort actor list should already be built at this point by the canCommit phase but,
306         // if not for some reason, we'll try to build it here.
307
308         ListenableFuture<Empty> future = resolveCohorts();
309         if (successfulFuture(future)) {
310             finishOperation(operationName, messageSupplier, expectedResponseClass, propagateException, returnFuture,
311                 futureValue, callback);
312         } else {
313             Futures.addCallback(future, new FutureCallback<>() {
314                 @Override
315                 public void onSuccess(final Empty result) {
316                     finishOperation(operationName, messageSupplier, expectedResponseClass, propagateException,
317                         returnFuture, futureValue, callback);
318                 }
319
320                 @Override
321                 public void onFailure(final Throwable failure) {
322                     LOG.debug("Tx {}: a {} cohort path Future failed", transactionId, operationName, failure);
323
324                     if (propagateException) {
325                         returnFuture.setException(failure);
326                     } else {
327                         returnFuture.set(futureValue);
328                     }
329                 }
330             }, MoreExecutors.directExecutor());
331         }
332
333         return returnFuture;
334     }
335
336     private <T> void finishOperation(final String operationName, final MessageSupplier messageSupplier,
337                                      final Class<?> expectedResponseClass, final boolean propagateException,
338                                      final SettableFuture<T> returnFuture, final T futureValue,
339                                      final OperationCallback callback) {
340         LOG.debug("Tx {} finish {}", transactionId, operationName);
341
342         callback.resume();
343
344         Future<Iterable<Object>> combinedFuture = invokeCohorts(messageSupplier);
345
346         combinedFuture.onComplete(new OnComplete<Iterable<Object>>() {
347             @Override
348             public void onComplete(final Throwable failure, final Iterable<Object> responses) {
349                 Throwable exceptionToPropagate = failure;
350                 if (exceptionToPropagate == null) {
351                     for (Object response: responses) {
352                         if (!response.getClass().equals(expectedResponseClass)) {
353                             exceptionToPropagate = new IllegalArgumentException(
354                                     String.format("Unexpected response type %s", response.getClass()));
355                             break;
356                         }
357                     }
358                 }
359
360                 if (exceptionToPropagate != null) {
361                     LOG.debug("Tx {}: a {} cohort Future failed", transactionId, operationName, exceptionToPropagate);
362                     if (propagateException) {
363                         // We don't log the exception here to avoid redundant logging since we're
364                         // propagating to the caller in MD-SAL core who will log it.
365                         returnFuture.setException(exceptionToPropagate);
366                     } else {
367                         // Since the caller doesn't want us to propagate the exception we'll also
368                         // not log it normally. But it's usually not good to totally silence
369                         // exceptions so we'll log it to debug level.
370                         returnFuture.set(futureValue);
371                     }
372
373                     callback.failure();
374                 } else {
375                     LOG.debug("Tx {}: {} succeeded", transactionId, operationName);
376
377                     returnFuture.set(futureValue);
378
379                     callback.success();
380                 }
381             }
382         }, actorUtils.getClientDispatcher());
383     }
384
385     static class CohortInfo {
386         private final Future<ActorSelection> actorFuture;
387         private final Supplier<Short> actorVersionSupplier;
388
389         private volatile ActorSelection resolvedActor;
390
391         CohortInfo(final Future<ActorSelection> actorFuture, final Supplier<Short> actorVersionSupplier) {
392             this.actorFuture = actorFuture;
393             this.actorVersionSupplier = actorVersionSupplier;
394         }
395
396         Future<ActorSelection> getActorFuture() {
397             return actorFuture;
398         }
399
400         ActorSelection getResolvedActor() {
401             return resolvedActor;
402         }
403
404         void setResolvedActor(final ActorSelection resolvedActor) {
405             this.resolvedActor = resolvedActor;
406         }
407
408         short getActorVersion() {
409             checkState(resolvedActor != null, "getActorVersion cannot be called until the actor is resolved");
410             return actorVersionSupplier.get();
411         }
412     }
413
414     private interface MessageSupplier {
415         Object newMessage(TransactionIdentifier transactionId, short version);
416
417         boolean isSerializedReplyType(Object reply);
418     }
419 }