2 * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.cluster.datastore;
10 import static com.google.common.base.Preconditions.checkState;
11 import static java.util.Objects.requireNonNull;
13 import akka.actor.ActorSelection;
14 import akka.dispatch.OnComplete;
15 import com.google.common.util.concurrent.FutureCallback;
16 import com.google.common.util.concurrent.Futures;
17 import com.google.common.util.concurrent.ListenableFuture;
18 import com.google.common.util.concurrent.MoreExecutors;
19 import com.google.common.util.concurrent.SettableFuture;
20 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
21 import java.util.ArrayList;
22 import java.util.Iterator;
23 import java.util.List;
24 import java.util.concurrent.atomic.AtomicInteger;
25 import java.util.function.Supplier;
26 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
27 import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction;
28 import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply;
29 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction;
30 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply;
31 import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction;
32 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
33 import org.opendaylight.controller.cluster.datastore.utils.ActorUtils;
34 import org.slf4j.Logger;
35 import org.slf4j.LoggerFactory;
36 import scala.concurrent.Future;
39 * ThreePhaseCommitCohortProxy represents a set of remote cohort proxies.
41 public class ThreePhaseCommitCohortProxy extends AbstractThreePhaseCommitCohort<ActorSelection> {
43 private static final Logger LOG = LoggerFactory.getLogger(ThreePhaseCommitCohortProxy.class);
45 private static final MessageSupplier COMMIT_MESSAGE_SUPPLIER = new MessageSupplier() {
47 public Object newMessage(final TransactionIdentifier transactionId, final short version) {
48 return new CommitTransaction(transactionId, version).toSerializable();
52 public boolean isSerializedReplyType(final Object reply) {
53 return CommitTransactionReply.isSerializedType(reply);
57 private static final MessageSupplier ABORT_MESSAGE_SUPPLIER = new MessageSupplier() {
59 public Object newMessage(final TransactionIdentifier transactionId, final short version) {
60 return new AbortTransaction(transactionId, version).toSerializable();
64 public boolean isSerializedReplyType(final Object reply) {
65 return AbortTransactionReply.isSerializedType(reply);
69 private final ActorUtils actorUtils;
70 private final List<CohortInfo> cohorts;
71 private final SettableFuture<Void> cohortsResolvedFuture = SettableFuture.create();
72 private final TransactionIdentifier transactionId;
73 private volatile OperationCallback commitOperationCallback;
75 public ThreePhaseCommitCohortProxy(final ActorUtils actorUtils, final List<CohortInfo> cohorts,
76 final TransactionIdentifier transactionId) {
77 this.actorUtils = actorUtils;
78 this.cohorts = cohorts;
79 this.transactionId = requireNonNull(transactionId);
81 if (cohorts.isEmpty()) {
82 cohortsResolvedFuture.set(null);
86 private ListenableFuture<Void> resolveCohorts() {
87 if (cohortsResolvedFuture.isDone()) {
88 return cohortsResolvedFuture;
91 final AtomicInteger completed = new AtomicInteger(cohorts.size());
92 final Object lock = new Object();
93 for (final CohortInfo info: cohorts) {
94 info.getActorFuture().onComplete(new OnComplete<ActorSelection>() {
96 public void onComplete(final Throwable failure, final ActorSelection actor) {
98 boolean done = completed.decrementAndGet() == 0;
99 if (failure != null) {
100 LOG.debug("Tx {}: a cohort Future failed", transactionId, failure);
101 cohortsResolvedFuture.setException(failure);
102 } else if (!cohortsResolvedFuture.isDone()) {
103 LOG.debug("Tx {}: cohort actor {} resolved", transactionId, actor);
105 info.setResolvedActor(actor);
107 LOG.debug("Tx {}: successfully resolved all cohort actors", transactionId);
108 cohortsResolvedFuture.set(null);
113 }, actorUtils.getClientDispatcher());
116 return cohortsResolvedFuture;
120 public ListenableFuture<Boolean> canCommit() {
121 LOG.debug("Tx {} canCommit", transactionId);
123 final SettableFuture<Boolean> returnFuture = SettableFuture.create();
125 // The first phase of canCommit is to gather the list of cohort actor paths that will
126 // participate in the commit. buildCohortPathsList combines the cohort path Futures into
127 // one Future which we wait on asynchronously here. The cohort actor paths are
128 // extracted from ReadyTransactionReply messages by the Futures that were obtained earlier
129 // and passed to us from upstream processing. If any one fails then we'll fail canCommit.
131 Futures.addCallback(resolveCohorts(), new FutureCallback<Void>() {
133 public void onSuccess(final Void notUsed) {
134 finishCanCommit(returnFuture);
138 public void onFailure(final Throwable failure) {
139 returnFuture.setException(failure);
141 }, MoreExecutors.directExecutor());
146 @SuppressFBWarnings(value = "UPM_UNCALLED_PRIVATE_METHOD",
147 justification = "https://github.com/spotbugs/spotbugs/issues/811")
148 private void finishCanCommit(final SettableFuture<Boolean> returnFuture) {
149 LOG.debug("Tx {} finishCanCommit", transactionId);
151 // For empty transactions return immediately
152 if (cohorts.size() == 0) {
153 LOG.debug("Tx {}: canCommit returning result true", transactionId);
154 returnFuture.set(Boolean.TRUE);
158 commitOperationCallback = new TransactionRateLimitingCallback(actorUtils);
159 commitOperationCallback.run();
161 final Iterator<CohortInfo> iterator = cohorts.iterator();
163 final OnComplete<Object> onComplete = new OnComplete<Object>() {
165 public void onComplete(final Throwable failure, final Object response) {
166 if (failure != null) {
167 LOG.debug("Tx {}: a canCommit cohort Future failed", transactionId, failure);
169 returnFuture.setException(failure);
170 commitOperationCallback.failure();
174 // Only the first call to pause takes effect - subsequent calls before resume are no-ops. So
175 // this means we'll only time the first transaction canCommit which should be fine.
176 commitOperationCallback.pause();
178 boolean result = true;
179 if (CanCommitTransactionReply.isSerializedType(response)) {
180 CanCommitTransactionReply reply = CanCommitTransactionReply.fromSerializable(response);
182 LOG.debug("Tx {}: received {}", transactionId, response);
184 if (!reply.getCanCommit()) {
188 LOG.error("Unexpected response type {}", response.getClass());
189 returnFuture.setException(new IllegalArgumentException(
190 String.format("Unexpected response type %s", response.getClass())));
194 if (iterator.hasNext() && result) {
195 sendCanCommitTransaction(iterator.next(), this);
197 LOG.debug("Tx {}: canCommit returning result: {}", transactionId, result);
198 returnFuture.set(Boolean.valueOf(result));
204 sendCanCommitTransaction(iterator.next(), onComplete);
207 private void sendCanCommitTransaction(final CohortInfo toCohortInfo, final OnComplete<Object> onComplete) {
208 CanCommitTransaction message = new CanCommitTransaction(transactionId, toCohortInfo.getActorVersion());
210 LOG.debug("Tx {}: sending {} to {}", transactionId, message, toCohortInfo.getResolvedActor());
212 Future<Object> future = actorUtils.executeOperationAsync(toCohortInfo.getResolvedActor(),
213 message.toSerializable(), actorUtils.getTransactionCommitOperationTimeout());
214 future.onComplete(onComplete, actorUtils.getClientDispatcher());
217 private Future<Iterable<Object>> invokeCohorts(final MessageSupplier messageSupplier) {
218 List<Future<Object>> futureList = new ArrayList<>(cohorts.size());
219 for (CohortInfo cohort : cohorts) {
220 Object message = messageSupplier.newMessage(transactionId, cohort.getActorVersion());
222 LOG.debug("Tx {}: Sending {} to cohort {}", transactionId, message , cohort.getResolvedActor());
224 futureList.add(actorUtils.executeOperationAsync(cohort.getResolvedActor(), message,
225 actorUtils.getTransactionCommitOperationTimeout()));
228 return akka.dispatch.Futures.sequence(futureList, actorUtils.getClientDispatcher());
232 public ListenableFuture<Void> preCommit() {
233 // We don't need to do anything here - preCommit is done atomically with the commit phase
235 return IMMEDIATE_VOID_SUCCESS;
239 public ListenableFuture<Void> abort() {
240 // Note - we pass false for propagateException. In the front-end data broker, this method
241 // is called when one of the 3 phases fails with an exception. We'd rather have that
242 // original exception propagated to the client. If our abort fails and we propagate the
243 // exception then that exception will supersede and suppress the original exception. But
244 // it's the original exception that is the root cause and of more interest to the client.
246 return voidOperation("abort", ABORT_MESSAGE_SUPPLIER,
247 AbortTransactionReply.class, false, OperationCallback.NO_OP_CALLBACK);
251 public ListenableFuture<Void> commit() {
252 OperationCallback operationCallback = commitOperationCallback != null ? commitOperationCallback :
253 OperationCallback.NO_OP_CALLBACK;
255 return voidOperation("commit", COMMIT_MESSAGE_SUPPLIER,
256 CommitTransactionReply.class, true, operationCallback);
259 @SuppressWarnings("checkstyle:IllegalCatch")
260 private static boolean successfulFuture(final ListenableFuture<Void> future) {
261 if (!future.isDone()) {
268 } catch (Exception e) {
273 private ListenableFuture<Void> voidOperation(final String operationName,
274 final MessageSupplier messageSupplier, final Class<?> expectedResponseClass,
275 final boolean propagateException, final OperationCallback callback) {
276 LOG.debug("Tx {} {}", transactionId, operationName);
278 final SettableFuture<Void> returnFuture = SettableFuture.create();
280 // The cohort actor list should already be built at this point by the canCommit phase but,
281 // if not for some reason, we'll try to build it here.
283 ListenableFuture<Void> future = resolveCohorts();
284 if (successfulFuture(future)) {
285 finishVoidOperation(operationName, messageSupplier, expectedResponseClass, propagateException,
286 returnFuture, callback);
288 Futures.addCallback(future, new FutureCallback<Void>() {
290 public void onSuccess(final Void notUsed) {
291 finishVoidOperation(operationName, messageSupplier, expectedResponseClass,
292 propagateException, returnFuture, callback);
296 public void onFailure(final Throwable failure) {
297 LOG.debug("Tx {}: a {} cohort path Future failed", transactionId, operationName, failure);
299 if (propagateException) {
300 returnFuture.setException(failure);
302 returnFuture.set(null);
305 }, MoreExecutors.directExecutor());
311 private void finishVoidOperation(final String operationName, final MessageSupplier messageSupplier,
312 final Class<?> expectedResponseClass, final boolean propagateException,
313 final SettableFuture<Void> returnFuture, final OperationCallback callback) {
314 LOG.debug("Tx {} finish {}", transactionId, operationName);
318 Future<Iterable<Object>> combinedFuture = invokeCohorts(messageSupplier);
320 combinedFuture.onComplete(new OnComplete<Iterable<Object>>() {
322 public void onComplete(final Throwable failure, final Iterable<Object> responses) {
323 Throwable exceptionToPropagate = failure;
324 if (exceptionToPropagate == null) {
325 for (Object response: responses) {
326 if (!response.getClass().equals(expectedResponseClass)) {
327 exceptionToPropagate = new IllegalArgumentException(
328 String.format("Unexpected response type %s", response.getClass()));
334 if (exceptionToPropagate != null) {
335 LOG.debug("Tx {}: a {} cohort Future failed", transactionId, operationName, exceptionToPropagate);
336 if (propagateException) {
337 // We don't log the exception here to avoid redundant logging since we're
338 // propagating to the caller in MD-SAL core who will log it.
339 returnFuture.setException(exceptionToPropagate);
341 // Since the caller doesn't want us to propagate the exception we'll also
342 // not log it normally. But it's usually not good to totally silence
343 // exceptions so we'll log it to debug level.
344 returnFuture.set(null);
349 LOG.debug("Tx {}: {} succeeded", transactionId, operationName);
351 returnFuture.set(null);
356 }, actorUtils.getClientDispatcher());
360 List<Future<ActorSelection>> getCohortFutures() {
361 List<Future<ActorSelection>> cohortFutures = new ArrayList<>(cohorts.size());
362 for (CohortInfo info: cohorts) {
363 cohortFutures.add(info.getActorFuture());
366 return cohortFutures;
369 static class CohortInfo {
370 private final Future<ActorSelection> actorFuture;
371 private final Supplier<Short> actorVersionSupplier;
373 private volatile ActorSelection resolvedActor;
375 CohortInfo(final Future<ActorSelection> actorFuture, final Supplier<Short> actorVersionSupplier) {
376 this.actorFuture = actorFuture;
377 this.actorVersionSupplier = actorVersionSupplier;
380 Future<ActorSelection> getActorFuture() {
384 ActorSelection getResolvedActor() {
385 return resolvedActor;
388 void setResolvedActor(final ActorSelection resolvedActor) {
389 this.resolvedActor = resolvedActor;
392 short getActorVersion() {
393 checkState(resolvedActor != null, "getActorVersion cannot be called until the actor is resolved");
394 return actorVersionSupplier.get();
398 private interface MessageSupplier {
399 Object newMessage(TransactionIdentifier transactionId, short version);
401 boolean isSerializedReplyType(Object reply);