atomic-storage: remove type dependency at segment level I/O
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / ThreePhaseCommitCohortProxy.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.datastore;
9
10 import static com.google.common.base.Preconditions.checkState;
11 import static java.util.Objects.requireNonNull;
12
13 import akka.actor.ActorSelection;
14 import akka.dispatch.OnComplete;
15 import com.google.common.util.concurrent.FutureCallback;
16 import com.google.common.util.concurrent.Futures;
17 import com.google.common.util.concurrent.ListenableFuture;
18 import com.google.common.util.concurrent.MoreExecutors;
19 import com.google.common.util.concurrent.SettableFuture;
20 import java.util.ArrayList;
21 import java.util.Iterator;
22 import java.util.List;
23 import java.util.concurrent.atomic.AtomicInteger;
24 import java.util.function.Supplier;
25 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
26 import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction;
27 import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply;
28 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction;
29 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply;
30 import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction;
31 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
32 import org.opendaylight.controller.cluster.datastore.utils.ActorUtils;
33 import org.opendaylight.mdsal.common.api.CommitInfo;
34 import org.opendaylight.yangtools.yang.common.Empty;
35 import org.slf4j.Logger;
36 import org.slf4j.LoggerFactory;
37 import scala.concurrent.Future;
38
39 /**
40  * ThreePhaseCommitCohortProxy represents a set of remote cohort proxies.
41  */
42 public class ThreePhaseCommitCohortProxy extends AbstractThreePhaseCommitCohort<ActorSelection> {
43
44     private static final Logger LOG = LoggerFactory.getLogger(ThreePhaseCommitCohortProxy.class);
45
46     private static final MessageSupplier COMMIT_MESSAGE_SUPPLIER = new MessageSupplier() {
47         @Override
48         public Object newMessage(final TransactionIdentifier transactionId, final short version) {
49             return new CommitTransaction(transactionId, version).toSerializable();
50         }
51
52         @Override
53         public boolean isSerializedReplyType(final Object reply) {
54             return CommitTransactionReply.isSerializedType(reply);
55         }
56     };
57
58     private static final MessageSupplier ABORT_MESSAGE_SUPPLIER = new MessageSupplier() {
59         @Override
60         public Object newMessage(final TransactionIdentifier transactionId, final short version) {
61             return new AbortTransaction(transactionId, version).toSerializable();
62         }
63
64         @Override
65         public boolean isSerializedReplyType(final Object reply) {
66             return AbortTransactionReply.isSerializedType(reply);
67         }
68     };
69
70     private final ActorUtils actorUtils;
71     private final List<CohortInfo> cohorts;
72     private final SettableFuture<Empty> cohortsResolvedFuture = SettableFuture.create();
73     private final TransactionIdentifier transactionId;
74     private volatile OperationCallback commitOperationCallback;
75
76     public ThreePhaseCommitCohortProxy(final ActorUtils actorUtils, final List<CohortInfo> cohorts,
77             final TransactionIdentifier transactionId) {
78         this.actorUtils = actorUtils;
79         this.cohorts = cohorts;
80         this.transactionId = requireNonNull(transactionId);
81
82         if (cohorts.isEmpty()) {
83             cohortsResolvedFuture.set(Empty.value());
84         }
85     }
86
87     private ListenableFuture<Empty> resolveCohorts() {
88         if (cohortsResolvedFuture.isDone()) {
89             return cohortsResolvedFuture;
90         }
91
92         final AtomicInteger completed = new AtomicInteger(cohorts.size());
93         final Object lock = new Object();
94         for (final CohortInfo info: cohorts) {
95             info.getActorFuture().onComplete(new OnComplete<ActorSelection>() {
96                 @Override
97                 public void onComplete(final Throwable failure, final ActorSelection actor)  {
98                     synchronized (lock) {
99                         boolean done = completed.decrementAndGet() == 0;
100                         if (failure != null) {
101                             LOG.debug("Tx {}: a cohort Future failed", transactionId, failure);
102                             cohortsResolvedFuture.setException(failure);
103                         } else if (!cohortsResolvedFuture.isDone()) {
104                             LOG.debug("Tx {}: cohort actor {} resolved", transactionId, actor);
105
106                             info.setResolvedActor(actor);
107                             if (done) {
108                                 LOG.debug("Tx {}: successfully resolved all cohort actors", transactionId);
109                                 cohortsResolvedFuture.set(Empty.value());
110                             }
111                         }
112                     }
113                 }
114             }, actorUtils.getClientDispatcher());
115         }
116
117         return cohortsResolvedFuture;
118     }
119
120     @Override
121     public ListenableFuture<Boolean> canCommit() {
122         LOG.debug("Tx {} canCommit", transactionId);
123
124         final SettableFuture<Boolean> returnFuture = SettableFuture.create();
125
126         // The first phase of canCommit is to gather the list of cohort actor paths that will
127         // participate in the commit. buildCohortPathsList combines the cohort path Futures into
128         // one Future which we wait on asynchronously here. The cohort actor paths are
129         // extracted from ReadyTransactionReply messages by the Futures that were obtained earlier
130         // and passed to us from upstream processing. If any one fails then  we'll fail canCommit.
131
132         Futures.addCallback(resolveCohorts(), new FutureCallback<>() {
133             @Override
134             public void onSuccess(final Empty result) {
135                 finishCanCommit(returnFuture);
136             }
137
138             @Override
139             public void onFailure(final Throwable failure) {
140                 returnFuture.setException(failure);
141             }
142         }, MoreExecutors.directExecutor());
143
144         return returnFuture;
145     }
146
147     private void finishCanCommit(final SettableFuture<Boolean> returnFuture) {
148         LOG.debug("Tx {} finishCanCommit", transactionId);
149
150         // For empty transactions return immediately
151         if (cohorts.size() == 0) {
152             LOG.debug("Tx {}: canCommit returning result true", transactionId);
153             returnFuture.set(Boolean.TRUE);
154             return;
155         }
156
157         commitOperationCallback = new TransactionRateLimitingCallback(actorUtils);
158         commitOperationCallback.run();
159
160         final Iterator<CohortInfo> iterator = cohorts.iterator();
161
162         final OnComplete<Object> onComplete = new OnComplete<>() {
163             @Override
164             public void onComplete(final Throwable failure, final Object response) {
165                 if (failure != null) {
166                     LOG.debug("Tx {}: a canCommit cohort Future failed", transactionId, failure);
167
168                     returnFuture.setException(failure);
169                     commitOperationCallback.failure();
170                     return;
171                 }
172
173                 // Only the first call to pause takes effect - subsequent calls before resume are no-ops. So
174                 // this means we'll only time the first transaction canCommit which should be fine.
175                 commitOperationCallback.pause();
176
177                 boolean result = true;
178                 if (CanCommitTransactionReply.isSerializedType(response)) {
179                     CanCommitTransactionReply reply = CanCommitTransactionReply.fromSerializable(response);
180
181                     LOG.debug("Tx {}: received {}", transactionId, response);
182
183                     if (!reply.getCanCommit()) {
184                         result = false;
185                     }
186                 } else {
187                     LOG.error("Unexpected response type {}", response.getClass());
188                     returnFuture.setException(new IllegalArgumentException(
189                             String.format("Unexpected response type %s", response.getClass())));
190                     return;
191                 }
192
193                 if (iterator.hasNext() && result) {
194                     sendCanCommitTransaction(iterator.next(), this);
195                 } else {
196                     LOG.debug("Tx {}: canCommit returning result: {}", transactionId, result);
197                     returnFuture.set(result);
198                 }
199
200             }
201         };
202
203         sendCanCommitTransaction(iterator.next(), onComplete);
204     }
205
206     private void sendCanCommitTransaction(final CohortInfo toCohortInfo, final OnComplete<Object> onComplete) {
207         CanCommitTransaction message = new CanCommitTransaction(transactionId, toCohortInfo.getActorVersion());
208
209         LOG.debug("Tx {}: sending {} to {}", transactionId, message, toCohortInfo.getResolvedActor());
210
211         Future<Object> future = actorUtils.executeOperationAsync(toCohortInfo.getResolvedActor(),
212                 message.toSerializable(), actorUtils.getTransactionCommitOperationTimeout());
213         future.onComplete(onComplete, actorUtils.getClientDispatcher());
214     }
215
216     private Future<Iterable<Object>> invokeCohorts(final MessageSupplier messageSupplier) {
217         List<Future<Object>> futureList = new ArrayList<>(cohorts.size());
218         for (CohortInfo cohort : cohorts) {
219             Object message = messageSupplier.newMessage(transactionId, cohort.getActorVersion());
220
221             LOG.debug("Tx {}: Sending {} to cohort {}", transactionId, message , cohort.getResolvedActor());
222
223             futureList.add(actorUtils.executeOperationAsync(cohort.getResolvedActor(), message,
224                     actorUtils.getTransactionCommitOperationTimeout()));
225         }
226
227         return akka.dispatch.Futures.sequence(futureList, actorUtils.getClientDispatcher());
228     }
229
230     @Override
231     public ListenableFuture<Empty> preCommit() {
232         // We don't need to do anything here - preCommit is done atomically with the commit phase by the shard.
233         return IMMEDIATE_EMPTY_SUCCESS;
234     }
235
236     @Override
237     public ListenableFuture<Empty> abort() {
238         // Note - we pass false for propagateException. In the front-end data broker, this method
239         // is called when one of the 3 phases fails with an exception. We'd rather have that
240         // original exception propagated to the client. If our abort fails and we propagate the
241         // exception then that exception will supersede and suppress the original exception. But
242         // it's the original exception that is the root cause and of more interest to the client.
243
244         return operation("abort", Empty.value(), ABORT_MESSAGE_SUPPLIER, AbortTransactionReply.class, false,
245             OperationCallback.NO_OP_CALLBACK);
246     }
247
248     @Override
249     public ListenableFuture<? extends CommitInfo> commit() {
250         OperationCallback operationCallback = commitOperationCallback != null ? commitOperationCallback :
251             OperationCallback.NO_OP_CALLBACK;
252
253         return operation("commit", CommitInfo.empty(), COMMIT_MESSAGE_SUPPLIER, CommitTransactionReply.class, true,
254             operationCallback);
255     }
256
257     @SuppressWarnings("checkstyle:IllegalCatch")
258     private static boolean successfulFuture(final ListenableFuture<?> future) {
259         if (!future.isDone()) {
260             return false;
261         }
262
263         try {
264             future.get();
265             return true;
266         } catch (Exception e) {
267             return false;
268         }
269     }
270
271     private <T> ListenableFuture<T> operation(final String operationName, final T futureValue,
272             final MessageSupplier messageSupplier, final Class<?> expectedResponseClass,
273             final boolean propagateException, final OperationCallback callback) {
274         LOG.debug("Tx {} {}", transactionId, operationName);
275
276         final SettableFuture<T> returnFuture = SettableFuture.create();
277
278         // The cohort actor list should already be built at this point by the canCommit phase but,
279         // if not for some reason, we'll try to build it here.
280
281         ListenableFuture<Empty> future = resolveCohorts();
282         if (successfulFuture(future)) {
283             finishOperation(operationName, messageSupplier, expectedResponseClass, propagateException, returnFuture,
284                 futureValue, callback);
285         } else {
286             Futures.addCallback(future, new FutureCallback<>() {
287                 @Override
288                 public void onSuccess(final Empty result) {
289                     finishOperation(operationName, messageSupplier, expectedResponseClass, propagateException,
290                         returnFuture, futureValue, callback);
291                 }
292
293                 @Override
294                 public void onFailure(final Throwable failure) {
295                     LOG.debug("Tx {}: a {} cohort path Future failed", transactionId, operationName, failure);
296
297                     if (propagateException) {
298                         returnFuture.setException(failure);
299                     } else {
300                         returnFuture.set(futureValue);
301                     }
302                 }
303             }, MoreExecutors.directExecutor());
304         }
305
306         return returnFuture;
307     }
308
309     private <T> void finishOperation(final String operationName, final MessageSupplier messageSupplier,
310                                      final Class<?> expectedResponseClass, final boolean propagateException,
311                                      final SettableFuture<T> returnFuture, final T futureValue,
312                                      final OperationCallback callback) {
313         LOG.debug("Tx {} finish {}", transactionId, operationName);
314
315         callback.resume();
316
317         Future<Iterable<Object>> combinedFuture = invokeCohorts(messageSupplier);
318
319         combinedFuture.onComplete(new OnComplete<Iterable<Object>>() {
320             @Override
321             public void onComplete(final Throwable failure, final Iterable<Object> responses) {
322                 Throwable exceptionToPropagate = failure;
323                 if (exceptionToPropagate == null) {
324                     for (Object response: responses) {
325                         if (!response.getClass().equals(expectedResponseClass)) {
326                             exceptionToPropagate = new IllegalArgumentException(
327                                     String.format("Unexpected response type %s", response.getClass()));
328                             break;
329                         }
330                     }
331                 }
332
333                 if (exceptionToPropagate != null) {
334                     LOG.debug("Tx {}: a {} cohort Future failed", transactionId, operationName, exceptionToPropagate);
335                     if (propagateException) {
336                         // We don't log the exception here to avoid redundant logging since we're
337                         // propagating to the caller in MD-SAL core who will log it.
338                         returnFuture.setException(exceptionToPropagate);
339                     } else {
340                         // Since the caller doesn't want us to propagate the exception we'll also
341                         // not log it normally. But it's usually not good to totally silence
342                         // exceptions so we'll log it to debug level.
343                         returnFuture.set(futureValue);
344                     }
345
346                     callback.failure();
347                 } else {
348                     LOG.debug("Tx {}: {} succeeded", transactionId, operationName);
349
350                     returnFuture.set(futureValue);
351
352                     callback.success();
353                 }
354             }
355         }, actorUtils.getClientDispatcher());
356     }
357
358     @Override
359     List<Future<ActorSelection>> getCohortFutures() {
360         List<Future<ActorSelection>> cohortFutures = new ArrayList<>(cohorts.size());
361         for (CohortInfo info: cohorts) {
362             cohortFutures.add(info.getActorFuture());
363         }
364
365         return cohortFutures;
366     }
367
368     static class CohortInfo {
369         private final Future<ActorSelection> actorFuture;
370         private final Supplier<Short> actorVersionSupplier;
371
372         private volatile ActorSelection resolvedActor;
373
374         CohortInfo(final Future<ActorSelection> actorFuture, final Supplier<Short> actorVersionSupplier) {
375             this.actorFuture = actorFuture;
376             this.actorVersionSupplier = actorVersionSupplier;
377         }
378
379         Future<ActorSelection> getActorFuture() {
380             return actorFuture;
381         }
382
383         ActorSelection getResolvedActor() {
384             return resolvedActor;
385         }
386
387         void setResolvedActor(final ActorSelection resolvedActor) {
388             this.resolvedActor = resolvedActor;
389         }
390
391         short getActorVersion() {
392             checkState(resolvedActor != null, "getActorVersion cannot be called until the actor is resolved");
393             return actorVersionSupplier.get();
394         }
395     }
396
397     private interface MessageSupplier {
398         Object newMessage(TransactionIdentifier transactionId, short version);
399
400         boolean isSerializedReplyType(Object reply);
401     }
402 }