/*
 * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
 * and is available at http://www.eclipse.org/legal/epl-v10.html
 */
8 package org.opendaylight.controller.cluster.datastore;
10 import static com.google.common.base.Preconditions.checkState;
11 import static java.util.Objects.requireNonNull;
13 import akka.actor.ActorSelection;
14 import akka.dispatch.OnComplete;
15 import com.google.common.base.Supplier;
16 import com.google.common.collect.Lists;
17 import com.google.common.util.concurrent.FutureCallback;
18 import com.google.common.util.concurrent.Futures;
19 import com.google.common.util.concurrent.ListenableFuture;
20 import com.google.common.util.concurrent.MoreExecutors;
21 import com.google.common.util.concurrent.SettableFuture;
22 import java.util.ArrayList;
23 import java.util.Iterator;
24 import java.util.List;
25 import java.util.concurrent.atomic.AtomicInteger;
26 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
27 import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction;
28 import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply;
29 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction;
30 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply;
31 import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction;
32 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
33 import org.opendaylight.controller.cluster.datastore.utils.ActorUtils;
34 import org.slf4j.Logger;
35 import org.slf4j.LoggerFactory;
36 import scala.concurrent.Future;
/**
 * ThreePhaseCommitCohortProxy represents a set of remote cohort proxies.
 */
41 public class ThreePhaseCommitCohortProxy extends AbstractThreePhaseCommitCohort<ActorSelection> {
43 private static final Logger LOG = LoggerFactory.getLogger(ThreePhaseCommitCohortProxy.class);
45 private static final MessageSupplier COMMIT_MESSAGE_SUPPLIER = new MessageSupplier() {
47 public Object newMessage(final TransactionIdentifier transactionId, final short version) {
48 return new CommitTransaction(transactionId, version).toSerializable();
52 public boolean isSerializedReplyType(final Object reply) {
53 return CommitTransactionReply.isSerializedType(reply);
57 private static final MessageSupplier ABORT_MESSAGE_SUPPLIER = new MessageSupplier() {
59 public Object newMessage(final TransactionIdentifier transactionId, final short version) {
60 return new AbortTransaction(transactionId, version).toSerializable();
64 public boolean isSerializedReplyType(final Object reply) {
65 return AbortTransactionReply.isSerializedType(reply);
69 private final ActorUtils actorUtils;
70 private final List<CohortInfo> cohorts;
71 private final SettableFuture<Void> cohortsResolvedFuture = SettableFuture.create();
72 private final TransactionIdentifier transactionId;
73 private volatile OperationCallback commitOperationCallback;
75 public ThreePhaseCommitCohortProxy(final ActorUtils actorUtils, final List<CohortInfo> cohorts,
76 final TransactionIdentifier transactionId) {
77 this.actorUtils = actorUtils;
78 this.cohorts = cohorts;
79 this.transactionId = requireNonNull(transactionId);
81 if (cohorts.isEmpty()) {
82 cohortsResolvedFuture.set(null);
86 private ListenableFuture<Void> resolveCohorts() {
87 if (cohortsResolvedFuture.isDone()) {
88 return cohortsResolvedFuture;
91 final AtomicInteger completed = new AtomicInteger(cohorts.size());
92 final Object lock = new Object();
93 for (final CohortInfo info: cohorts) {
94 info.getActorFuture().onComplete(new OnComplete<ActorSelection>() {
96 public void onComplete(final Throwable failure, final ActorSelection actor) {
98 boolean done = completed.decrementAndGet() == 0;
99 if (failure != null) {
100 LOG.debug("Tx {}: a cohort Future failed", transactionId, failure);
101 cohortsResolvedFuture.setException(failure);
102 } else if (!cohortsResolvedFuture.isDone()) {
103 LOG.debug("Tx {}: cohort actor {} resolved", transactionId, actor);
105 info.setResolvedActor(actor);
107 LOG.debug("Tx {}: successfully resolved all cohort actors", transactionId);
108 cohortsResolvedFuture.set(null);
113 }, actorUtils.getClientDispatcher());
116 return cohortsResolvedFuture;
120 public ListenableFuture<Boolean> canCommit() {
121 LOG.debug("Tx {} canCommit", transactionId);
123 final SettableFuture<Boolean> returnFuture = SettableFuture.create();
125 // The first phase of canCommit is to gather the list of cohort actor paths that will
126 // participate in the commit. buildCohortPathsList combines the cohort path Futures into
127 // one Future which we wait on asynchronously here. The cohort actor paths are
128 // extracted from ReadyTransactionReply messages by the Futures that were obtained earlier
129 // and passed to us from upstream processing. If any one fails then we'll fail canCommit.
131 Futures.addCallback(resolveCohorts(), new FutureCallback<Void>() {
133 public void onSuccess(final Void notUsed) {
134 finishCanCommit(returnFuture);
138 public void onFailure(final Throwable failure) {
139 returnFuture.setException(failure);
141 }, MoreExecutors.directExecutor());
146 private void finishCanCommit(final SettableFuture<Boolean> returnFuture) {
147 LOG.debug("Tx {} finishCanCommit", transactionId);
149 // For empty transactions return immediately
150 if (cohorts.size() == 0) {
151 LOG.debug("Tx {}: canCommit returning result true", transactionId);
152 returnFuture.set(Boolean.TRUE);
156 commitOperationCallback = new TransactionRateLimitingCallback(actorUtils);
157 commitOperationCallback.run();
159 final Iterator<CohortInfo> iterator = cohorts.iterator();
161 final OnComplete<Object> onComplete = new OnComplete<Object>() {
163 public void onComplete(final Throwable failure, final Object response) {
164 if (failure != null) {
165 LOG.debug("Tx {}: a canCommit cohort Future failed", transactionId, failure);
167 returnFuture.setException(failure);
168 commitOperationCallback.failure();
172 // Only the first call to pause takes effect - subsequent calls before resume are no-ops. So
173 // this means we'll only time the first transaction canCommit which should be fine.
174 commitOperationCallback.pause();
176 boolean result = true;
177 if (CanCommitTransactionReply.isSerializedType(response)) {
178 CanCommitTransactionReply reply = CanCommitTransactionReply.fromSerializable(response);
180 LOG.debug("Tx {}: received {}", transactionId, response);
182 if (!reply.getCanCommit()) {
186 LOG.error("Unexpected response type {}", response.getClass());
187 returnFuture.setException(new IllegalArgumentException(
188 String.format("Unexpected response type %s", response.getClass())));
192 if (iterator.hasNext() && result) {
193 sendCanCommitTransaction(iterator.next(), this);
195 LOG.debug("Tx {}: canCommit returning result: {}", transactionId, result);
196 returnFuture.set(Boolean.valueOf(result));
202 sendCanCommitTransaction(iterator.next(), onComplete);
205 private void sendCanCommitTransaction(final CohortInfo toCohortInfo, final OnComplete<Object> onComplete) {
206 CanCommitTransaction message = new CanCommitTransaction(transactionId, toCohortInfo.getActorVersion());
208 LOG.debug("Tx {}: sending {} to {}", transactionId, message, toCohortInfo.getResolvedActor());
210 Future<Object> future = actorUtils.executeOperationAsync(toCohortInfo.getResolvedActor(),
211 message.toSerializable(), actorUtils.getTransactionCommitOperationTimeout());
212 future.onComplete(onComplete, actorUtils.getClientDispatcher());
215 private Future<Iterable<Object>> invokeCohorts(final MessageSupplier messageSupplier) {
216 List<Future<Object>> futureList = Lists.newArrayListWithCapacity(cohorts.size());
217 for (CohortInfo cohort : cohorts) {
218 Object message = messageSupplier.newMessage(transactionId, cohort.getActorVersion());
220 LOG.debug("Tx {}: Sending {} to cohort {}", transactionId, message , cohort.getResolvedActor());
222 futureList.add(actorUtils.executeOperationAsync(cohort.getResolvedActor(), message,
223 actorUtils.getTransactionCommitOperationTimeout()));
226 return akka.dispatch.Futures.sequence(futureList, actorUtils.getClientDispatcher());
230 public ListenableFuture<Void> preCommit() {
231 // We don't need to do anything here - preCommit is done atomically with the commit phase
233 return IMMEDIATE_VOID_SUCCESS;
237 public ListenableFuture<Void> abort() {
238 // Note - we pass false for propagateException. In the front-end data broker, this method
239 // is called when one of the 3 phases fails with an exception. We'd rather have that
240 // original exception propagated to the client. If our abort fails and we propagate the
241 // exception then that exception will supersede and suppress the original exception. But
242 // it's the original exception that is the root cause and of more interest to the client.
244 return voidOperation("abort", ABORT_MESSAGE_SUPPLIER,
245 AbortTransactionReply.class, false, OperationCallback.NO_OP_CALLBACK);
249 public ListenableFuture<Void> commit() {
250 OperationCallback operationCallback = commitOperationCallback != null ? commitOperationCallback :
251 OperationCallback.NO_OP_CALLBACK;
253 return voidOperation("commit", COMMIT_MESSAGE_SUPPLIER,
254 CommitTransactionReply.class, true, operationCallback);
257 @SuppressWarnings("checkstyle:IllegalCatch")
258 private static boolean successfulFuture(final ListenableFuture<Void> future) {
259 if (!future.isDone()) {
266 } catch (Exception e) {
271 private ListenableFuture<Void> voidOperation(final String operationName,
272 final MessageSupplier messageSupplier, final Class<?> expectedResponseClass,
273 final boolean propagateException, final OperationCallback callback) {
274 LOG.debug("Tx {} {}", transactionId, operationName);
276 final SettableFuture<Void> returnFuture = SettableFuture.create();
278 // The cohort actor list should already be built at this point by the canCommit phase but,
279 // if not for some reason, we'll try to build it here.
281 ListenableFuture<Void> future = resolveCohorts();
282 if (successfulFuture(future)) {
283 finishVoidOperation(operationName, messageSupplier, expectedResponseClass, propagateException,
284 returnFuture, callback);
286 Futures.addCallback(future, new FutureCallback<Void>() {
288 public void onSuccess(final Void notUsed) {
289 finishVoidOperation(operationName, messageSupplier, expectedResponseClass,
290 propagateException, returnFuture, callback);
294 public void onFailure(final Throwable failure) {
295 LOG.debug("Tx {}: a {} cohort path Future failed", transactionId, operationName, failure);
297 if (propagateException) {
298 returnFuture.setException(failure);
300 returnFuture.set(null);
303 }, MoreExecutors.directExecutor());
// Sends the message built by messageSupplier to every cohort (via invokeCohorts) and completes
// returnFuture when the combined reply Future finishes on the client dispatcher.
//
// operationName          - used in log messages only
// messageSupplier        - creates the per-cohort request
// expectedResponseClass  - exact class every reply must have (compared with equals, not instanceof)
// propagateException     - true: a failure fails returnFuture; false: returnFuture completes normally
// returnFuture           - the future reported back to the caller of commit/abort
// callback               - NOTE(review): not referenced on any line visible here; the statements
//                          that use it appear to be missing from this listing - confirm against
//                          the complete method body before changing this method.
309 private void finishVoidOperation(final String operationName, final MessageSupplier messageSupplier,
310 final Class<?> expectedResponseClass, final boolean propagateException,
311 final SettableFuture<Void> returnFuture, final OperationCallback callback) {
312 LOG.debug("Tx {} finish {}", transactionId, operationName);
316 Future<Iterable<Object>> combinedFuture = invokeCohorts(messageSupplier);
318 combinedFuture.onComplete(new OnComplete<Iterable<Object>>() {
320 public void onComplete(final Throwable failure, final Iterable<Object> responses) {
// Start from the transport-level failure, if any; otherwise validate the reply types.
321 Throwable exceptionToPropagate = failure;
322 if (exceptionToPropagate == null) {
323 for (Object response: responses) {
// A reply of any other class is turned into an IllegalArgumentException.
324 if (!response.getClass().equals(expectedResponseClass)) {
325 exceptionToPropagate = new IllegalArgumentException(
326 String.format("Unexpected response type %s", response.getClass()));
332 if (exceptionToPropagate != null) {
// The failure is always recorded at debug level, whether or not it is propagated.
333 LOG.debug("Tx {}: a {} cohort Future failed", transactionId, operationName, exceptionToPropagate);
334 if (propagateException) {
335 // We don't log the exception here to avoid redundant logging since we're
336 // propagating to the caller in MD-SAL core who will log it.
337 returnFuture.setException(exceptionToPropagate);
339 // Since the caller doesn't want us to propagate the exception we'll also
340 // not log it normally. But it's usually not good to totally silence
341 // exceptions so we'll log it to debug level.
342 returnFuture.set(null);
347 LOG.debug("Tx {}: {} succeeded", transactionId, operationName);
349 returnFuture.set(null);
354 }, actorUtils.getClientDispatcher());
358 List<Future<ActorSelection>> getCohortFutures() {
359 List<Future<ActorSelection>> cohortFutures = new ArrayList<>(cohorts.size());
360 for (CohortInfo info: cohorts) {
361 cohortFutures.add(info.getActorFuture());
364 return cohortFutures;
367 static class CohortInfo {
368 private final Future<ActorSelection> actorFuture;
369 private volatile ActorSelection resolvedActor;
370 private final Supplier<Short> actorVersionSupplier;
372 CohortInfo(final Future<ActorSelection> actorFuture, final Supplier<Short> actorVersionSupplier) {
373 this.actorFuture = actorFuture;
374 this.actorVersionSupplier = actorVersionSupplier;
377 Future<ActorSelection> getActorFuture() {
381 ActorSelection getResolvedActor() {
382 return resolvedActor;
385 void setResolvedActor(final ActorSelection resolvedActor) {
386 this.resolvedActor = resolvedActor;
389 short getActorVersion() {
390 checkState(resolvedActor != null, "getActorVersion cannot be called until the actor is resolved");
391 return actorVersionSupplier.get();
395 private interface MessageSupplier {
396 Object newMessage(TransactionIdentifier transactionId, short version);
398 boolean isSerializedReplyType(Object reply);