Convert DatastoreSnapshotRestore to OSGi DS
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / ThreePhaseCommitCohortProxy.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.datastore;
9
10 import static com.google.common.base.Preconditions.checkState;
11 import static java.util.Objects.requireNonNull;
12
13 import akka.actor.ActorSelection;
14 import akka.dispatch.OnComplete;
15 import com.google.common.util.concurrent.FutureCallback;
16 import com.google.common.util.concurrent.Futures;
17 import com.google.common.util.concurrent.ListenableFuture;
18 import com.google.common.util.concurrent.MoreExecutors;
19 import com.google.common.util.concurrent.SettableFuture;
20 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
21 import java.util.ArrayList;
22 import java.util.Iterator;
23 import java.util.List;
24 import java.util.concurrent.atomic.AtomicInteger;
25 import java.util.function.Supplier;
26 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
27 import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction;
28 import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply;
29 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction;
30 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply;
31 import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction;
32 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
33 import org.opendaylight.controller.cluster.datastore.utils.ActorUtils;
34 import org.slf4j.Logger;
35 import org.slf4j.LoggerFactory;
36 import scala.concurrent.Future;
37
38 /**
39  * ThreePhaseCommitCohortProxy represents a set of remote cohort proxies.
40  */
41 public class ThreePhaseCommitCohortProxy extends AbstractThreePhaseCommitCohort<ActorSelection> {
42
43     private static final Logger LOG = LoggerFactory.getLogger(ThreePhaseCommitCohortProxy.class);
44
45     private static final MessageSupplier COMMIT_MESSAGE_SUPPLIER = new MessageSupplier() {
46         @Override
47         public Object newMessage(final TransactionIdentifier transactionId, final short version) {
48             return new CommitTransaction(transactionId, version).toSerializable();
49         }
50
51         @Override
52         public boolean isSerializedReplyType(final Object reply) {
53             return CommitTransactionReply.isSerializedType(reply);
54         }
55     };
56
57     private static final MessageSupplier ABORT_MESSAGE_SUPPLIER = new MessageSupplier() {
58         @Override
59         public Object newMessage(final TransactionIdentifier transactionId, final short version) {
60             return new AbortTransaction(transactionId, version).toSerializable();
61         }
62
63         @Override
64         public boolean isSerializedReplyType(final Object reply) {
65             return AbortTransactionReply.isSerializedType(reply);
66         }
67     };
68
69     private final ActorUtils actorUtils;
70     private final List<CohortInfo> cohorts;
71     private final SettableFuture<Void> cohortsResolvedFuture = SettableFuture.create();
72     private final TransactionIdentifier transactionId;
73     private volatile OperationCallback commitOperationCallback;
74
75     public ThreePhaseCommitCohortProxy(final ActorUtils actorUtils, final List<CohortInfo> cohorts,
76             final TransactionIdentifier transactionId) {
77         this.actorUtils = actorUtils;
78         this.cohorts = cohorts;
79         this.transactionId = requireNonNull(transactionId);
80
81         if (cohorts.isEmpty()) {
82             cohortsResolvedFuture.set(null);
83         }
84     }
85
86     private ListenableFuture<Void> resolveCohorts() {
87         if (cohortsResolvedFuture.isDone()) {
88             return cohortsResolvedFuture;
89         }
90
91         final AtomicInteger completed = new AtomicInteger(cohorts.size());
92         final Object lock = new Object();
93         for (final CohortInfo info: cohorts) {
94             info.getActorFuture().onComplete(new OnComplete<ActorSelection>() {
95                 @Override
96                 public void onComplete(final Throwable failure, final ActorSelection actor)  {
97                     synchronized (lock) {
98                         boolean done = completed.decrementAndGet() == 0;
99                         if (failure != null) {
100                             LOG.debug("Tx {}: a cohort Future failed", transactionId, failure);
101                             cohortsResolvedFuture.setException(failure);
102                         } else if (!cohortsResolvedFuture.isDone()) {
103                             LOG.debug("Tx {}: cohort actor {} resolved", transactionId, actor);
104
105                             info.setResolvedActor(actor);
106                             if (done) {
107                                 LOG.debug("Tx {}: successfully resolved all cohort actors", transactionId);
108                                 cohortsResolvedFuture.set(null);
109                             }
110                         }
111                     }
112                 }
113             }, actorUtils.getClientDispatcher());
114         }
115
116         return cohortsResolvedFuture;
117     }
118
119     @Override
120     public ListenableFuture<Boolean> canCommit() {
121         LOG.debug("Tx {} canCommit", transactionId);
122
123         final SettableFuture<Boolean> returnFuture = SettableFuture.create();
124
125         // The first phase of canCommit is to gather the list of cohort actor paths that will
126         // participate in the commit. buildCohortPathsList combines the cohort path Futures into
127         // one Future which we wait on asynchronously here. The cohort actor paths are
128         // extracted from ReadyTransactionReply messages by the Futures that were obtained earlier
129         // and passed to us from upstream processing. If any one fails then  we'll fail canCommit.
130
131         Futures.addCallback(resolveCohorts(), new FutureCallback<Void>() {
132             @Override
133             public void onSuccess(final Void notUsed) {
134                 finishCanCommit(returnFuture);
135             }
136
137             @Override
138             public void onFailure(final Throwable failure) {
139                 returnFuture.setException(failure);
140             }
141         }, MoreExecutors.directExecutor());
142
143         return returnFuture;
144     }
145
146     @SuppressFBWarnings(value = "UPM_UNCALLED_PRIVATE_METHOD",
147             justification = "https://github.com/spotbugs/spotbugs/issues/811")
148     private void finishCanCommit(final SettableFuture<Boolean> returnFuture) {
149         LOG.debug("Tx {} finishCanCommit", transactionId);
150
151         // For empty transactions return immediately
152         if (cohorts.size() == 0) {
153             LOG.debug("Tx {}: canCommit returning result true", transactionId);
154             returnFuture.set(Boolean.TRUE);
155             return;
156         }
157
158         commitOperationCallback = new TransactionRateLimitingCallback(actorUtils);
159         commitOperationCallback.run();
160
161         final Iterator<CohortInfo> iterator = cohorts.iterator();
162
163         final OnComplete<Object> onComplete = new OnComplete<Object>() {
164             @Override
165             public void onComplete(final Throwable failure, final Object response) {
166                 if (failure != null) {
167                     LOG.debug("Tx {}: a canCommit cohort Future failed", transactionId, failure);
168
169                     returnFuture.setException(failure);
170                     commitOperationCallback.failure();
171                     return;
172                 }
173
174                 // Only the first call to pause takes effect - subsequent calls before resume are no-ops. So
175                 // this means we'll only time the first transaction canCommit which should be fine.
176                 commitOperationCallback.pause();
177
178                 boolean result = true;
179                 if (CanCommitTransactionReply.isSerializedType(response)) {
180                     CanCommitTransactionReply reply = CanCommitTransactionReply.fromSerializable(response);
181
182                     LOG.debug("Tx {}: received {}", transactionId, response);
183
184                     if (!reply.getCanCommit()) {
185                         result = false;
186                     }
187                 } else {
188                     LOG.error("Unexpected response type {}", response.getClass());
189                     returnFuture.setException(new IllegalArgumentException(
190                             String.format("Unexpected response type %s", response.getClass())));
191                     return;
192                 }
193
194                 if (iterator.hasNext() && result) {
195                     sendCanCommitTransaction(iterator.next(), this);
196                 } else {
197                     LOG.debug("Tx {}: canCommit returning result: {}", transactionId, result);
198                     returnFuture.set(Boolean.valueOf(result));
199                 }
200
201             }
202         };
203
204         sendCanCommitTransaction(iterator.next(), onComplete);
205     }
206
207     private void sendCanCommitTransaction(final CohortInfo toCohortInfo, final OnComplete<Object> onComplete) {
208         CanCommitTransaction message = new CanCommitTransaction(transactionId, toCohortInfo.getActorVersion());
209
210         LOG.debug("Tx {}: sending {} to {}", transactionId, message, toCohortInfo.getResolvedActor());
211
212         Future<Object> future = actorUtils.executeOperationAsync(toCohortInfo.getResolvedActor(),
213                 message.toSerializable(), actorUtils.getTransactionCommitOperationTimeout());
214         future.onComplete(onComplete, actorUtils.getClientDispatcher());
215     }
216
217     private Future<Iterable<Object>> invokeCohorts(final MessageSupplier messageSupplier) {
218         List<Future<Object>> futureList = new ArrayList<>(cohorts.size());
219         for (CohortInfo cohort : cohorts) {
220             Object message = messageSupplier.newMessage(transactionId, cohort.getActorVersion());
221
222             LOG.debug("Tx {}: Sending {} to cohort {}", transactionId, message , cohort.getResolvedActor());
223
224             futureList.add(actorUtils.executeOperationAsync(cohort.getResolvedActor(), message,
225                     actorUtils.getTransactionCommitOperationTimeout()));
226         }
227
228         return akka.dispatch.Futures.sequence(futureList, actorUtils.getClientDispatcher());
229     }
230
231     @Override
232     public ListenableFuture<Void> preCommit() {
233         // We don't need to do anything here - preCommit is done atomically with the commit phase
234         // by the shard.
235         return IMMEDIATE_VOID_SUCCESS;
236     }
237
238     @Override
239     public ListenableFuture<Void> abort() {
240         // Note - we pass false for propagateException. In the front-end data broker, this method
241         // is called when one of the 3 phases fails with an exception. We'd rather have that
242         // original exception propagated to the client. If our abort fails and we propagate the
243         // exception then that exception will supersede and suppress the original exception. But
244         // it's the original exception that is the root cause and of more interest to the client.
245
246         return voidOperation("abort", ABORT_MESSAGE_SUPPLIER,
247                 AbortTransactionReply.class, false, OperationCallback.NO_OP_CALLBACK);
248     }
249
250     @Override
251     public ListenableFuture<Void> commit() {
252         OperationCallback operationCallback = commitOperationCallback != null ? commitOperationCallback :
253             OperationCallback.NO_OP_CALLBACK;
254
255         return voidOperation("commit", COMMIT_MESSAGE_SUPPLIER,
256                 CommitTransactionReply.class, true, operationCallback);
257     }
258
259     @SuppressWarnings("checkstyle:IllegalCatch")
260     private static boolean successfulFuture(final ListenableFuture<Void> future) {
261         if (!future.isDone()) {
262             return false;
263         }
264
265         try {
266             future.get();
267             return true;
268         } catch (Exception e) {
269             return false;
270         }
271     }
272
273     private ListenableFuture<Void> voidOperation(final String operationName,
274             final MessageSupplier messageSupplier, final Class<?> expectedResponseClass,
275             final boolean propagateException, final OperationCallback callback) {
276         LOG.debug("Tx {} {}", transactionId, operationName);
277
278         final SettableFuture<Void> returnFuture = SettableFuture.create();
279
280         // The cohort actor list should already be built at this point by the canCommit phase but,
281         // if not for some reason, we'll try to build it here.
282
283         ListenableFuture<Void> future = resolveCohorts();
284         if (successfulFuture(future)) {
285             finishVoidOperation(operationName, messageSupplier, expectedResponseClass, propagateException,
286                     returnFuture, callback);
287         } else {
288             Futures.addCallback(future, new FutureCallback<Void>() {
289                 @Override
290                 public void onSuccess(final Void notUsed) {
291                     finishVoidOperation(operationName, messageSupplier, expectedResponseClass,
292                             propagateException, returnFuture, callback);
293                 }
294
295                 @Override
296                 public void onFailure(final Throwable failure) {
297                     LOG.debug("Tx {}: a {} cohort path Future failed", transactionId, operationName, failure);
298
299                     if (propagateException) {
300                         returnFuture.setException(failure);
301                     } else {
302                         returnFuture.set(null);
303                     }
304                 }
305             }, MoreExecutors.directExecutor());
306         }
307
308         return returnFuture;
309     }
310
311     private void finishVoidOperation(final String operationName, final MessageSupplier messageSupplier,
312                                      final Class<?> expectedResponseClass, final boolean propagateException,
313                                      final SettableFuture<Void> returnFuture, final OperationCallback callback) {
314         LOG.debug("Tx {} finish {}", transactionId, operationName);
315
316         callback.resume();
317
318         Future<Iterable<Object>> combinedFuture = invokeCohorts(messageSupplier);
319
320         combinedFuture.onComplete(new OnComplete<Iterable<Object>>() {
321             @Override
322             public void onComplete(final Throwable failure, final Iterable<Object> responses) {
323                 Throwable exceptionToPropagate = failure;
324                 if (exceptionToPropagate == null) {
325                     for (Object response: responses) {
326                         if (!response.getClass().equals(expectedResponseClass)) {
327                             exceptionToPropagate = new IllegalArgumentException(
328                                     String.format("Unexpected response type %s", response.getClass()));
329                             break;
330                         }
331                     }
332                 }
333
334                 if (exceptionToPropagate != null) {
335                     LOG.debug("Tx {}: a {} cohort Future failed", transactionId, operationName, exceptionToPropagate);
336                     if (propagateException) {
337                         // We don't log the exception here to avoid redundant logging since we're
338                         // propagating to the caller in MD-SAL core who will log it.
339                         returnFuture.setException(exceptionToPropagate);
340                     } else {
341                         // Since the caller doesn't want us to propagate the exception we'll also
342                         // not log it normally. But it's usually not good to totally silence
343                         // exceptions so we'll log it to debug level.
344                         returnFuture.set(null);
345                     }
346
347                     callback.failure();
348                 } else {
349                     LOG.debug("Tx {}: {} succeeded", transactionId, operationName);
350
351                     returnFuture.set(null);
352
353                     callback.success();
354                 }
355             }
356         }, actorUtils.getClientDispatcher());
357     }
358
359     @Override
360     List<Future<ActorSelection>> getCohortFutures() {
361         List<Future<ActorSelection>> cohortFutures = new ArrayList<>(cohorts.size());
362         for (CohortInfo info: cohorts) {
363             cohortFutures.add(info.getActorFuture());
364         }
365
366         return cohortFutures;
367     }
368
369     static class CohortInfo {
370         private final Future<ActorSelection> actorFuture;
371         private final Supplier<Short> actorVersionSupplier;
372
373         private volatile ActorSelection resolvedActor;
374
375         CohortInfo(final Future<ActorSelection> actorFuture, final Supplier<Short> actorVersionSupplier) {
376             this.actorFuture = actorFuture;
377             this.actorVersionSupplier = actorVersionSupplier;
378         }
379
380         Future<ActorSelection> getActorFuture() {
381             return actorFuture;
382         }
383
384         ActorSelection getResolvedActor() {
385             return resolvedActor;
386         }
387
388         void setResolvedActor(final ActorSelection resolvedActor) {
389             this.resolvedActor = resolvedActor;
390         }
391
392         short getActorVersion() {
393             checkState(resolvedActor != null, "getActorVersion cannot be called until the actor is resolved");
394             return actorVersionSupplier.get();
395         }
396     }
397
398     private interface MessageSupplier {
399         Object newMessage(TransactionIdentifier transactionId, short version);
400
401         boolean isSerializedReplyType(Object reply);
402     }
403 }