/*
 * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
 * and is available at http://www.eclipse.org/legal/epl-v10.html
 */
9 package org.opendaylight.controller.cluster.datastore;
11 import akka.actor.ActorSelection;
12 import akka.dispatch.Futures;
13 import akka.dispatch.OnComplete;
14 import com.google.common.collect.Lists;
15 import com.google.common.util.concurrent.ListenableFuture;
16 import com.google.common.util.concurrent.SettableFuture;
17 import java.util.Collections;
18 import java.util.Iterator;
19 import java.util.List;
20 import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction;
21 import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply;
22 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction;
23 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply;
24 import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction;
25 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
26 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
27 import org.slf4j.Logger;
28 import org.slf4j.LoggerFactory;
29 import scala.concurrent.Future;
30 import scala.runtime.AbstractFunction1;
/**
 * ThreePhaseCommitCohortProxy represents a set of remote cohort proxies.
 */
35 public class ThreePhaseCommitCohortProxy extends AbstractThreePhaseCommitCohort<ActorSelection> {
37 private static final Logger LOG = LoggerFactory.getLogger(ThreePhaseCommitCohortProxy.class);
39 private final ActorContext actorContext;
40 private final List<Future<ActorSelection>> cohortFutures;
41 private volatile List<ActorSelection> cohorts;
42 private final String transactionId;
43 private volatile OperationCallback commitOperationCallback;
/**
 * Creates a proxy for the cohorts participating in a single transaction.
 *
 * @param actorContext  context used to dispatch callbacks and to send messages to the cohorts
 * @param cohortFutures futures that each resolve to the ActorSelection of one cohort; the list
 *                      is stored by reference, not copied
 * @param transactionId identifier of the transaction being committed
 */
public ThreePhaseCommitCohortProxy(ActorContext actorContext,
        List<Future<ActorSelection>> cohortFutures, String transactionId) {
    this.actorContext = actorContext;
    this.cohortFutures = cohortFutures;
    this.transactionId = transactionId;
}
52 private Future<Void> buildCohortList() {
54 Future<Iterable<ActorSelection>> combinedFutures = Futures.sequence(cohortFutures,
55 actorContext.getClientDispatcher());
57 return combinedFutures.transform(new AbstractFunction1<Iterable<ActorSelection>, Void>() {
59 public Void apply(Iterable<ActorSelection> actorSelections) {
60 cohorts = Lists.newArrayList(actorSelections);
61 if(LOG.isDebugEnabled()) {
62 LOG.debug("Tx {} successfully built cohort path list: {}",
63 transactionId, cohorts);
67 }, TransactionReadyReplyMapper.SAME_FAILURE_TRANSFORMER, actorContext.getClientDispatcher());
71 public ListenableFuture<Boolean> canCommit() {
72 if(LOG.isDebugEnabled()) {
73 LOG.debug("Tx {} canCommit", transactionId);
75 final SettableFuture<Boolean> returnFuture = SettableFuture.create();
77 // The first phase of canCommit is to gather the list of cohort actor paths that will
78 // participate in the commit. buildCohortPathsList combines the cohort path Futures into
79 // one Future which we wait on asynchronously here. The cohort actor paths are
80 // extracted from ReadyTransactionReply messages by the Futures that were obtained earlier
81 // and passed to us from upstream processing. If any one fails then we'll fail canCommit.
83 buildCohortList().onComplete(new OnComplete<Void>() {
85 public void onComplete(Throwable failure, Void notUsed) throws Throwable {
87 if(LOG.isDebugEnabled()) {
88 LOG.debug("Tx {}: a cohort Future failed: {}", transactionId, failure);
90 returnFuture.setException(failure);
92 finishCanCommit(returnFuture);
95 }, actorContext.getClientDispatcher());
100 private void finishCanCommit(final SettableFuture<Boolean> returnFuture) {
101 if(LOG.isDebugEnabled()) {
102 LOG.debug("Tx {} finishCanCommit", transactionId);
105 // For empty transactions return immediately
106 if(cohorts.size() == 0){
107 if(LOG.isDebugEnabled()) {
108 LOG.debug("Tx {}: canCommit returning result: {}", transactionId, true);
110 returnFuture.set(Boolean.TRUE);
114 commitOperationCallback = cohortFutures.isEmpty() ? OperationCallback.NO_OP_CALLBACK :
115 new TransactionRateLimitingCallback(actorContext);
117 commitOperationCallback.run();
119 final Object message = new CanCommitTransaction(transactionId).toSerializable();
121 final Iterator<ActorSelection> iterator = cohorts.iterator();
123 final OnComplete<Object> onComplete = new OnComplete<Object>() {
125 public void onComplete(Throwable failure, Object response) throws Throwable {
126 if (failure != null) {
127 if (LOG.isDebugEnabled()) {
128 LOG.debug("Tx {}: a canCommit cohort Future failed: {}", transactionId, failure);
130 returnFuture.setException(failure);
131 commitOperationCallback.failure();
135 // Only the first call to pause takes effect - subsequent calls before resume are no-ops. So
136 // this means we'll only time the first transaction canCommit which should be fine.
137 commitOperationCallback.pause();
139 boolean result = true;
140 if (response.getClass().equals(CanCommitTransactionReply.SERIALIZABLE_CLASS)) {
141 CanCommitTransactionReply reply =
142 CanCommitTransactionReply.fromSerializable(response);
143 if (!reply.getCanCommit()) {
147 LOG.error("Unexpected response type {}", response.getClass());
148 returnFuture.setException(new IllegalArgumentException(
149 String.format("Unexpected response type %s", response.getClass())));
153 if(iterator.hasNext() && result){
154 Future<Object> future = actorContext.executeOperationAsync(iterator.next(), message,
155 actorContext.getTransactionCommitOperationTimeout());
156 future.onComplete(this, actorContext.getClientDispatcher());
158 if(LOG.isDebugEnabled()) {
159 LOG.debug("Tx {}: canCommit returning result: {}", transactionId, result);
161 returnFuture.set(Boolean.valueOf(result));
167 Future<Object> future = actorContext.executeOperationAsync(iterator.next(), message,
168 actorContext.getTransactionCommitOperationTimeout());
169 future.onComplete(onComplete, actorContext.getClientDispatcher());
172 private Future<Iterable<Object>> invokeCohorts(Object message) {
173 List<Future<Object>> futureList = Lists.newArrayListWithCapacity(cohorts.size());
174 for(ActorSelection cohort : cohorts) {
175 if(LOG.isDebugEnabled()) {
176 LOG.debug("Tx {}: Sending {} to cohort {}", transactionId, message, cohort);
178 futureList.add(actorContext.executeOperationAsync(cohort, message, actorContext.getTransactionCommitOperationTimeout()));
181 return Futures.sequence(futureList, actorContext.getClientDispatcher());
185 public ListenableFuture<Void> preCommit() {
186 // We don't need to do anything here - preCommit is done atomically with the commit phase
188 return IMMEDIATE_VOID_SUCCESS;
192 public ListenableFuture<Void> abort() {
193 // Note - we pass false for propagateException. In the front-end data broker, this method
194 // is called when one of the 3 phases fails with an exception. We'd rather have that
195 // original exception propagated to the client. If our abort fails and we propagate the
196 // exception then that exception will supersede and suppress the original exception. But
197 // it's the original exception that is the root cause and of more interest to the client.
199 return voidOperation("abort", new AbortTransaction(transactionId).toSerializable(),
200 AbortTransactionReply.SERIALIZABLE_CLASS, false);
204 public ListenableFuture<Void> commit() {
205 OperationCallback operationCallback = commitOperationCallback != null ? commitOperationCallback :
206 OperationCallback.NO_OP_CALLBACK;
208 return voidOperation("commit", new CommitTransaction(transactionId).toSerializable(),
209 CommitTransactionReply.SERIALIZABLE_CLASS, true, operationCallback);
212 private ListenableFuture<Void> voidOperation(final String operationName, final Object message,
213 final Class<?> expectedResponseClass, final boolean propagateException) {
214 return voidOperation(operationName, message, expectedResponseClass, propagateException,
215 OperationCallback.NO_OP_CALLBACK);
218 private ListenableFuture<Void> voidOperation(final String operationName, final Object message,
219 final Class<?> expectedResponseClass, final boolean propagateException, final OperationCallback callback) {
221 if(LOG.isDebugEnabled()) {
222 LOG.debug("Tx {} {}", transactionId, operationName);
224 final SettableFuture<Void> returnFuture = SettableFuture.create();
226 // The cohort actor list should already be built at this point by the canCommit phase but,
227 // if not for some reason, we'll try to build it here.
229 if(cohorts != null) {
230 finishVoidOperation(operationName, message, expectedResponseClass, propagateException,
231 returnFuture, callback);
233 buildCohortList().onComplete(new OnComplete<Void>() {
235 public void onComplete(Throwable failure, Void notUsed) throws Throwable {
236 if(failure != null) {
237 if(LOG.isDebugEnabled()) {
238 LOG.debug("Tx {}: a {} cohort path Future failed: {}", transactionId,
239 operationName, failure);
241 if(propagateException) {
242 returnFuture.setException(failure);
244 returnFuture.set(null);
247 finishVoidOperation(operationName, message, expectedResponseClass,
248 propagateException, returnFuture, callback);
251 }, actorContext.getClientDispatcher());
257 private void finishVoidOperation(final String operationName, final Object message,
258 final Class<?> expectedResponseClass, final boolean propagateException,
259 final SettableFuture<Void> returnFuture, final OperationCallback callback) {
260 if(LOG.isDebugEnabled()) {
261 LOG.debug("Tx {} finish {}", transactionId, operationName);
266 Future<Iterable<Object>> combinedFuture = invokeCohorts(message);
268 combinedFuture.onComplete(new OnComplete<Iterable<Object>>() {
270 public void onComplete(Throwable failure, Iterable<Object> responses) throws Throwable {
272 Throwable exceptionToPropagate = failure;
273 if(exceptionToPropagate == null) {
274 for(Object response: responses) {
275 if(!response.getClass().equals(expectedResponseClass)) {
276 exceptionToPropagate = new IllegalArgumentException(
277 String.format("Unexpected response type %s",
278 response.getClass()));
284 if(exceptionToPropagate != null) {
286 if(LOG.isDebugEnabled()) {
287 LOG.debug("Tx {}: a {} cohort Future failed: {}", transactionId,
288 operationName, exceptionToPropagate);
290 if(propagateException) {
291 // We don't log the exception here to avoid redundant logging since we're
292 // propagating to the caller in MD-SAL core who will log it.
293 returnFuture.setException(exceptionToPropagate);
295 // Since the caller doesn't want us to propagate the exception we'll also
296 // not log it normally. But it's usually not good to totally silence
297 // exceptions so we'll log it to debug level.
298 if(LOG.isDebugEnabled()) {
299 LOG.debug(String.format("%s failed", message.getClass().getSimpleName()),
300 exceptionToPropagate);
302 returnFuture.set(null);
308 if(LOG.isDebugEnabled()) {
309 LOG.debug("Tx {}: {} succeeded", transactionId, operationName);
311 returnFuture.set(null);
316 }, actorContext.getClientDispatcher());
320 List<Future<ActorSelection>> getCohortFutures() {
321 return Collections.unmodifiableList(cohortFutures);