2 * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.controller.cluster.datastore;
11 import akka.actor.ActorSelection;
12 import akka.dispatch.OnComplete;
13 import com.google.common.base.Preconditions;
14 import com.google.common.base.Supplier;
15 import com.google.common.collect.Lists;
16 import com.google.common.util.concurrent.FutureCallback;
17 import com.google.common.util.concurrent.Futures;
18 import com.google.common.util.concurrent.ListenableFuture;
19 import com.google.common.util.concurrent.MoreExecutors;
20 import com.google.common.util.concurrent.SettableFuture;
21 import java.util.ArrayList;
22 import java.util.Iterator;
23 import java.util.List;
24 import java.util.concurrent.atomic.AtomicInteger;
25 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
26 import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction;
27 import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply;
28 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction;
29 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply;
30 import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction;
31 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
32 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
33 import org.slf4j.Logger;
34 import org.slf4j.LoggerFactory;
35 import scala.concurrent.Future;
38 * ThreePhaseCommitCohortProxy represents a set of remote cohort proxies.
40 public class ThreePhaseCommitCohortProxy extends AbstractThreePhaseCommitCohort<ActorSelection> {
42 private static final Logger LOG = LoggerFactory.getLogger(ThreePhaseCommitCohortProxy.class);
44 private static final MessageSupplier COMMIT_MESSAGE_SUPPLIER = new MessageSupplier() {
46 public Object newMessage(final TransactionIdentifier transactionId, final short version) {
47 return new CommitTransaction(transactionId, version).toSerializable();
51 public boolean isSerializedReplyType(final Object reply) {
52 return CommitTransactionReply.isSerializedType(reply);
56 private static final MessageSupplier ABORT_MESSAGE_SUPPLIER = new MessageSupplier() {
58 public Object newMessage(final TransactionIdentifier transactionId, final short version) {
59 return new AbortTransaction(transactionId, version).toSerializable();
63 public boolean isSerializedReplyType(final Object reply) {
64 return AbortTransactionReply.isSerializedType(reply);
68 private final ActorContext actorContext;
69 private final List<CohortInfo> cohorts;
70 private final SettableFuture<Void> cohortsResolvedFuture = SettableFuture.create();
71 private final TransactionIdentifier transactionId;
72 private volatile OperationCallback commitOperationCallback;
74 public ThreePhaseCommitCohortProxy(final ActorContext actorContext, final List<CohortInfo> cohorts,
75 final TransactionIdentifier transactionId) {
76 this.actorContext = actorContext;
77 this.cohorts = cohorts;
78 this.transactionId = Preconditions.checkNotNull(transactionId);
80 if (cohorts.isEmpty()) {
81 cohortsResolvedFuture.set(null);
85 private ListenableFuture<Void> resolveCohorts() {
86 if (cohortsResolvedFuture.isDone()) {
87 return cohortsResolvedFuture;
90 final AtomicInteger completed = new AtomicInteger(cohorts.size());
91 final Object lock = new Object();
92 for (final CohortInfo info: cohorts) {
93 info.getActorFuture().onComplete(new OnComplete<ActorSelection>() {
95 public void onComplete(final Throwable failure, final ActorSelection actor) {
97 boolean done = completed.decrementAndGet() == 0;
98 if (failure != null) {
99 LOG.debug("Tx {}: a cohort Future failed", transactionId, failure);
100 cohortsResolvedFuture.setException(failure);
101 } else if (!cohortsResolvedFuture.isDone()) {
102 LOG.debug("Tx {}: cohort actor {} resolved", transactionId, actor);
104 info.setResolvedActor(actor);
106 LOG.debug("Tx {}: successfully resolved all cohort actors", transactionId);
107 cohortsResolvedFuture.set(null);
112 }, actorContext.getClientDispatcher());
115 return cohortsResolvedFuture;
119 public ListenableFuture<Boolean> canCommit() {
120 LOG.debug("Tx {} canCommit", transactionId);
122 final SettableFuture<Boolean> returnFuture = SettableFuture.create();
124 // The first phase of canCommit is to gather the list of cohort actor paths that will
125 // participate in the commit. buildCohortPathsList combines the cohort path Futures into
126 // one Future which we wait on asynchronously here. The cohort actor paths are
127 // extracted from ReadyTransactionReply messages by the Futures that were obtained earlier
128 // and passed to us from upstream processing. If any one fails then we'll fail canCommit.
130 Futures.addCallback(resolveCohorts(), new FutureCallback<Void>() {
132 public void onSuccess(final Void notUsed) {
133 finishCanCommit(returnFuture);
137 public void onFailure(final Throwable failure) {
138 returnFuture.setException(failure);
140 }, MoreExecutors.directExecutor());
145 private void finishCanCommit(final SettableFuture<Boolean> returnFuture) {
146 LOG.debug("Tx {} finishCanCommit", transactionId);
148 // For empty transactions return immediately
149 if (cohorts.size() == 0) {
150 LOG.debug("Tx {}: canCommit returning result true", transactionId);
151 returnFuture.set(Boolean.TRUE);
155 commitOperationCallback = new TransactionRateLimitingCallback(actorContext);
156 commitOperationCallback.run();
158 final Iterator<CohortInfo> iterator = cohorts.iterator();
160 final OnComplete<Object> onComplete = new OnComplete<Object>() {
162 public void onComplete(final Throwable failure, final Object response) {
163 if (failure != null) {
164 LOG.debug("Tx {}: a canCommit cohort Future failed", transactionId, failure);
166 returnFuture.setException(failure);
167 commitOperationCallback.failure();
171 // Only the first call to pause takes effect - subsequent calls before resume are no-ops. So
172 // this means we'll only time the first transaction canCommit which should be fine.
173 commitOperationCallback.pause();
175 boolean result = true;
176 if (CanCommitTransactionReply.isSerializedType(response)) {
177 CanCommitTransactionReply reply = CanCommitTransactionReply.fromSerializable(response);
179 LOG.debug("Tx {}: received {}", transactionId, response);
181 if (!reply.getCanCommit()) {
185 LOG.error("Unexpected response type {}", response.getClass());
186 returnFuture.setException(new IllegalArgumentException(
187 String.format("Unexpected response type %s", response.getClass())));
191 if (iterator.hasNext() && result) {
192 sendCanCommitTransaction(iterator.next(), this);
194 LOG.debug("Tx {}: canCommit returning result: {}", transactionId, result);
195 returnFuture.set(Boolean.valueOf(result));
201 sendCanCommitTransaction(iterator.next(), onComplete);
204 private void sendCanCommitTransaction(final CohortInfo toCohortInfo, final OnComplete<Object> onComplete) {
205 CanCommitTransaction message = new CanCommitTransaction(transactionId, toCohortInfo.getActorVersion());
207 LOG.debug("Tx {}: sending {} to {}", transactionId, message, toCohortInfo.getResolvedActor());
209 Future<Object> future = actorContext.executeOperationAsync(toCohortInfo.getResolvedActor(),
210 message.toSerializable(), actorContext.getTransactionCommitOperationTimeout());
211 future.onComplete(onComplete, actorContext.getClientDispatcher());
214 private Future<Iterable<Object>> invokeCohorts(final MessageSupplier messageSupplier) {
215 List<Future<Object>> futureList = Lists.newArrayListWithCapacity(cohorts.size());
216 for (CohortInfo cohort : cohorts) {
217 Object message = messageSupplier.newMessage(transactionId, cohort.getActorVersion());
219 LOG.debug("Tx {}: Sending {} to cohort {}", transactionId, message , cohort.getResolvedActor());
221 futureList.add(actorContext.executeOperationAsync(cohort.getResolvedActor(), message,
222 actorContext.getTransactionCommitOperationTimeout()));
225 return akka.dispatch.Futures.sequence(futureList, actorContext.getClientDispatcher());
229 public ListenableFuture<Void> preCommit() {
230 // We don't need to do anything here - preCommit is done atomically with the commit phase
232 return IMMEDIATE_VOID_SUCCESS;
236 public ListenableFuture<Void> abort() {
237 // Note - we pass false for propagateException. In the front-end data broker, this method
238 // is called when one of the 3 phases fails with an exception. We'd rather have that
239 // original exception propagated to the client. If our abort fails and we propagate the
240 // exception then that exception will supersede and suppress the original exception. But
241 // it's the original exception that is the root cause and of more interest to the client.
243 return voidOperation("abort", ABORT_MESSAGE_SUPPLIER,
244 AbortTransactionReply.class, false, OperationCallback.NO_OP_CALLBACK);
248 public ListenableFuture<Void> commit() {
249 OperationCallback operationCallback = commitOperationCallback != null ? commitOperationCallback :
250 OperationCallback.NO_OP_CALLBACK;
252 return voidOperation("commit", COMMIT_MESSAGE_SUPPLIER,
253 CommitTransactionReply.class, true, operationCallback);
256 @SuppressWarnings("checkstyle:IllegalCatch")
257 private static boolean successfulFuture(final ListenableFuture<Void> future) {
258 if (!future.isDone()) {
265 } catch (Exception e) {
270 private ListenableFuture<Void> voidOperation(final String operationName,
271 final MessageSupplier messageSupplier, final Class<?> expectedResponseClass,
272 final boolean propagateException, final OperationCallback callback) {
273 LOG.debug("Tx {} {}", transactionId, operationName);
275 final SettableFuture<Void> returnFuture = SettableFuture.create();
277 // The cohort actor list should already be built at this point by the canCommit phase but,
278 // if not for some reason, we'll try to build it here.
280 ListenableFuture<Void> future = resolveCohorts();
281 if (successfulFuture(future)) {
282 finishVoidOperation(operationName, messageSupplier, expectedResponseClass, propagateException,
283 returnFuture, callback);
285 Futures.addCallback(future, new FutureCallback<Void>() {
287 public void onSuccess(final Void notUsed) {
288 finishVoidOperation(operationName, messageSupplier, expectedResponseClass,
289 propagateException, returnFuture, callback);
293 public void onFailure(final Throwable failure) {
294 LOG.debug("Tx {}: a {} cohort path Future failed: {}", transactionId, operationName, failure);
296 if (propagateException) {
297 returnFuture.setException(failure);
299 returnFuture.set(null);
302 }, MoreExecutors.directExecutor());
308 private void finishVoidOperation(final String operationName, final MessageSupplier messageSupplier,
309 final Class<?> expectedResponseClass, final boolean propagateException,
310 final SettableFuture<Void> returnFuture, final OperationCallback callback) {
311 LOG.debug("Tx {} finish {}", transactionId, operationName);
315 Future<Iterable<Object>> combinedFuture = invokeCohorts(messageSupplier);
317 combinedFuture.onComplete(new OnComplete<Iterable<Object>>() {
319 public void onComplete(final Throwable failure, final Iterable<Object> responses) throws Throwable {
320 Throwable exceptionToPropagate = failure;
321 if (exceptionToPropagate == null) {
322 for (Object response: responses) {
323 if (!response.getClass().equals(expectedResponseClass)) {
324 exceptionToPropagate = new IllegalArgumentException(
325 String.format("Unexpected response type %s", response.getClass()));
331 if (exceptionToPropagate != null) {
332 LOG.debug("Tx {}: a {} cohort Future failed", transactionId, operationName, exceptionToPropagate);
333 if (propagateException) {
334 // We don't log the exception here to avoid redundant logging since we're
335 // propagating to the caller in MD-SAL core who will log it.
336 returnFuture.setException(exceptionToPropagate);
338 // Since the caller doesn't want us to propagate the exception we'll also
339 // not log it normally. But it's usually not good to totally silence
340 // exceptions so we'll log it to debug level.
341 returnFuture.set(null);
346 LOG.debug("Tx {}: {} succeeded", transactionId, operationName);
348 returnFuture.set(null);
353 }, actorContext.getClientDispatcher());
357 List<Future<ActorSelection>> getCohortFutures() {
358 List<Future<ActorSelection>> cohortFutures = new ArrayList<>(cohorts.size());
359 for (CohortInfo info: cohorts) {
360 cohortFutures.add(info.getActorFuture());
363 return cohortFutures;
366 static class CohortInfo {
367 private final Future<ActorSelection> actorFuture;
368 private volatile ActorSelection resolvedActor;
369 private final Supplier<Short> actorVersionSupplier;
371 CohortInfo(final Future<ActorSelection> actorFuture, final Supplier<Short> actorVersionSupplier) {
372 this.actorFuture = actorFuture;
373 this.actorVersionSupplier = actorVersionSupplier;
376 Future<ActorSelection> getActorFuture() {
380 ActorSelection getResolvedActor() {
381 return resolvedActor;
384 void setResolvedActor(final ActorSelection resolvedActor) {
385 this.resolvedActor = resolvedActor;
388 short getActorVersion() {
389 Preconditions.checkState(resolvedActor != null,
390 "getActorVersion cannot be called until the actor is resolved");
391 return actorVersionSupplier.get();
395 private interface MessageSupplier {
396 Object newMessage(TransactionIdentifier transactionId, short version);
398 boolean isSerializedReplyType(Object reply);