2 * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.controller.cluster.datastore;
11 import akka.actor.ActorSelection;
12 import akka.dispatch.Futures;
13 import akka.dispatch.OnComplete;
14 import com.google.common.annotations.VisibleForTesting;
15 import com.google.common.collect.Lists;
16 import com.google.common.util.concurrent.ListenableFuture;
17 import com.google.common.util.concurrent.SettableFuture;
18 import java.util.Collections;
19 import java.util.Iterator;
20 import java.util.List;
21 import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction;
22 import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply;
23 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction;
24 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply;
25 import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction;
26 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
27 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
28 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
29 import org.slf4j.Logger;
30 import org.slf4j.LoggerFactory;
31 import scala.concurrent.Future;
32 import scala.runtime.AbstractFunction1;
35 * ThreePhaseCommitCohortProxy represents a set of remote cohort proxies
37 public class ThreePhaseCommitCohortProxy implements DOMStoreThreePhaseCommitCohort{
39 private static final Logger LOG = LoggerFactory.getLogger(ThreePhaseCommitCohortProxy.class);
41 private static final ListenableFuture<Void> IMMEDIATE_SUCCESS =
42 com.google.common.util.concurrent.Futures.immediateFuture(null);
44 private final ActorContext actorContext;
45 private final List<Future<ActorSelection>> cohortFutures;
46 private volatile List<ActorSelection> cohorts;
47 private final String transactionId;
48 private static final OperationCallback NO_OP_CALLBACK = new OperationCallback() {
54 public void success() {
58 public void failure() {
62 public ThreePhaseCommitCohortProxy(ActorContext actorContext,
63 List<Future<ActorSelection>> cohortFutures, String transactionId) {
64 this.actorContext = actorContext;
65 this.cohortFutures = cohortFutures;
66 this.transactionId = transactionId;
69 private Future<Void> buildCohortList() {
71 Future<Iterable<ActorSelection>> combinedFutures = Futures.sequence(cohortFutures,
72 actorContext.getClientDispatcher());
74 return combinedFutures.transform(new AbstractFunction1<Iterable<ActorSelection>, Void>() {
76 public Void apply(Iterable<ActorSelection> actorSelections) {
77 cohorts = Lists.newArrayList(actorSelections);
78 if(LOG.isDebugEnabled()) {
79 LOG.debug("Tx {} successfully built cohort path list: {}",
80 transactionId, cohorts);
84 }, TransactionProxy.SAME_FAILURE_TRANSFORMER, actorContext.getClientDispatcher());
88 public ListenableFuture<Boolean> canCommit() {
89 if(LOG.isDebugEnabled()) {
90 LOG.debug("Tx {} canCommit", transactionId);
92 final SettableFuture<Boolean> returnFuture = SettableFuture.create();
94 // The first phase of canCommit is to gather the list of cohort actor paths that will
95 // participate in the commit. buildCohortPathsList combines the cohort path Futures into
96 // one Future which we wait on asynchronously here. The cohort actor paths are
97 // extracted from ReadyTransactionReply messages by the Futures that were obtained earlier
98 // and passed to us from upstream processing. If any one fails then we'll fail canCommit.
100 buildCohortList().onComplete(new OnComplete<Void>() {
102 public void onComplete(Throwable failure, Void notUsed) throws Throwable {
103 if(failure != null) {
104 if(LOG.isDebugEnabled()) {
105 LOG.debug("Tx {}: a cohort Future failed: {}", transactionId, failure);
107 returnFuture.setException(failure);
109 finishCanCommit(returnFuture);
112 }, actorContext.getClientDispatcher());
117 private void finishCanCommit(final SettableFuture<Boolean> returnFuture) {
118 if(LOG.isDebugEnabled()) {
119 LOG.debug("Tx {} finishCanCommit", transactionId);
122 // For empty transactions return immediately
123 if(cohorts.size() == 0){
124 if(LOG.isDebugEnabled()) {
125 LOG.debug("Tx {}: canCommit returning result: {}", transactionId, true);
127 returnFuture.set(Boolean.TRUE);
131 final Object message = new CanCommitTransaction(transactionId).toSerializable();
133 final Iterator<ActorSelection> iterator = cohorts.iterator();
135 final OnComplete<Object> onComplete = new OnComplete<Object>() {
137 public void onComplete(Throwable failure, Object response) throws Throwable {
138 if (failure != null) {
139 if (LOG.isDebugEnabled()) {
140 LOG.debug("Tx {}: a canCommit cohort Future failed: {}", transactionId, failure);
142 returnFuture.setException(failure);
146 boolean result = true;
147 if (response.getClass().equals(CanCommitTransactionReply.SERIALIZABLE_CLASS)) {
148 CanCommitTransactionReply reply =
149 CanCommitTransactionReply.fromSerializable(response);
150 if (!reply.getCanCommit()) {
154 LOG.error("Unexpected response type {}", response.getClass());
155 returnFuture.setException(new IllegalArgumentException(
156 String.format("Unexpected response type %s", response.getClass())));
160 if(iterator.hasNext() && result){
161 Future<Object> future = actorContext.executeOperationAsync(iterator.next(), message,
162 actorContext.getTransactionCommitOperationTimeout());
163 future.onComplete(this, actorContext.getClientDispatcher());
165 if(LOG.isDebugEnabled()) {
166 LOG.debug("Tx {}: canCommit returning result: {}", transactionId, result);
168 returnFuture.set(Boolean.valueOf(result));
174 Future<Object> future = actorContext.executeOperationAsync(iterator.next(), message,
175 actorContext.getTransactionCommitOperationTimeout());
176 future.onComplete(onComplete, actorContext.getClientDispatcher());
179 private Future<Iterable<Object>> invokeCohorts(Object message) {
180 List<Future<Object>> futureList = Lists.newArrayListWithCapacity(cohorts.size());
181 for(ActorSelection cohort : cohorts) {
182 if(LOG.isDebugEnabled()) {
183 LOG.debug("Tx {}: Sending {} to cohort {}", transactionId, message, cohort);
185 futureList.add(actorContext.executeOperationAsync(cohort, message, actorContext.getTransactionCommitOperationTimeout()));
188 return Futures.sequence(futureList, actorContext.getClientDispatcher());
192 public ListenableFuture<Void> preCommit() {
193 // We don't need to do anything here - preCommit is done atomically with the commit phase
195 return IMMEDIATE_SUCCESS;
199 public ListenableFuture<Void> abort() {
200 // Note - we pass false for propagateException. In the front-end data broker, this method
201 // is called when one of the 3 phases fails with an exception. We'd rather have that
202 // original exception propagated to the client. If our abort fails and we propagate the
203 // exception then that exception will supersede and suppress the original exception. But
204 // it's the original exception that is the root cause and of more interest to the client.
206 return voidOperation("abort", new AbortTransaction(transactionId).toSerializable(),
207 AbortTransactionReply.SERIALIZABLE_CLASS, false);
211 public ListenableFuture<Void> commit() {
212 OperationCallback operationCallback = cohortFutures.isEmpty() ? NO_OP_CALLBACK :
213 new TransactionRateLimitingCallback(actorContext);
215 return voidOperation("commit", new CommitTransaction(transactionId).toSerializable(),
216 CommitTransactionReply.SERIALIZABLE_CLASS, true, operationCallback);
219 private ListenableFuture<Void> voidOperation(final String operationName, final Object message,
220 final Class<?> expectedResponseClass, final boolean propagateException) {
221 return voidOperation(operationName, message, expectedResponseClass, propagateException, NO_OP_CALLBACK);
224 private ListenableFuture<Void> voidOperation(final String operationName, final Object message,
225 final Class<?> expectedResponseClass, final boolean propagateException, final OperationCallback callback) {
227 if(LOG.isDebugEnabled()) {
228 LOG.debug("Tx {} {}", transactionId, operationName);
230 final SettableFuture<Void> returnFuture = SettableFuture.create();
232 // The cohort actor list should already be built at this point by the canCommit phase but,
233 // if not for some reason, we'll try to build it here.
235 if(cohorts != null) {
236 finishVoidOperation(operationName, message, expectedResponseClass, propagateException,
237 returnFuture, callback);
239 buildCohortList().onComplete(new OnComplete<Void>() {
241 public void onComplete(Throwable failure, Void notUsed) throws Throwable {
242 if(failure != null) {
243 if(LOG.isDebugEnabled()) {
244 LOG.debug("Tx {}: a {} cohort path Future failed: {}", transactionId,
245 operationName, failure);
247 if(propagateException) {
248 returnFuture.setException(failure);
250 returnFuture.set(null);
253 finishVoidOperation(operationName, message, expectedResponseClass,
254 propagateException, returnFuture, callback);
257 }, actorContext.getClientDispatcher());
263 private void finishVoidOperation(final String operationName, final Object message,
264 final Class<?> expectedResponseClass, final boolean propagateException,
265 final SettableFuture<Void> returnFuture, final OperationCallback callback) {
266 if(LOG.isDebugEnabled()) {
267 LOG.debug("Tx {} finish {}", transactionId, operationName);
272 Future<Iterable<Object>> combinedFuture = invokeCohorts(message);
274 combinedFuture.onComplete(new OnComplete<Iterable<Object>>() {
276 public void onComplete(Throwable failure, Iterable<Object> responses) throws Throwable {
278 Throwable exceptionToPropagate = failure;
279 if(exceptionToPropagate == null) {
280 for(Object response: responses) {
281 if(!response.getClass().equals(expectedResponseClass)) {
282 exceptionToPropagate = new IllegalArgumentException(
283 String.format("Unexpected response type %s",
284 response.getClass()));
290 if(exceptionToPropagate != null) {
292 if(LOG.isDebugEnabled()) {
293 LOG.debug("Tx {}: a {} cohort Future failed: {}", transactionId,
294 operationName, exceptionToPropagate);
296 if(propagateException) {
297 // We don't log the exception here to avoid redundant logging since we're
298 // propagating to the caller in MD-SAL core who will log it.
299 returnFuture.setException(exceptionToPropagate);
301 // Since the caller doesn't want us to propagate the exception we'll also
302 // not log it normally. But it's usually not good to totally silence
303 // exceptions so we'll log it to debug level.
304 if(LOG.isDebugEnabled()) {
305 LOG.debug(String.format("%s failed", message.getClass().getSimpleName()),
306 exceptionToPropagate);
308 returnFuture.set(null);
314 if(LOG.isDebugEnabled()) {
315 LOG.debug("Tx {}: {} succeeded", transactionId, operationName);
317 returnFuture.set(null);
322 }, actorContext.getClientDispatcher());
326 List<Future<ActorSelection>> getCohortFutures() {
327 return Collections.unmodifiableList(cohortFutures);