/*
 * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
 * and is available at http://www.eclipse.org/legal/epl-v10.html
 */
8 package org.opendaylight.controller.cluster.datastore;
10 import static com.google.common.base.Preconditions.checkState;
11 import static java.util.Objects.requireNonNull;
13 import akka.actor.ActorSelection;
14 import akka.dispatch.OnComplete;
15 import com.google.common.util.concurrent.FutureCallback;
16 import com.google.common.util.concurrent.Futures;
17 import com.google.common.util.concurrent.ListenableFuture;
18 import com.google.common.util.concurrent.MoreExecutors;
19 import com.google.common.util.concurrent.SettableFuture;
20 import java.util.ArrayList;
21 import java.util.Iterator;
22 import java.util.List;
23 import java.util.concurrent.atomic.AtomicInteger;
24 import java.util.function.Supplier;
25 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
26 import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction;
27 import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply;
28 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction;
29 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply;
30 import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction;
31 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
32 import org.opendaylight.controller.cluster.datastore.utils.ActorUtils;
33 import org.opendaylight.mdsal.common.api.CommitInfo;
34 import org.opendaylight.yangtools.yang.common.Empty;
35 import org.slf4j.Logger;
36 import org.slf4j.LoggerFactory;
37 import scala.concurrent.Future;
/**
 * ThreePhaseCommitCohortProxy represents a set of remote cohort proxies.
 */
42 public class ThreePhaseCommitCohortProxy extends AbstractThreePhaseCommitCohort<ActorSelection> {
44 private static final Logger LOG = LoggerFactory.getLogger(ThreePhaseCommitCohortProxy.class);
46 private static final MessageSupplier COMMIT_MESSAGE_SUPPLIER = new MessageSupplier() {
48 public Object newMessage(final TransactionIdentifier transactionId, final short version) {
49 return new CommitTransaction(transactionId, version).toSerializable();
53 public boolean isSerializedReplyType(final Object reply) {
54 return CommitTransactionReply.isSerializedType(reply);
58 private static final MessageSupplier ABORT_MESSAGE_SUPPLIER = new MessageSupplier() {
60 public Object newMessage(final TransactionIdentifier transactionId, final short version) {
61 return new AbortTransaction(transactionId, version).toSerializable();
65 public boolean isSerializedReplyType(final Object reply) {
66 return AbortTransactionReply.isSerializedType(reply);
70 private final ActorUtils actorUtils;
71 private final List<CohortInfo> cohorts;
72 private final SettableFuture<Empty> cohortsResolvedFuture = SettableFuture.create();
73 private final TransactionIdentifier transactionId;
74 private volatile OperationCallback commitOperationCallback;
76 public ThreePhaseCommitCohortProxy(final ActorUtils actorUtils, final List<CohortInfo> cohorts,
77 final TransactionIdentifier transactionId) {
78 this.actorUtils = actorUtils;
79 this.cohorts = cohorts;
80 this.transactionId = requireNonNull(transactionId);
82 if (cohorts.isEmpty()) {
83 cohortsResolvedFuture.set(Empty.value());
87 private ListenableFuture<Empty> resolveCohorts() {
88 if (cohortsResolvedFuture.isDone()) {
89 return cohortsResolvedFuture;
92 final AtomicInteger completed = new AtomicInteger(cohorts.size());
93 final Object lock = new Object();
94 for (final CohortInfo info: cohorts) {
95 info.getActorFuture().onComplete(new OnComplete<ActorSelection>() {
97 public void onComplete(final Throwable failure, final ActorSelection actor) {
99 boolean done = completed.decrementAndGet() == 0;
100 if (failure != null) {
101 LOG.debug("Tx {}: a cohort Future failed", transactionId, failure);
102 cohortsResolvedFuture.setException(failure);
103 } else if (!cohortsResolvedFuture.isDone()) {
104 LOG.debug("Tx {}: cohort actor {} resolved", transactionId, actor);
106 info.setResolvedActor(actor);
108 LOG.debug("Tx {}: successfully resolved all cohort actors", transactionId);
109 cohortsResolvedFuture.set(Empty.value());
114 }, actorUtils.getClientDispatcher());
117 return cohortsResolvedFuture;
121 public ListenableFuture<Boolean> canCommit() {
122 LOG.debug("Tx {} canCommit", transactionId);
124 final SettableFuture<Boolean> returnFuture = SettableFuture.create();
126 // The first phase of canCommit is to gather the list of cohort actor paths that will
127 // participate in the commit. buildCohortPathsList combines the cohort path Futures into
128 // one Future which we wait on asynchronously here. The cohort actor paths are
129 // extracted from ReadyTransactionReply messages by the Futures that were obtained earlier
130 // and passed to us from upstream processing. If any one fails then we'll fail canCommit.
132 Futures.addCallback(resolveCohorts(), new FutureCallback<>() {
134 public void onSuccess(final Empty result) {
135 finishCanCommit(returnFuture);
139 public void onFailure(final Throwable failure) {
140 returnFuture.setException(failure);
142 }, MoreExecutors.directExecutor());
147 private void finishCanCommit(final SettableFuture<Boolean> returnFuture) {
148 LOG.debug("Tx {} finishCanCommit", transactionId);
150 // For empty transactions return immediately
151 if (cohorts.size() == 0) {
152 LOG.debug("Tx {}: canCommit returning result true", transactionId);
153 returnFuture.set(Boolean.TRUE);
157 commitOperationCallback = new TransactionRateLimitingCallback(actorUtils);
158 commitOperationCallback.run();
160 final Iterator<CohortInfo> iterator = cohorts.iterator();
162 final OnComplete<Object> onComplete = new OnComplete<>() {
164 public void onComplete(final Throwable failure, final Object response) {
165 if (failure != null) {
166 LOG.debug("Tx {}: a canCommit cohort Future failed", transactionId, failure);
168 returnFuture.setException(failure);
169 commitOperationCallback.failure();
173 // Only the first call to pause takes effect - subsequent calls before resume are no-ops. So
174 // this means we'll only time the first transaction canCommit which should be fine.
175 commitOperationCallback.pause();
177 boolean result = true;
178 if (CanCommitTransactionReply.isSerializedType(response)) {
179 CanCommitTransactionReply reply = CanCommitTransactionReply.fromSerializable(response);
181 LOG.debug("Tx {}: received {}", transactionId, response);
183 if (!reply.getCanCommit()) {
187 LOG.error("Unexpected response type {}", response.getClass());
188 returnFuture.setException(new IllegalArgumentException(
189 String.format("Unexpected response type %s", response.getClass())));
193 if (iterator.hasNext() && result) {
194 sendCanCommitTransaction(iterator.next(), this);
196 LOG.debug("Tx {}: canCommit returning result: {}", transactionId, result);
197 returnFuture.set(result);
203 sendCanCommitTransaction(iterator.next(), onComplete);
206 private void sendCanCommitTransaction(final CohortInfo toCohortInfo, final OnComplete<Object> onComplete) {
207 CanCommitTransaction message = new CanCommitTransaction(transactionId, toCohortInfo.getActorVersion());
209 LOG.debug("Tx {}: sending {} to {}", transactionId, message, toCohortInfo.getResolvedActor());
211 Future<Object> future = actorUtils.executeOperationAsync(toCohortInfo.getResolvedActor(),
212 message.toSerializable(), actorUtils.getTransactionCommitOperationTimeout());
213 future.onComplete(onComplete, actorUtils.getClientDispatcher());
216 private Future<Iterable<Object>> invokeCohorts(final MessageSupplier messageSupplier) {
217 List<Future<Object>> futureList = new ArrayList<>(cohorts.size());
218 for (CohortInfo cohort : cohorts) {
219 Object message = messageSupplier.newMessage(transactionId, cohort.getActorVersion());
221 LOG.debug("Tx {}: Sending {} to cohort {}", transactionId, message , cohort.getResolvedActor());
223 futureList.add(actorUtils.executeOperationAsync(cohort.getResolvedActor(), message,
224 actorUtils.getTransactionCommitOperationTimeout()));
227 return akka.dispatch.Futures.sequence(futureList, actorUtils.getClientDispatcher());
231 public ListenableFuture<Empty> preCommit() {
232 // We don't need to do anything here - preCommit is done atomically with the commit phase by the shard.
233 return IMMEDIATE_EMPTY_SUCCESS;
237 public ListenableFuture<Empty> abort() {
238 // Note - we pass false for propagateException. In the front-end data broker, this method
239 // is called when one of the 3 phases fails with an exception. We'd rather have that
240 // original exception propagated to the client. If our abort fails and we propagate the
241 // exception then that exception will supersede and suppress the original exception. But
242 // it's the original exception that is the root cause and of more interest to the client.
244 return operation("abort", Empty.value(), ABORT_MESSAGE_SUPPLIER, AbortTransactionReply.class, false,
245 OperationCallback.NO_OP_CALLBACK);
249 public ListenableFuture<? extends CommitInfo> commit() {
250 OperationCallback operationCallback = commitOperationCallback != null ? commitOperationCallback :
251 OperationCallback.NO_OP_CALLBACK;
253 return operation("commit", CommitInfo.empty(), COMMIT_MESSAGE_SUPPLIER, CommitTransactionReply.class, true,
257 @SuppressWarnings("checkstyle:IllegalCatch")
258 private static boolean successfulFuture(final ListenableFuture<?> future) {
259 if (!future.isDone()) {
266 } catch (Exception e) {
271 private <T> ListenableFuture<T> operation(final String operationName, final T futureValue,
272 final MessageSupplier messageSupplier, final Class<?> expectedResponseClass,
273 final boolean propagateException, final OperationCallback callback) {
274 LOG.debug("Tx {} {}", transactionId, operationName);
276 final SettableFuture<T> returnFuture = SettableFuture.create();
278 // The cohort actor list should already be built at this point by the canCommit phase but,
279 // if not for some reason, we'll try to build it here.
281 ListenableFuture<Empty> future = resolveCohorts();
282 if (successfulFuture(future)) {
283 finishOperation(operationName, messageSupplier, expectedResponseClass, propagateException, returnFuture,
284 futureValue, callback);
286 Futures.addCallback(future, new FutureCallback<>() {
288 public void onSuccess(final Empty result) {
289 finishOperation(operationName, messageSupplier, expectedResponseClass, propagateException,
290 returnFuture, futureValue, callback);
294 public void onFailure(final Throwable failure) {
295 LOG.debug("Tx {}: a {} cohort path Future failed", transactionId, operationName, failure);
297 if (propagateException) {
298 returnFuture.setException(failure);
300 returnFuture.set(futureValue);
303 }, MoreExecutors.directExecutor());
309 private <T> void finishOperation(final String operationName, final MessageSupplier messageSupplier,
310 final Class<?> expectedResponseClass, final boolean propagateException,
311 final SettableFuture<T> returnFuture, final T futureValue,
312 final OperationCallback callback) {
313 LOG.debug("Tx {} finish {}", transactionId, operationName);
317 Future<Iterable<Object>> combinedFuture = invokeCohorts(messageSupplier);
319 combinedFuture.onComplete(new OnComplete<Iterable<Object>>() {
321 public void onComplete(final Throwable failure, final Iterable<Object> responses) {
322 Throwable exceptionToPropagate = failure;
323 if (exceptionToPropagate == null) {
324 for (Object response: responses) {
325 if (!response.getClass().equals(expectedResponseClass)) {
326 exceptionToPropagate = new IllegalArgumentException(
327 String.format("Unexpected response type %s", response.getClass()));
333 if (exceptionToPropagate != null) {
334 LOG.debug("Tx {}: a {} cohort Future failed", transactionId, operationName, exceptionToPropagate);
335 if (propagateException) {
336 // We don't log the exception here to avoid redundant logging since we're
337 // propagating to the caller in MD-SAL core who will log it.
338 returnFuture.setException(exceptionToPropagate);
340 // Since the caller doesn't want us to propagate the exception we'll also
341 // not log it normally. But it's usually not good to totally silence
342 // exceptions so we'll log it to debug level.
343 returnFuture.set(futureValue);
348 LOG.debug("Tx {}: {} succeeded", transactionId, operationName);
350 returnFuture.set(futureValue);
355 }, actorUtils.getClientDispatcher());
359 List<Future<ActorSelection>> getCohortFutures() {
360 List<Future<ActorSelection>> cohortFutures = new ArrayList<>(cohorts.size());
361 for (CohortInfo info: cohorts) {
362 cohortFutures.add(info.getActorFuture());
365 return cohortFutures;
368 static class CohortInfo {
369 private final Future<ActorSelection> actorFuture;
370 private final Supplier<Short> actorVersionSupplier;
372 private volatile ActorSelection resolvedActor;
374 CohortInfo(final Future<ActorSelection> actorFuture, final Supplier<Short> actorVersionSupplier) {
375 this.actorFuture = actorFuture;
376 this.actorVersionSupplier = actorVersionSupplier;
379 Future<ActorSelection> getActorFuture() {
383 ActorSelection getResolvedActor() {
384 return resolvedActor;
387 void setResolvedActor(final ActorSelection resolvedActor) {
388 this.resolvedActor = resolvedActor;
391 short getActorVersion() {
392 checkState(resolvedActor != null, "getActorVersion cannot be called until the actor is resolved");
393 return actorVersionSupplier.get();
397 private interface MessageSupplier {
398 Object newMessage(TransactionIdentifier transactionId, short version);
400 boolean isSerializedReplyType(Object reply);