2 * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.controller.cluster.datastore;
11 import akka.actor.ActorSelection;
12 import akka.dispatch.Futures;
13 import akka.dispatch.OnComplete;
14 import com.codahale.metrics.Snapshot;
15 import com.codahale.metrics.Timer;
16 import com.google.common.annotations.VisibleForTesting;
17 import com.google.common.collect.Lists;
18 import com.google.common.util.concurrent.ListenableFuture;
19 import com.google.common.util.concurrent.SettableFuture;
20 import java.util.Collections;
21 import java.util.List;
22 import java.util.concurrent.TimeUnit;
23 import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction;
24 import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply;
25 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction;
26 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply;
27 import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction;
28 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
29 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
30 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
31 import org.slf4j.Logger;
32 import org.slf4j.LoggerFactory;
33 import scala.concurrent.Future;
34 import scala.runtime.AbstractFunction1;
37 * ThreePhaseCommitCohortProxy represents a set of remote cohort proxies
39 public class ThreePhaseCommitCohortProxy implements DOMStoreThreePhaseCommitCohort{
41 private static final Logger LOG = LoggerFactory.getLogger(ThreePhaseCommitCohortProxy.class);
43 private static final ListenableFuture<Void> IMMEDIATE_SUCCESS =
44 com.google.common.util.concurrent.Futures.immediateFuture(null);
46 private final ActorContext actorContext;
47 private final List<Future<ActorSelection>> cohortFutures;
48 private volatile List<ActorSelection> cohorts;
49 private final String transactionId;
50 private static final OperationCallback NO_OP_CALLBACK = new OperationCallback() {
56 public void success() {
60 public void failure() {
64 public ThreePhaseCommitCohortProxy(ActorContext actorContext,
65 List<Future<ActorSelection>> cohortFutures, String transactionId) {
66 this.actorContext = actorContext;
67 this.cohortFutures = cohortFutures;
68 this.transactionId = transactionId;
71 private Future<Void> buildCohortList() {
73 Future<Iterable<ActorSelection>> combinedFutures = Futures.sequence(cohortFutures,
74 actorContext.getClientDispatcher());
76 return combinedFutures.transform(new AbstractFunction1<Iterable<ActorSelection>, Void>() {
78 public Void apply(Iterable<ActorSelection> actorSelections) {
79 cohorts = Lists.newArrayList(actorSelections);
80 if(LOG.isDebugEnabled()) {
81 LOG.debug("Tx {} successfully built cohort path list: {}",
82 transactionId, cohorts);
86 }, TransactionProxy.SAME_FAILURE_TRANSFORMER, actorContext.getClientDispatcher());
90 public ListenableFuture<Boolean> canCommit() {
91 if(LOG.isDebugEnabled()) {
92 LOG.debug("Tx {} canCommit", transactionId);
94 final SettableFuture<Boolean> returnFuture = SettableFuture.create();
96 // The first phase of canCommit is to gather the list of cohort actor paths that will
97 // participate in the commit. buildCohortPathsList combines the cohort path Futures into
98 // one Future which we wait on asynchronously here. The cohort actor paths are
99 // extracted from ReadyTransactionReply messages by the Futures that were obtained earlier
100 // and passed to us from upstream processing. If any one fails then we'll fail canCommit.
102 buildCohortList().onComplete(new OnComplete<Void>() {
104 public void onComplete(Throwable failure, Void notUsed) throws Throwable {
105 if(failure != null) {
106 if(LOG.isDebugEnabled()) {
107 LOG.debug("Tx {}: a cohort Future failed: {}", transactionId, failure);
109 returnFuture.setException(failure);
111 finishCanCommit(returnFuture);
114 }, actorContext.getClientDispatcher());
119 private void finishCanCommit(final SettableFuture<Boolean> returnFuture) {
120 if(LOG.isDebugEnabled()) {
121 LOG.debug("Tx {} finishCanCommit", transactionId);
123 // The last phase of canCommit is to invoke all the cohort actors asynchronously to perform
124 // their canCommit processing. If any one fails then we'll fail canCommit.
126 Future<Iterable<Object>> combinedFuture =
127 invokeCohorts(new CanCommitTransaction(transactionId).toSerializable());
129 combinedFuture.onComplete(new OnComplete<Iterable<Object>>() {
131 public void onComplete(Throwable failure, Iterable<Object> responses) throws Throwable {
132 if(failure != null) {
133 if(LOG.isDebugEnabled()) {
134 LOG.debug("Tx {}: a canCommit cohort Future failed: {}", transactionId, failure);
136 returnFuture.setException(failure);
140 boolean result = true;
141 for(Object response: responses) {
142 if (response.getClass().equals(CanCommitTransactionReply.SERIALIZABLE_CLASS)) {
143 CanCommitTransactionReply reply =
144 CanCommitTransactionReply.fromSerializable(response);
145 if (!reply.getCanCommit()) {
150 LOG.error("Unexpected response type {}", response.getClass());
151 returnFuture.setException(new IllegalArgumentException(
152 String.format("Unexpected response type %s", response.getClass())));
156 if(LOG.isDebugEnabled()) {
157 LOG.debug("Tx {}: canCommit returning result: {}", transactionId, result);
159 returnFuture.set(Boolean.valueOf(result));
161 }, actorContext.getClientDispatcher());
164 private Future<Iterable<Object>> invokeCohorts(Object message) {
165 List<Future<Object>> futureList = Lists.newArrayListWithCapacity(cohorts.size());
166 for(ActorSelection cohort : cohorts) {
167 if(LOG.isDebugEnabled()) {
168 LOG.debug("Tx {}: Sending {} to cohort {}", transactionId, message, cohort);
170 futureList.add(actorContext.executeOperationAsync(cohort, message, actorContext.getTransactionCommitOperationTimeout()));
173 return Futures.sequence(futureList, actorContext.getClientDispatcher());
177 public ListenableFuture<Void> preCommit() {
178 // We don't need to do anything here - preCommit is done atomically with the commit phase
180 return IMMEDIATE_SUCCESS;
184 public ListenableFuture<Void> abort() {
185 // Note - we pass false for propagateException. In the front-end data broker, this method
186 // is called when one of the 3 phases fails with an exception. We'd rather have that
187 // original exception propagated to the client. If our abort fails and we propagate the
188 // exception then that exception will supersede and suppress the original exception. But
189 // it's the original exception that is the root cause and of more interest to the client.
191 return voidOperation("abort", new AbortTransaction(transactionId).toSerializable(),
192 AbortTransactionReply.SERIALIZABLE_CLASS, false);
196 public ListenableFuture<Void> commit() {
197 OperationCallback operationCallback = (cohortFutures.size() == 0) ? NO_OP_CALLBACK :
198 new CommitCallback(actorContext);
200 return voidOperation("commit", new CommitTransaction(transactionId).toSerializable(),
201 CommitTransactionReply.SERIALIZABLE_CLASS, true, operationCallback);
204 private ListenableFuture<Void> voidOperation(final String operationName, final Object message,
205 final Class<?> expectedResponseClass, final boolean propagateException) {
206 return voidOperation(operationName, message, expectedResponseClass, propagateException, NO_OP_CALLBACK);
209 private ListenableFuture<Void> voidOperation(final String operationName, final Object message,
210 final Class<?> expectedResponseClass, final boolean propagateException, final OperationCallback callback) {
212 if(LOG.isDebugEnabled()) {
213 LOG.debug("Tx {} {}", transactionId, operationName);
215 final SettableFuture<Void> returnFuture = SettableFuture.create();
217 // The cohort actor list should already be built at this point by the canCommit phase but,
218 // if not for some reason, we'll try to build it here.
220 if(cohorts != null) {
221 finishVoidOperation(operationName, message, expectedResponseClass, propagateException,
222 returnFuture, callback);
224 buildCohortList().onComplete(new OnComplete<Void>() {
226 public void onComplete(Throwable failure, Void notUsed) throws Throwable {
227 if(failure != null) {
228 if(LOG.isDebugEnabled()) {
229 LOG.debug("Tx {}: a {} cohort path Future failed: {}", transactionId,
230 operationName, failure);
232 if(propagateException) {
233 returnFuture.setException(failure);
235 returnFuture.set(null);
238 finishVoidOperation(operationName, message, expectedResponseClass,
239 propagateException, returnFuture, callback);
242 }, actorContext.getClientDispatcher());
248 private void finishVoidOperation(final String operationName, final Object message,
249 final Class<?> expectedResponseClass, final boolean propagateException,
250 final SettableFuture<Void> returnFuture, final OperationCallback callback) {
251 if(LOG.isDebugEnabled()) {
252 LOG.debug("Tx {} finish {}", transactionId, operationName);
257 Future<Iterable<Object>> combinedFuture = invokeCohorts(message);
259 combinedFuture.onComplete(new OnComplete<Iterable<Object>>() {
261 public void onComplete(Throwable failure, Iterable<Object> responses) throws Throwable {
263 Throwable exceptionToPropagate = failure;
264 if(exceptionToPropagate == null) {
265 for(Object response: responses) {
266 if(!response.getClass().equals(expectedResponseClass)) {
267 exceptionToPropagate = new IllegalArgumentException(
268 String.format("Unexpected response type %s",
269 response.getClass()));
275 if(exceptionToPropagate != null) {
277 if(LOG.isDebugEnabled()) {
278 LOG.debug("Tx {}: a {} cohort Future failed: {}", transactionId,
279 operationName, exceptionToPropagate);
281 if(propagateException) {
282 // We don't log the exception here to avoid redundant logging since we're
283 // propagating to the caller in MD-SAL core who will log it.
284 returnFuture.setException(exceptionToPropagate);
286 // Since the caller doesn't want us to propagate the exception we'll also
287 // not log it normally. But it's usually not good to totally silence
288 // exceptions so we'll log it to debug level.
289 if(LOG.isDebugEnabled()) {
290 LOG.debug(String.format("%s failed", message.getClass().getSimpleName()),
291 exceptionToPropagate);
293 returnFuture.set(null);
299 if(LOG.isDebugEnabled()) {
300 LOG.debug("Tx {}: {} succeeded", transactionId, operationName);
302 returnFuture.set(null);
307 }, actorContext.getClientDispatcher());
311 List<Future<ActorSelection>> getCohortFutures() {
312 return Collections.unmodifiableList(cohortFutures);
315 private static interface OperationCallback {
321 private static class CommitCallback implements OperationCallback{
323 private static final Logger LOG = LoggerFactory.getLogger(CommitCallback.class);
324 private static final String COMMIT = "commit";
326 private final Timer commitTimer;
327 private final ActorContext actorContext;
328 private Timer.Context timerContext;
330 CommitCallback(ActorContext actorContext){
331 this.actorContext = actorContext;
332 commitTimer = actorContext.getOperationTimer(COMMIT);
337 timerContext = commitTimer.time();
341 public void success() {
344 Snapshot timerSnapshot = commitTimer.getSnapshot();
345 double allowedLatencyInNanos = timerSnapshot.get95thPercentile();
347 long commitTimeoutInSeconds = actorContext.getDatastoreContext()
348 .getShardTransactionCommitTimeoutInSeconds();
349 long commitTimeoutInNanos = TimeUnit.SECONDS.toNanos(commitTimeoutInSeconds);
351 // Here we are trying to find out how many transactions per second are allowed
352 double newRateLimit = ((double) commitTimeoutInNanos / allowedLatencyInNanos) / commitTimeoutInSeconds;
354 LOG.debug("Data Store {} commit rateLimit adjusted to {} allowedLatencyInNanos = {}",
355 actorContext.getDataStoreType(), newRateLimit, allowedLatencyInNanos);
357 actorContext.setTxCreationLimit(newRateLimit);
361 public void failure() {
362 // This would mean we couldn't get a transaction completed in 30 seconds which is
363 // the default transaction commit timeout. Using the timeout information to figure out the rate limit is
364 // not going to be useful - so we leave it as it is