/*
 * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
 * and is available at http://www.eclipse.org/legal/epl-v10.html
 */
9 package org.opendaylight.controller.cluster.datastore;
11 import akka.actor.ActorSelection;
12 import akka.dispatch.Futures;
13 import akka.dispatch.OnComplete;
14 import com.google.common.annotations.VisibleForTesting;
15 import com.google.common.collect.Lists;
16 import com.google.common.util.concurrent.ListenableFuture;
17 import com.google.common.util.concurrent.SettableFuture;
18 import java.util.Collections;
19 import java.util.List;
20 import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction;
21 import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply;
22 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction;
23 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply;
24 import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction;
25 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
26 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
27 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
28 import org.slf4j.Logger;
29 import org.slf4j.LoggerFactory;
30 import scala.concurrent.Future;
31 import scala.runtime.AbstractFunction1;
34 * ThreePhaseCommitCohortProxy represents a set of remote cohort proxies
36 public class ThreePhaseCommitCohortProxy implements DOMStoreThreePhaseCommitCohort{
38 private static final Logger LOG = LoggerFactory.getLogger(ThreePhaseCommitCohortProxy.class);
40 private static final ListenableFuture<Void> IMMEDIATE_SUCCESS =
41 com.google.common.util.concurrent.Futures.immediateFuture(null);
43 private final ActorContext actorContext;
44 private final List<Future<ActorSelection>> cohortFutures;
45 private volatile List<ActorSelection> cohorts;
46 private final String transactionId;
48 public ThreePhaseCommitCohortProxy(ActorContext actorContext,
49 List<Future<ActorSelection>> cohortFutures, String transactionId) {
50 this.actorContext = actorContext;
51 this.cohortFutures = cohortFutures;
52 this.transactionId = transactionId;
55 private Future<Void> buildCohortList() {
57 Future<Iterable<ActorSelection>> combinedFutures = Futures.sequence(cohortFutures,
58 actorContext.getActorSystem().dispatcher());
60 return combinedFutures.transform(new AbstractFunction1<Iterable<ActorSelection>, Void>() {
62 public Void apply(Iterable<ActorSelection> actorSelections) {
63 cohorts = Lists.newArrayList(actorSelections);
64 if(LOG.isDebugEnabled()) {
65 LOG.debug("Tx {} successfully built cohort path list: {}",
66 transactionId, cohorts);
70 }, TransactionProxy.SAME_FAILURE_TRANSFORMER, actorContext.getActorSystem().dispatcher());
74 public ListenableFuture<Boolean> canCommit() {
75 if(LOG.isDebugEnabled()) {
76 LOG.debug("Tx {} canCommit", transactionId);
78 final SettableFuture<Boolean> returnFuture = SettableFuture.create();
80 // The first phase of canCommit is to gather the list of cohort actor paths that will
81 // participate in the commit. buildCohortPathsList combines the cohort path Futures into
82 // one Future which we wait on asynchronously here. The cohort actor paths are
83 // extracted from ReadyTransactionReply messages by the Futures that were obtained earlier
84 // and passed to us from upstream processing. If any one fails then we'll fail canCommit.
86 buildCohortList().onComplete(new OnComplete<Void>() {
88 public void onComplete(Throwable failure, Void notUsed) throws Throwable {
90 if(LOG.isDebugEnabled()) {
91 LOG.debug("Tx {}: a cohort Future failed: {}", transactionId, failure);
93 returnFuture.setException(failure);
95 finishCanCommit(returnFuture);
98 }, actorContext.getActorSystem().dispatcher());
103 private void finishCanCommit(final SettableFuture<Boolean> returnFuture) {
104 if(LOG.isDebugEnabled()) {
105 LOG.debug("Tx {} finishCanCommit", transactionId);
107 // The last phase of canCommit is to invoke all the cohort actors asynchronously to perform
108 // their canCommit processing. If any one fails then we'll fail canCommit.
110 Future<Iterable<Object>> combinedFuture =
111 invokeCohorts(new CanCommitTransaction(transactionId).toSerializable());
113 combinedFuture.onComplete(new OnComplete<Iterable<Object>>() {
115 public void onComplete(Throwable failure, Iterable<Object> responses) throws Throwable {
116 if(failure != null) {
117 if(LOG.isDebugEnabled()) {
118 LOG.debug("Tx {}: a canCommit cohort Future failed: {}", transactionId, failure);
120 returnFuture.setException(failure);
124 boolean result = true;
125 for(Object response: responses) {
126 if (response.getClass().equals(CanCommitTransactionReply.SERIALIZABLE_CLASS)) {
127 CanCommitTransactionReply reply =
128 CanCommitTransactionReply.fromSerializable(response);
129 if (!reply.getCanCommit()) {
134 LOG.error("Unexpected response type {}", response.getClass());
135 returnFuture.setException(new IllegalArgumentException(
136 String.format("Unexpected response type %s", response.getClass())));
140 if(LOG.isDebugEnabled()) {
141 LOG.debug("Tx {}: canCommit returning result: {}", transactionId, result);
143 returnFuture.set(Boolean.valueOf(result));
145 }, actorContext.getActorSystem().dispatcher());
148 private Future<Iterable<Object>> invokeCohorts(Object message) {
149 List<Future<Object>> futureList = Lists.newArrayListWithCapacity(cohorts.size());
150 for(ActorSelection cohort : cohorts) {
151 if(LOG.isDebugEnabled()) {
152 LOG.debug("Tx {}: Sending {} to cohort {}", transactionId, message, cohort);
155 futureList.add(actorContext.executeOperationAsync(cohort, message));
158 return Futures.sequence(futureList, actorContext.getActorSystem().dispatcher());
162 public ListenableFuture<Void> preCommit() {
163 // We don't need to do anything here - preCommit is done atomically with the commit phase
165 return IMMEDIATE_SUCCESS;
169 public ListenableFuture<Void> abort() {
170 // Note - we pass false for propagateException. In the front-end data broker, this method
171 // is called when one of the 3 phases fails with an exception. We'd rather have that
172 // original exception propagated to the client. If our abort fails and we propagate the
173 // exception then that exception will supersede and suppress the original exception. But
174 // it's the original exception that is the root cause and of more interest to the client.
176 return voidOperation("abort", new AbortTransaction(transactionId).toSerializable(),
177 AbortTransactionReply.SERIALIZABLE_CLASS, false);
181 public ListenableFuture<Void> commit() {
182 return voidOperation("commit", new CommitTransaction(transactionId).toSerializable(),
183 CommitTransactionReply.SERIALIZABLE_CLASS, true);
186 private ListenableFuture<Void> voidOperation(final String operationName, final Object message,
187 final Class<?> expectedResponseClass, final boolean propagateException) {
189 if(LOG.isDebugEnabled()) {
190 LOG.debug("Tx {} {}", transactionId, operationName);
192 final SettableFuture<Void> returnFuture = SettableFuture.create();
194 // The cohort actor list should already be built at this point by the canCommit phase but,
195 // if not for some reason, we'll try to build it here.
197 if(cohorts != null) {
198 finishVoidOperation(operationName, message, expectedResponseClass, propagateException,
201 buildCohortList().onComplete(new OnComplete<Void>() {
203 public void onComplete(Throwable failure, Void notUsed) throws Throwable {
204 if(failure != null) {
205 if(LOG.isDebugEnabled()) {
206 LOG.debug("Tx {}: a {} cohort path Future failed: {}", transactionId,
207 operationName, failure);
209 if(propagateException) {
210 returnFuture.setException(failure);
212 returnFuture.set(null);
215 finishVoidOperation(operationName, message, expectedResponseClass,
216 propagateException, returnFuture);
219 }, actorContext.getActorSystem().dispatcher());
225 private void finishVoidOperation(final String operationName, final Object message,
226 final Class<?> expectedResponseClass, final boolean propagateException,
227 final SettableFuture<Void> returnFuture) {
228 if(LOG.isDebugEnabled()) {
229 LOG.debug("Tx {} finish {}", transactionId, operationName);
231 Future<Iterable<Object>> combinedFuture = invokeCohorts(message);
233 combinedFuture.onComplete(new OnComplete<Iterable<Object>>() {
235 public void onComplete(Throwable failure, Iterable<Object> responses) throws Throwable {
237 Throwable exceptionToPropagate = failure;
238 if(exceptionToPropagate == null) {
239 for(Object response: responses) {
240 if(!response.getClass().equals(expectedResponseClass)) {
241 exceptionToPropagate = new IllegalArgumentException(
242 String.format("Unexpected response type %s",
243 response.getClass()));
249 if(exceptionToPropagate != null) {
250 if(LOG.isDebugEnabled()) {
251 LOG.debug("Tx {}: a {} cohort Future failed: {}", transactionId,
252 operationName, exceptionToPropagate);
254 if(propagateException) {
255 // We don't log the exception here to avoid redundant logging since we're
256 // propagating to the caller in MD-SAL core who will log it.
257 returnFuture.setException(exceptionToPropagate);
259 // Since the caller doesn't want us to propagate the exception we'll also
260 // not log it normally. But it's usually not good to totally silence
261 // exceptions so we'll log it to debug level.
262 if(LOG.isDebugEnabled()) {
263 LOG.debug(String.format("%s failed", message.getClass().getSimpleName()),
264 exceptionToPropagate);
266 returnFuture.set(null);
269 if(LOG.isDebugEnabled()) {
270 LOG.debug("Tx {}: {} succeeded", transactionId, operationName);
272 returnFuture.set(null);
275 }, actorContext.getActorSystem().dispatcher());
279 List<Future<ActorSelection>> getCohortFutures() {
280 return Collections.unmodifiableList(cohortFutures);