/*
 * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
 * and is available at http://www.eclipse.org/legal/epl-v10.html
 */
8 package org.opendaylight.controller.cluster.datastore;
10 import static com.google.common.base.Preconditions.checkState;
11 import static java.util.Objects.requireNonNull;
13 import akka.actor.ActorSelection;
14 import akka.dispatch.OnComplete;
15 import com.google.common.util.concurrent.FutureCallback;
16 import com.google.common.util.concurrent.Futures;
17 import com.google.common.util.concurrent.ListenableFuture;
18 import com.google.common.util.concurrent.MoreExecutors;
19 import com.google.common.util.concurrent.SettableFuture;
20 import java.util.ArrayList;
21 import java.util.Iterator;
22 import java.util.List;
23 import java.util.concurrent.atomic.AtomicInteger;
24 import java.util.function.Supplier;
25 import org.eclipse.jdt.annotation.NonNull;
26 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
27 import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction;
28 import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply;
29 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction;
30 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply;
31 import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction;
32 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
33 import org.opendaylight.controller.cluster.datastore.utils.ActorUtils;
34 import org.opendaylight.mdsal.common.api.CommitInfo;
35 import org.opendaylight.mdsal.dom.spi.store.DOMStoreThreePhaseCommitCohort;
36 import org.opendaylight.yangtools.yang.common.Empty;
37 import org.slf4j.Logger;
38 import org.slf4j.LoggerFactory;
39 import scala.concurrent.Future;
/**
 * ThreePhaseCommitCohortProxy represents a set of remote cohort proxies.
 */
44 @Deprecated(since = "9.0.0", forRemoval = true)
45 final class ThreePhaseCommitCohortProxy implements DOMStoreThreePhaseCommitCohort {
46 private static final Logger LOG = LoggerFactory.getLogger(ThreePhaseCommitCohortProxy.class);
47 private static final @NonNull ListenableFuture<Empty> IMMEDIATE_EMPTY_SUCCESS =
48 Futures.immediateFuture(Empty.value());
50 private static final MessageSupplier COMMIT_MESSAGE_SUPPLIER = new MessageSupplier() {
52 public Object newMessage(final TransactionIdentifier transactionId, final short version) {
53 return new CommitTransaction(transactionId, version).toSerializable();
57 public boolean isSerializedReplyType(final Object reply) {
58 return CommitTransactionReply.isSerializedType(reply);
62 private static final MessageSupplier ABORT_MESSAGE_SUPPLIER = new MessageSupplier() {
64 public Object newMessage(final TransactionIdentifier transactionId, final short version) {
65 return new AbortTransaction(transactionId, version).toSerializable();
69 public boolean isSerializedReplyType(final Object reply) {
70 return AbortTransactionReply.isSerializedType(reply);
74 private static final OperationCallback NO_OP_CALLBACK = new OperationCallback() {
80 public void success() {
84 public void failure() {
92 public void resume() {
97 private final ActorUtils actorUtils;
98 private final List<CohortInfo> cohorts;
99 private final SettableFuture<Empty> cohortsResolvedFuture = SettableFuture.create();
100 private final TransactionIdentifier transactionId;
101 private volatile OperationCallback commitOperationCallback;
/**
 * Creates a proxy over the given cohorts.
 *
 * @param actorUtils utilities used to talk to the cohort actors
 * @param cohorts the participating cohorts; when empty, cohort resolution is marked complete right away
 * @param transactionId identifier of the transaction, must not be {@code null}
 * @throws NullPointerException if {@code transactionId} is {@code null}
 */
ThreePhaseCommitCohortProxy(final ActorUtils actorUtils, final List<CohortInfo> cohorts,
        final TransactionIdentifier transactionId) {
    this.actorUtils = actorUtils;
    this.cohorts = cohorts;
    this.transactionId = requireNonNull(transactionId);

    // Nothing to resolve for an empty transaction, so the resolution future is completed up front.
    if (cohorts.isEmpty()) {
        cohortsResolvedFuture.set(Empty.value());
    }
}
114 private ListenableFuture<Empty> resolveCohorts() {
115 if (cohortsResolvedFuture.isDone()) {
116 return cohortsResolvedFuture;
119 final AtomicInteger completed = new AtomicInteger(cohorts.size());
120 final Object lock = new Object();
121 for (final CohortInfo info: cohorts) {
122 info.getActorFuture().onComplete(new OnComplete<ActorSelection>() {
124 public void onComplete(final Throwable failure, final ActorSelection actor) {
125 synchronized (lock) {
126 boolean done = completed.decrementAndGet() == 0;
127 if (failure != null) {
128 LOG.debug("Tx {}: a cohort Future failed", transactionId, failure);
129 cohortsResolvedFuture.setException(failure);
130 } else if (!cohortsResolvedFuture.isDone()) {
131 LOG.debug("Tx {}: cohort actor {} resolved", transactionId, actor);
133 info.setResolvedActor(actor);
135 LOG.debug("Tx {}: successfully resolved all cohort actors", transactionId);
136 cohortsResolvedFuture.set(Empty.value());
141 }, actorUtils.getClientDispatcher());
144 return cohortsResolvedFuture;
148 public ListenableFuture<Boolean> canCommit() {
149 LOG.debug("Tx {} canCommit", transactionId);
151 final SettableFuture<Boolean> returnFuture = SettableFuture.create();
153 // The first phase of canCommit is to gather the list of cohort actor paths that will
154 // participate in the commit. buildCohortPathsList combines the cohort path Futures into
155 // one Future which we wait on asynchronously here. The cohort actor paths are
156 // extracted from ReadyTransactionReply messages by the Futures that were obtained earlier
157 // and passed to us from upstream processing. If any one fails then we'll fail canCommit.
159 Futures.addCallback(resolveCohorts(), new FutureCallback<>() {
161 public void onSuccess(final Empty result) {
162 finishCanCommit(returnFuture);
166 public void onFailure(final Throwable failure) {
167 returnFuture.setException(failure);
169 }, MoreExecutors.directExecutor());
174 private void finishCanCommit(final SettableFuture<Boolean> returnFuture) {
175 LOG.debug("Tx {} finishCanCommit", transactionId);
177 // For empty transactions return immediately
178 if (cohorts.size() == 0) {
179 LOG.debug("Tx {}: canCommit returning result true", transactionId);
180 returnFuture.set(Boolean.TRUE);
184 commitOperationCallback = new TransactionRateLimitingCallback(actorUtils);
185 commitOperationCallback.run();
187 final Iterator<CohortInfo> iterator = cohorts.iterator();
189 final OnComplete<Object> onComplete = new OnComplete<>() {
191 public void onComplete(final Throwable failure, final Object response) {
192 if (failure != null) {
193 LOG.debug("Tx {}: a canCommit cohort Future failed", transactionId, failure);
195 returnFuture.setException(failure);
196 commitOperationCallback.failure();
200 // Only the first call to pause takes effect - subsequent calls before resume are no-ops. So
201 // this means we'll only time the first transaction canCommit which should be fine.
202 commitOperationCallback.pause();
204 boolean result = true;
205 if (CanCommitTransactionReply.isSerializedType(response)) {
206 CanCommitTransactionReply reply = CanCommitTransactionReply.fromSerializable(response);
208 LOG.debug("Tx {}: received {}", transactionId, response);
210 if (!reply.getCanCommit()) {
214 LOG.error("Unexpected response type {}", response.getClass());
215 returnFuture.setException(new IllegalArgumentException(
216 String.format("Unexpected response type %s", response.getClass())));
220 if (iterator.hasNext() && result) {
221 sendCanCommitTransaction(iterator.next(), this);
223 LOG.debug("Tx {}: canCommit returning result: {}", transactionId, result);
224 returnFuture.set(result);
230 sendCanCommitTransaction(iterator.next(), onComplete);
233 private void sendCanCommitTransaction(final CohortInfo toCohortInfo, final OnComplete<Object> onComplete) {
234 CanCommitTransaction message = new CanCommitTransaction(transactionId, toCohortInfo.getActorVersion());
236 LOG.debug("Tx {}: sending {} to {}", transactionId, message, toCohortInfo.getResolvedActor());
238 Future<Object> future = actorUtils.executeOperationAsync(toCohortInfo.getResolvedActor(),
239 message.toSerializable(), actorUtils.getTransactionCommitOperationTimeout());
240 future.onComplete(onComplete, actorUtils.getClientDispatcher());
243 private Future<Iterable<Object>> invokeCohorts(final MessageSupplier messageSupplier) {
244 List<Future<Object>> futureList = new ArrayList<>(cohorts.size());
245 for (CohortInfo cohort : cohorts) {
246 Object message = messageSupplier.newMessage(transactionId, cohort.getActorVersion());
248 LOG.debug("Tx {}: Sending {} to cohort {}", transactionId, message , cohort.getResolvedActor());
250 futureList.add(actorUtils.executeOperationAsync(cohort.getResolvedActor(), message,
251 actorUtils.getTransactionCommitOperationTimeout()));
254 return akka.dispatch.Futures.sequence(futureList, actorUtils.getClientDispatcher());
258 public ListenableFuture<Empty> preCommit() {
259 // We don't need to do anything here - preCommit is done atomically with the commit phase by the shard.
260 return IMMEDIATE_EMPTY_SUCCESS;
264 public ListenableFuture<Empty> abort() {
265 // Note - we pass false for propagateException. In the front-end data broker, this method
266 // is called when one of the 3 phases fails with an exception. We'd rather have that
267 // original exception propagated to the client. If our abort fails and we propagate the
268 // exception then that exception will supersede and suppress the original exception. But
269 // it's the original exception that is the root cause and of more interest to the client.
271 return operation("abort", Empty.value(), ABORT_MESSAGE_SUPPLIER, AbortTransactionReply.class, false,
276 public ListenableFuture<? extends CommitInfo> commit() {
277 OperationCallback operationCallback = commitOperationCallback != null ? commitOperationCallback :
280 return operation("commit", CommitInfo.empty(), COMMIT_MESSAGE_SUPPLIER, CommitTransactionReply.class, true,
284 @SuppressWarnings("checkstyle:IllegalCatch")
285 private static boolean successfulFuture(final ListenableFuture<?> future) {
286 if (!future.isDone()) {
293 } catch (Exception e) {
298 private <T> ListenableFuture<T> operation(final String operationName, final T futureValue,
299 final MessageSupplier messageSupplier, final Class<?> expectedResponseClass,
300 final boolean propagateException, final OperationCallback callback) {
301 LOG.debug("Tx {} {}", transactionId, operationName);
303 final SettableFuture<T> returnFuture = SettableFuture.create();
305 // The cohort actor list should already be built at this point by the canCommit phase but,
306 // if not for some reason, we'll try to build it here.
308 ListenableFuture<Empty> future = resolveCohorts();
309 if (successfulFuture(future)) {
310 finishOperation(operationName, messageSupplier, expectedResponseClass, propagateException, returnFuture,
311 futureValue, callback);
313 Futures.addCallback(future, new FutureCallback<>() {
315 public void onSuccess(final Empty result) {
316 finishOperation(operationName, messageSupplier, expectedResponseClass, propagateException,
317 returnFuture, futureValue, callback);
321 public void onFailure(final Throwable failure) {
322 LOG.debug("Tx {}: a {} cohort path Future failed", transactionId, operationName, failure);
324 if (propagateException) {
325 returnFuture.setException(failure);
327 returnFuture.set(futureValue);
330 }, MoreExecutors.directExecutor());
336 private <T> void finishOperation(final String operationName, final MessageSupplier messageSupplier,
337 final Class<?> expectedResponseClass, final boolean propagateException,
338 final SettableFuture<T> returnFuture, final T futureValue,
339 final OperationCallback callback) {
340 LOG.debug("Tx {} finish {}", transactionId, operationName);
344 Future<Iterable<Object>> combinedFuture = invokeCohorts(messageSupplier);
346 combinedFuture.onComplete(new OnComplete<Iterable<Object>>() {
348 public void onComplete(final Throwable failure, final Iterable<Object> responses) {
349 Throwable exceptionToPropagate = failure;
350 if (exceptionToPropagate == null) {
351 for (Object response: responses) {
352 if (!response.getClass().equals(expectedResponseClass)) {
353 exceptionToPropagate = new IllegalArgumentException(
354 String.format("Unexpected response type %s", response.getClass()));
360 if (exceptionToPropagate != null) {
361 LOG.debug("Tx {}: a {} cohort Future failed", transactionId, operationName, exceptionToPropagate);
362 if (propagateException) {
363 // We don't log the exception here to avoid redundant logging since we're
364 // propagating to the caller in MD-SAL core who will log it.
365 returnFuture.setException(exceptionToPropagate);
367 // Since the caller doesn't want us to propagate the exception we'll also
368 // not log it normally. But it's usually not good to totally silence
369 // exceptions so we'll log it to debug level.
370 returnFuture.set(futureValue);
375 LOG.debug("Tx {}: {} succeeded", transactionId, operationName);
377 returnFuture.set(futureValue);
382 }, actorUtils.getClientDispatcher());
385 static class CohortInfo {
386 private final Future<ActorSelection> actorFuture;
387 private final Supplier<Short> actorVersionSupplier;
389 private volatile ActorSelection resolvedActor;
391 CohortInfo(final Future<ActorSelection> actorFuture, final Supplier<Short> actorVersionSupplier) {
392 this.actorFuture = actorFuture;
393 this.actorVersionSupplier = actorVersionSupplier;
396 Future<ActorSelection> getActorFuture() {
400 ActorSelection getResolvedActor() {
401 return resolvedActor;
404 void setResolvedActor(final ActorSelection resolvedActor) {
405 this.resolvedActor = resolvedActor;
408 short getActorVersion() {
409 checkState(resolvedActor != null, "getActorVersion cannot be called until the actor is resolved");
410 return actorVersionSupplier.get();
414 private interface MessageSupplier {
415 Object newMessage(TransactionIdentifier transactionId, short version);
417 boolean isSerializedReplyType(Object reply);