ea4ea34f0310f227c290960212026e6d989f5e07
[controller.git] /
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.datastore;
9
10 import static com.google.common.base.Preconditions.checkState;
11 import static java.util.Objects.requireNonNull;
12
13 import akka.actor.ActorSelection;
14 import akka.dispatch.OnComplete;
15 import com.google.common.util.concurrent.FutureCallback;
16 import com.google.common.util.concurrent.Futures;
17 import com.google.common.util.concurrent.ListenableFuture;
18 import com.google.common.util.concurrent.MoreExecutors;
19 import com.google.common.util.concurrent.SettableFuture;
20 import java.util.ArrayList;
21 import java.util.Iterator;
22 import java.util.List;
23 import java.util.concurrent.atomic.AtomicInteger;
24 import java.util.function.Supplier;
25 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
26 import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction;
27 import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply;
28 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction;
29 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply;
30 import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction;
31 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
32 import org.opendaylight.controller.cluster.datastore.utils.ActorUtils;
33 import org.opendaylight.mdsal.common.api.CommitInfo;
34 import org.opendaylight.mdsal.dom.spi.store.DOMStoreThreePhaseCommitCohort;
35 import org.opendaylight.yangtools.yang.common.Empty;
36 import org.slf4j.Logger;
37 import org.slf4j.LoggerFactory;
38 import scala.concurrent.Future;
39
40 /**
41  * ThreePhaseCommitCohortProxy represents a set of remote cohort proxies.
42  */
43 @Deprecated(since = "9.0.0", forRemoval = true)
44 final class ThreePhaseCommitCohortProxy implements DOMStoreThreePhaseCommitCohort {
45     private static final Logger LOG = LoggerFactory.getLogger(ThreePhaseCommitCohortProxy.class);
46
47     private static final MessageSupplier COMMIT_MESSAGE_SUPPLIER = new MessageSupplier() {
48         @Override
49         public Object newMessage(final TransactionIdentifier transactionId, final short version) {
50             return new CommitTransaction(transactionId, version).toSerializable();
51         }
52
53         @Override
54         public boolean isSerializedReplyType(final Object reply) {
55             return CommitTransactionReply.isSerializedType(reply);
56         }
57     };
58
59     private static final MessageSupplier ABORT_MESSAGE_SUPPLIER = new MessageSupplier() {
60         @Override
61         public Object newMessage(final TransactionIdentifier transactionId, final short version) {
62             return new AbortTransaction(transactionId, version).toSerializable();
63         }
64
65         @Override
66         public boolean isSerializedReplyType(final Object reply) {
67             return AbortTransactionReply.isSerializedType(reply);
68         }
69     };
70
71     private static final OperationCallback NO_OP_CALLBACK = new OperationCallback() {
72         @Override
73         public void run() {
74         }
75
76         @Override
77         public void success() {
78         }
79
80         @Override
81         public void failure() {
82         }
83
84         @Override
85         public void pause() {
86         }
87
88         @Override
89         public void resume() {
90         }
91     };
92
93
94     private final ActorUtils actorUtils;
95     private final List<CohortInfo> cohorts;
96     private final SettableFuture<Empty> cohortsResolvedFuture = SettableFuture.create();
97     private final TransactionIdentifier transactionId;
98     private volatile OperationCallback commitOperationCallback;
99
100     ThreePhaseCommitCohortProxy(final ActorUtils actorUtils, final List<CohortInfo> cohorts,
101             final TransactionIdentifier transactionId) {
102         this.actorUtils = actorUtils;
103         this.cohorts = cohorts;
104         this.transactionId = requireNonNull(transactionId);
105
106         if (cohorts.isEmpty()) {
107             cohortsResolvedFuture.set(Empty.value());
108         }
109     }
110
111     private ListenableFuture<Empty> resolveCohorts() {
112         if (cohortsResolvedFuture.isDone()) {
113             return cohortsResolvedFuture;
114         }
115
116         final AtomicInteger completed = new AtomicInteger(cohorts.size());
117         final Object lock = new Object();
118         for (final CohortInfo info: cohorts) {
119             info.getActorFuture().onComplete(new OnComplete<ActorSelection>() {
120                 @Override
121                 public void onComplete(final Throwable failure, final ActorSelection actor)  {
122                     synchronized (lock) {
123                         boolean done = completed.decrementAndGet() == 0;
124                         if (failure != null) {
125                             LOG.debug("Tx {}: a cohort Future failed", transactionId, failure);
126                             cohortsResolvedFuture.setException(failure);
127                         } else if (!cohortsResolvedFuture.isDone()) {
128                             LOG.debug("Tx {}: cohort actor {} resolved", transactionId, actor);
129
130                             info.setResolvedActor(actor);
131                             if (done) {
132                                 LOG.debug("Tx {}: successfully resolved all cohort actors", transactionId);
133                                 cohortsResolvedFuture.set(Empty.value());
134                             }
135                         }
136                     }
137                 }
138             }, actorUtils.getClientDispatcher());
139         }
140
141         return cohortsResolvedFuture;
142     }
143
144     @Override
145     public ListenableFuture<Boolean> canCommit() {
146         LOG.debug("Tx {} canCommit", transactionId);
147
148         final SettableFuture<Boolean> returnFuture = SettableFuture.create();
149
150         // The first phase of canCommit is to gather the list of cohort actor paths that will
151         // participate in the commit. buildCohortPathsList combines the cohort path Futures into
152         // one Future which we wait on asynchronously here. The cohort actor paths are
153         // extracted from ReadyTransactionReply messages by the Futures that were obtained earlier
154         // and passed to us from upstream processing. If any one fails then  we'll fail canCommit.
155
156         Futures.addCallback(resolveCohorts(), new FutureCallback<>() {
157             @Override
158             public void onSuccess(final Empty result) {
159                 finishCanCommit(returnFuture);
160             }
161
162             @Override
163             public void onFailure(final Throwable failure) {
164                 returnFuture.setException(failure);
165             }
166         }, MoreExecutors.directExecutor());
167
168         return returnFuture;
169     }
170
171     private void finishCanCommit(final SettableFuture<Boolean> returnFuture) {
172         LOG.debug("Tx {} finishCanCommit", transactionId);
173
174         // For empty transactions return immediately
175         if (cohorts.size() == 0) {
176             LOG.debug("Tx {}: canCommit returning result true", transactionId);
177             returnFuture.set(Boolean.TRUE);
178             return;
179         }
180
181         commitOperationCallback = new TransactionRateLimitingCallback(actorUtils);
182         commitOperationCallback.run();
183
184         final Iterator<CohortInfo> iterator = cohorts.iterator();
185
186         final OnComplete<Object> onComplete = new OnComplete<>() {
187             @Override
188             public void onComplete(final Throwable failure, final Object response) {
189                 if (failure != null) {
190                     LOG.debug("Tx {}: a canCommit cohort Future failed", transactionId, failure);
191
192                     returnFuture.setException(failure);
193                     commitOperationCallback.failure();
194                     return;
195                 }
196
197                 // Only the first call to pause takes effect - subsequent calls before resume are no-ops. So
198                 // this means we'll only time the first transaction canCommit which should be fine.
199                 commitOperationCallback.pause();
200
201                 boolean result = true;
202                 if (CanCommitTransactionReply.isSerializedType(response)) {
203                     CanCommitTransactionReply reply = CanCommitTransactionReply.fromSerializable(response);
204
205                     LOG.debug("Tx {}: received {}", transactionId, response);
206
207                     if (!reply.getCanCommit()) {
208                         result = false;
209                     }
210                 } else {
211                     LOG.error("Unexpected response type {}", response.getClass());
212                     returnFuture.setException(new IllegalArgumentException(
213                             String.format("Unexpected response type %s", response.getClass())));
214                     return;
215                 }
216
217                 if (iterator.hasNext() && result) {
218                     sendCanCommitTransaction(iterator.next(), this);
219                 } else {
220                     LOG.debug("Tx {}: canCommit returning result: {}", transactionId, result);
221                     returnFuture.set(result);
222                 }
223
224             }
225         };
226
227         sendCanCommitTransaction(iterator.next(), onComplete);
228     }
229
230     private void sendCanCommitTransaction(final CohortInfo toCohortInfo, final OnComplete<Object> onComplete) {
231         CanCommitTransaction message = new CanCommitTransaction(transactionId, toCohortInfo.getActorVersion());
232
233         LOG.debug("Tx {}: sending {} to {}", transactionId, message, toCohortInfo.getResolvedActor());
234
235         Future<Object> future = actorUtils.executeOperationAsync(toCohortInfo.getResolvedActor(),
236                 message.toSerializable(), actorUtils.getTransactionCommitOperationTimeout());
237         future.onComplete(onComplete, actorUtils.getClientDispatcher());
238     }
239
240     private Future<Iterable<Object>> invokeCohorts(final MessageSupplier messageSupplier) {
241         List<Future<Object>> futureList = new ArrayList<>(cohorts.size());
242         for (CohortInfo cohort : cohorts) {
243             Object message = messageSupplier.newMessage(transactionId, cohort.getActorVersion());
244
245             LOG.debug("Tx {}: Sending {} to cohort {}", transactionId, message , cohort.getResolvedActor());
246
247             futureList.add(actorUtils.executeOperationAsync(cohort.getResolvedActor(), message,
248                     actorUtils.getTransactionCommitOperationTimeout()));
249         }
250
251         return akka.dispatch.Futures.sequence(futureList, actorUtils.getClientDispatcher());
252     }
253
254     @Override
255     public ListenableFuture<Empty> preCommit() {
256         // We don't need to do anything here - preCommit is done atomically with the commit phase by the shard.
257         return Empty.immediateFuture();
258     }
259
260     @Override
261     public ListenableFuture<Empty> abort() {
262         // Note - we pass false for propagateException. In the front-end data broker, this method
263         // is called when one of the 3 phases fails with an exception. We'd rather have that
264         // original exception propagated to the client. If our abort fails and we propagate the
265         // exception then that exception will supersede and suppress the original exception. But
266         // it's the original exception that is the root cause and of more interest to the client.
267
268         return operation("abort", Empty.value(), ABORT_MESSAGE_SUPPLIER, AbortTransactionReply.class, false,
269             NO_OP_CALLBACK);
270     }
271
272     @Override
273     public ListenableFuture<? extends CommitInfo> commit() {
274         OperationCallback operationCallback = commitOperationCallback != null ? commitOperationCallback :
275             NO_OP_CALLBACK;
276
277         return operation("commit", CommitInfo.empty(), COMMIT_MESSAGE_SUPPLIER, CommitTransactionReply.class, true,
278             operationCallback);
279     }
280
281     @SuppressWarnings("checkstyle:IllegalCatch")
282     private static boolean successfulFuture(final ListenableFuture<?> future) {
283         if (!future.isDone()) {
284             return false;
285         }
286
287         try {
288             future.get();
289             return true;
290         } catch (Exception e) {
291             return false;
292         }
293     }
294
295     private <T> ListenableFuture<T> operation(final String operationName, final T futureValue,
296             final MessageSupplier messageSupplier, final Class<?> expectedResponseClass,
297             final boolean propagateException, final OperationCallback callback) {
298         LOG.debug("Tx {} {}", transactionId, operationName);
299
300         final SettableFuture<T> returnFuture = SettableFuture.create();
301
302         // The cohort actor list should already be built at this point by the canCommit phase but,
303         // if not for some reason, we'll try to build it here.
304
305         ListenableFuture<Empty> future = resolveCohorts();
306         if (successfulFuture(future)) {
307             finishOperation(operationName, messageSupplier, expectedResponseClass, propagateException, returnFuture,
308                 futureValue, callback);
309         } else {
310             Futures.addCallback(future, new FutureCallback<>() {
311                 @Override
312                 public void onSuccess(final Empty result) {
313                     finishOperation(operationName, messageSupplier, expectedResponseClass, propagateException,
314                         returnFuture, futureValue, callback);
315                 }
316
317                 @Override
318                 public void onFailure(final Throwable failure) {
319                     LOG.debug("Tx {}: a {} cohort path Future failed", transactionId, operationName, failure);
320
321                     if (propagateException) {
322                         returnFuture.setException(failure);
323                     } else {
324                         returnFuture.set(futureValue);
325                     }
326                 }
327             }, MoreExecutors.directExecutor());
328         }
329
330         return returnFuture;
331     }
332
333     private <T> void finishOperation(final String operationName, final MessageSupplier messageSupplier,
334                                      final Class<?> expectedResponseClass, final boolean propagateException,
335                                      final SettableFuture<T> returnFuture, final T futureValue,
336                                      final OperationCallback callback) {
337         LOG.debug("Tx {} finish {}", transactionId, operationName);
338
339         callback.resume();
340
341         Future<Iterable<Object>> combinedFuture = invokeCohorts(messageSupplier);
342
343         combinedFuture.onComplete(new OnComplete<Iterable<Object>>() {
344             @Override
345             public void onComplete(final Throwable failure, final Iterable<Object> responses) {
346                 Throwable exceptionToPropagate = failure;
347                 if (exceptionToPropagate == null) {
348                     for (Object response: responses) {
349                         if (!response.getClass().equals(expectedResponseClass)) {
350                             exceptionToPropagate = new IllegalArgumentException(
351                                     String.format("Unexpected response type %s", response.getClass()));
352                             break;
353                         }
354                     }
355                 }
356
357                 if (exceptionToPropagate != null) {
358                     LOG.debug("Tx {}: a {} cohort Future failed", transactionId, operationName, exceptionToPropagate);
359                     if (propagateException) {
360                         // We don't log the exception here to avoid redundant logging since we're
361                         // propagating to the caller in MD-SAL core who will log it.
362                         returnFuture.setException(exceptionToPropagate);
363                     } else {
364                         // Since the caller doesn't want us to propagate the exception we'll also
365                         // not log it normally. But it's usually not good to totally silence
366                         // exceptions so we'll log it to debug level.
367                         returnFuture.set(futureValue);
368                     }
369
370                     callback.failure();
371                 } else {
372                     LOG.debug("Tx {}: {} succeeded", transactionId, operationName);
373
374                     returnFuture.set(futureValue);
375
376                     callback.success();
377                 }
378             }
379         }, actorUtils.getClientDispatcher());
380     }
381
382     static class CohortInfo {
383         private final Future<ActorSelection> actorFuture;
384         private final Supplier<Short> actorVersionSupplier;
385
386         private volatile ActorSelection resolvedActor;
387
388         CohortInfo(final Future<ActorSelection> actorFuture, final Supplier<Short> actorVersionSupplier) {
389             this.actorFuture = actorFuture;
390             this.actorVersionSupplier = actorVersionSupplier;
391         }
392
393         Future<ActorSelection> getActorFuture() {
394             return actorFuture;
395         }
396
397         ActorSelection getResolvedActor() {
398             return resolvedActor;
399         }
400
401         void setResolvedActor(final ActorSelection resolvedActor) {
402             this.resolvedActor = resolvedActor;
403         }
404
405         short getActorVersion() {
406             checkState(resolvedActor != null, "getActorVersion cannot be called until the actor is resolved");
407             return actorVersionSupplier.get();
408         }
409     }
410
411     private interface MessageSupplier {
412         Object newMessage(TransactionIdentifier transactionId, short version);
413
414         boolean isSerializedReplyType(Object reply);
415     }
416 }