2 * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.controller.cluster.datastore;
11 import akka.actor.ActorPath;
12 import akka.actor.ActorSelection;
13 import akka.dispatch.Futures;
14 import akka.dispatch.OnComplete;
16 import com.google.common.annotations.VisibleForTesting;
17 import com.google.common.collect.Lists;
18 import com.google.common.util.concurrent.ListenableFuture;
19 import com.google.common.util.concurrent.SettableFuture;
21 import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction;
22 import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply;
23 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction;
24 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply;
25 import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction;
26 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
27 import org.opendaylight.controller.cluster.datastore.messages.PreCommitTransaction;
28 import org.opendaylight.controller.cluster.datastore.messages.PreCommitTransactionReply;
29 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
30 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
31 import org.slf4j.Logger;
32 import org.slf4j.LoggerFactory;
34 import scala.concurrent.Future;
35 import scala.runtime.AbstractFunction1;
37 import java.util.Collections;
38 import java.util.List;
41 * ThreePhaseCommitCohortProxy represents a set of remote cohort proxies
43 public class ThreePhaseCommitCohortProxy implements DOMStoreThreePhaseCommitCohort{
45 private static final Logger LOG = LoggerFactory.getLogger(ThreePhaseCommitCohortProxy.class);
47 private final ActorContext actorContext;
48 private final List<Future<ActorPath>> cohortPathFutures;
49 private volatile List<ActorPath> cohortPaths;
50 private final String transactionId;
52 public ThreePhaseCommitCohortProxy(ActorContext actorContext,
53 List<Future<ActorPath>> cohortPathFutures, String transactionId) {
54 this.actorContext = actorContext;
55 this.cohortPathFutures = cohortPathFutures;
56 this.transactionId = transactionId;
59 private Future<Void> buildCohortPathsList() {
61 Future<Iterable<ActorPath>> combinedFutures = Futures.sequence(cohortPathFutures,
62 actorContext.getActorSystem().dispatcher());
64 return combinedFutures.transform(new AbstractFunction1<Iterable<ActorPath>, Void>() {
66 public Void apply(Iterable<ActorPath> paths) {
67 cohortPaths = Lists.newArrayList(paths);
68 if(LOG.isDebugEnabled()) {
69 LOG.debug("Tx {} successfully built cohort path list: {}",
70 transactionId, cohortPaths);
74 }, TransactionProxy.SAME_FAILURE_TRANSFORMER, actorContext.getActorSystem().dispatcher());
78 public ListenableFuture<Boolean> canCommit() {
79 if(LOG.isDebugEnabled()) {
80 LOG.debug("Tx {} canCommit", transactionId);
82 final SettableFuture<Boolean> returnFuture = SettableFuture.create();
84 // The first phase of canCommit is to gather the list of cohort actor paths that will
85 // participate in the commit. buildCohortPathsList combines the cohort path Futures into
86 // one Future which we wait on asynchronously here. The cohort actor paths are
87 // extracted from ReadyTransactionReply messages by the Futures that were obtained earlier
88 // and passed to us from upstream processing. If any one fails then we'll fail canCommit.
90 buildCohortPathsList().onComplete(new OnComplete<Void>() {
92 public void onComplete(Throwable failure, Void notUsed) throws Throwable {
94 if(LOG.isDebugEnabled()) {
95 LOG.debug("Tx {}: a cohort path Future failed: {}", transactionId, failure);
97 returnFuture.setException(failure);
99 finishCanCommit(returnFuture);
102 }, actorContext.getActorSystem().dispatcher());
107 private void finishCanCommit(final SettableFuture<Boolean> returnFuture) {
108 if(LOG.isDebugEnabled()) {
109 LOG.debug("Tx {} finishCanCommit", transactionId);
111 // The last phase of canCommit is to invoke all the cohort actors asynchronously to perform
112 // their canCommit processing. If any one fails then we'll fail canCommit.
114 Future<Iterable<Object>> combinedFuture =
115 invokeCohorts(new CanCommitTransaction().toSerializable());
117 combinedFuture.onComplete(new OnComplete<Iterable<Object>>() {
119 public void onComplete(Throwable failure, Iterable<Object> responses) throws Throwable {
120 if(failure != null) {
121 if(LOG.isDebugEnabled()) {
122 LOG.debug("Tx {}: a canCommit cohort Future failed: {}", transactionId, failure);
124 returnFuture.setException(failure);
128 boolean result = true;
129 for(Object response: responses) {
130 if (response.getClass().equals(CanCommitTransactionReply.SERIALIZABLE_CLASS)) {
131 CanCommitTransactionReply reply =
132 CanCommitTransactionReply.fromSerializable(response);
133 if (!reply.getCanCommit()) {
138 LOG.error("Unexpected response type {}", response.getClass());
139 returnFuture.setException(new IllegalArgumentException(
140 String.format("Unexpected response type {}", response.getClass())));
144 if(LOG.isDebugEnabled()) {
145 LOG.debug("Tx {}: canCommit returning result: {}", transactionId, result);
147 returnFuture.set(Boolean.valueOf(result));
149 }, actorContext.getActorSystem().dispatcher());
152 private Future<Iterable<Object>> invokeCohorts(Object message) {
153 List<Future<Object>> futureList = Lists.newArrayListWithCapacity(cohortPaths.size());
154 for(ActorPath actorPath : cohortPaths) {
155 if(LOG.isDebugEnabled()) {
156 LOG.debug("Tx {}: Sending {} to cohort {}", transactionId, message, actorPath);
158 ActorSelection cohort = actorContext.actorSelection(actorPath);
160 futureList.add(actorContext.executeOperationAsync(cohort, message));
163 return Futures.sequence(futureList, actorContext.getActorSystem().dispatcher());
167 public ListenableFuture<Void> preCommit() {
168 return voidOperation("preCommit", new PreCommitTransaction().toSerializable(),
169 PreCommitTransactionReply.SERIALIZABLE_CLASS, true);
173 public ListenableFuture<Void> abort() {
174 // Note - we pass false for propagateException. In the front-end data broker, this method
175 // is called when one of the 3 phases fails with an exception. We'd rather have that
176 // original exception propagated to the client. If our abort fails and we propagate the
177 // exception then that exception will supersede and suppress the original exception. But
178 // it's the original exception that is the root cause and of more interest to the client.
180 return voidOperation("abort", new AbortTransaction().toSerializable(),
181 AbortTransactionReply.SERIALIZABLE_CLASS, false);
185 public ListenableFuture<Void> commit() {
186 return voidOperation("commit", new CommitTransaction().toSerializable(),
187 CommitTransactionReply.SERIALIZABLE_CLASS, true);
190 private ListenableFuture<Void> voidOperation(final String operationName, final Object message,
191 final Class<?> expectedResponseClass, final boolean propagateException) {
193 if(LOG.isDebugEnabled()) {
194 LOG.debug("Tx {} {}", transactionId, operationName);
196 final SettableFuture<Void> returnFuture = SettableFuture.create();
198 // The cohort actor list should already be built at this point by the canCommit phase but,
199 // if not for some reason, we'll try to build it here.
201 if(cohortPaths != null) {
202 finishVoidOperation(operationName, message, expectedResponseClass, propagateException,
205 buildCohortPathsList().onComplete(new OnComplete<Void>() {
207 public void onComplete(Throwable failure, Void notUsed) throws Throwable {
208 if(failure != null) {
209 if(LOG.isDebugEnabled()) {
210 LOG.debug("Tx {}: a {} cohort path Future failed: {}", transactionId,
211 operationName, failure);
213 if(propagateException) {
214 returnFuture.setException(failure);
216 returnFuture.set(null);
219 finishVoidOperation(operationName, message, expectedResponseClass,
220 propagateException, returnFuture);
223 }, actorContext.getActorSystem().dispatcher());
229 private void finishVoidOperation(final String operationName, final Object message,
230 final Class<?> expectedResponseClass, final boolean propagateException,
231 final SettableFuture<Void> returnFuture) {
232 if(LOG.isDebugEnabled()) {
233 LOG.debug("Tx {} finish {}", transactionId, operationName);
235 Future<Iterable<Object>> combinedFuture = invokeCohorts(message);
237 combinedFuture.onComplete(new OnComplete<Iterable<Object>>() {
239 public void onComplete(Throwable failure, Iterable<Object> responses) throws Throwable {
241 Throwable exceptionToPropagate = failure;
242 if(exceptionToPropagate == null) {
243 for(Object response: responses) {
244 if(!response.getClass().equals(expectedResponseClass)) {
245 exceptionToPropagate = new IllegalArgumentException(
246 String.format("Unexpected response type {}",
247 response.getClass()));
253 if(exceptionToPropagate != null) {
254 if(LOG.isDebugEnabled()) {
255 LOG.debug("Tx {}: a {} cohort Future failed: {}", transactionId,
256 operationName, exceptionToPropagate);
258 if(propagateException) {
259 // We don't log the exception here to avoid redundant logging since we're
260 // propagating to the caller in MD-SAL core who will log it.
261 returnFuture.setException(exceptionToPropagate);
263 // Since the caller doesn't want us to propagate the exception we'll also
264 // not log it normally. But it's usually not good to totally silence
265 // exceptions so we'll log it to debug level.
266 if(LOG.isDebugEnabled()) {
267 LOG.debug(String.format("%s failed", message.getClass().getSimpleName()),
268 exceptionToPropagate);
270 returnFuture.set(null);
273 if(LOG.isDebugEnabled()) {
274 LOG.debug("Tx {}: {} succeeded", transactionId, operationName);
276 returnFuture.set(null);
279 }, actorContext.getActorSystem().dispatcher());
283 List<Future<ActorPath>> getCohortPathFutures() {
284 return Collections.unmodifiableList(cohortPathFutures);