2 * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.cluster.datastore;
10 import static com.google.common.base.Preconditions.checkState;
11 import static java.util.Objects.requireNonNull;
13 import akka.actor.ActorSelection;
14 import akka.dispatch.OnComplete;
15 import com.google.common.util.concurrent.FutureCallback;
16 import com.google.common.util.concurrent.Futures;
17 import com.google.common.util.concurrent.ListenableFuture;
18 import com.google.common.util.concurrent.MoreExecutors;
19 import com.google.common.util.concurrent.SettableFuture;
20 import java.util.ArrayList;
21 import java.util.Iterator;
22 import java.util.List;
23 import java.util.concurrent.atomic.AtomicInteger;
24 import java.util.function.Supplier;
25 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
26 import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction;
27 import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply;
28 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction;
29 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply;
30 import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction;
31 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
32 import org.opendaylight.controller.cluster.datastore.utils.ActorUtils;
33 import org.opendaylight.mdsal.common.api.CommitInfo;
34 import org.opendaylight.mdsal.dom.spi.store.DOMStoreThreePhaseCommitCohort;
35 import org.opendaylight.yangtools.yang.common.Empty;
36 import org.slf4j.Logger;
37 import org.slf4j.LoggerFactory;
38 import scala.concurrent.Future;
41 * ThreePhaseCommitCohortProxy represents a set of remote cohort proxies.
// Client-side proxy that runs the canCommit/preCommit/commit/abort phases of DOMStoreThreePhaseCommitCohort
// against the cohort actors held in the cohorts list, messaging them through ActorUtils.
// Flagged for removal: deprecated since 9.0.0.
43 @Deprecated(since = "9.0.0", forRemoval = true)
44 final class ThreePhaseCommitCohortProxy implements DOMStoreThreePhaseCommitCohort {
// Logger shared by every instance and by the anonymous callbacks created in this class.
45 private static final Logger LOG = LoggerFactory.getLogger(ThreePhaseCommitCohortProxy.class);
// Message supplier for the commit phase; commit() hands it to operation().
47 private static final MessageSupplier COMMIT_MESSAGE_SUPPLIER = new MessageSupplier() {
// Builds a CommitTransaction for the given transaction and version and returns its toSerializable() form.
49 public Object newMessage(final TransactionIdentifier transactionId, final short version) {
50 return new CommitTransaction(transactionId, version).toSerializable();
// Delegates the reply-type check to CommitTransactionReply.isSerializedType().
54 public boolean isSerializedReplyType(final Object reply) {
55 return CommitTransactionReply.isSerializedType(reply);
// Message supplier for the abort phase; abort() hands it to operation().
59 private static final MessageSupplier ABORT_MESSAGE_SUPPLIER = new MessageSupplier() {
// Builds an AbortTransaction for the given transaction and version and returns its toSerializable() form.
61 public Object newMessage(final TransactionIdentifier transactionId, final short version) {
62 return new AbortTransaction(transactionId, version).toSerializable();
// Delegates the reply-type check to AbortTransactionReply.isSerializedType().
66 public boolean isSerializedReplyType(final Object reply) {
67 return AbortTransactionReply.isSerializedType(reply);
// OperationCallback constant that, going by its name, is meant to ignore every notification.
// NOTE(review): no statements are visible inside success(), failure() or resume(), so their bodies are
// presumably empty. run() and pause() are invoked on an OperationCallback in finishCanCommit(), yet no matching
// overrides appear in this view - confirm the full set of overrides against the complete file.
71 private static final OperationCallback NO_OP_CALLBACK = new OperationCallback() {
77 public void success() {
81 public void failure() {
89 public void resume() {
// Used to send requests to cohort actors and to obtain the client dispatcher and the commit timeout.
94 private final ActorUtils actorUtils;
// Cohorts taking part in this transaction; the list is stored by reference, not copied.
95 private final List<CohortInfo> cohorts;
// Completed by resolveCohorts() (or by the constructor when there are no cohorts); failed if a cohort
// actor Future fails.
96 private final SettableFuture<Empty> cohortsResolvedFuture = SettableFuture.create();
// Identifier of the transaction being committed; null is rejected in the constructor.
97 private final TransactionIdentifier transactionId;
// Assigned in finishCanCommit() and null-checked in commit().
// NOTE(review): presumably volatile because those phases can run on different threads - confirm.
98 private volatile OperationCallback commitOperationCallback;
// Creates a proxy for one transaction.
//   actorUtils    - stored as given; used later to message the cohorts
//   cohorts       - stored as given (no defensive copy); isEmpty() is called on it, so it must not be null
//   transactionId - must not be null, otherwise requireNonNull throws NullPointerException
100 ThreePhaseCommitCohortProxy(final ActorUtils actorUtils, final List<CohortInfo> cohorts,
101 final TransactionIdentifier transactionId) {
102 this.actorUtils = actorUtils;
103 this.cohorts = cohorts;
104 this.transactionId = requireNonNull(transactionId);
// With no cohorts there is nothing to resolve, so the resolution future is completed up front.
106 if (cohorts.isEmpty()) {
107 cohortsResolvedFuture.set(Empty.value());
// Registers a completion callback on every cohort's actor Future and returns cohortsResolvedFuture, which
// tracks the overall resolution: it is failed with the failure of a cohort Future, or set to Empty once the
// actors have been resolved.
// NOTE(review): when called again before the future is done, the loop below registers another round of
// callbacks with a fresh counter - confirm that callers do not rely on a single registration.
111 private ListenableFuture<Empty> resolveCohorts() {
// Fast path: resolution already finished (successfully or not), so no callbacks are registered.
112 if (cohortsResolvedFuture.isDone()) {
113 return cohortsResolvedFuture;
// Number of cohort Futures still outstanding; starts at the cohort count and is decremented per completion.
116 final AtomicInteger completed = new AtomicInteger(cohorts.size());
// Monitor that serialises the callbacks below against each other.
117 final Object lock = new Object();
118 for (final CohortInfo info: cohorts) {
// The callback is dispatched on the client dispatcher once this cohort's actor Future completes.
119 info.getActorFuture().onComplete(new OnComplete<ActorSelection>() {
121 public void onComplete(final Throwable failure, final ActorSelection actor) {
122 synchronized (lock) {
// Every completion is counted, whether it succeeded or failed.
123 boolean done = completed.decrementAndGet() == 0;
124 if (failure != null) {
125 LOG.debug("Tx {}: a cohort Future failed", transactionId, failure);
126 cohortsResolvedFuture.setException(failure);
// The actor is recorded only while the overall future has not been completed or failed yet.
127 } else if (!cohortsResolvedFuture.isDone()) {
128 LOG.debug("Tx {}: cohort actor {} resolved", transactionId, actor);
130 info.setResolvedActor(actor);
// NOTE(review): `done` is not referenced in the visible lines and the embedded numbering skips a line here;
// presumably the completion below is guarded by `done` so it only fires after the last cohort - confirm.
132 LOG.debug("Tx {}: successfully resolved all cohort actors", transactionId);
133 cohortsResolvedFuture.set(Empty.value());
138 }, actorUtils.getClientDispatcher());
141 return cohortsResolvedFuture;
// First commit phase: resolves the cohort actors, then lets finishCanCommit() poll them.
// The Boolean future created here is handed to finishCanCommit() when resolution succeeds and is failed with
// the resolution failure otherwise.
// NOTE(review): the return statement is not visible in this view; presumably returnFuture is returned.
145 public ListenableFuture<Boolean> canCommit() {
146 LOG.debug("Tx {} canCommit", transactionId);
148 final SettableFuture<Boolean> returnFuture = SettableFuture.create();
150 // The first phase of canCommit is to gather the list of cohort actor paths that will
151 // participate in the commit. resolveCohorts() combines the cohort path Futures into
152 // one Future which we wait on asynchronously here. The cohort actor paths are
153 // extracted from ReadyTransactionReply messages by the Futures that were obtained earlier
154 // and passed to us from upstream processing. If any one fails then we'll fail canCommit.
// directExecutor() runs the callback on whichever thread completes the resolution future (or inline here
// when it is already complete).
156 Futures.addCallback(resolveCohorts(), new FutureCallback<>() {
158 public void onSuccess(final Empty result) {
159 finishCanCommit(returnFuture);
163 public void onFailure(final Throwable failure) {
164 returnFuture.setException(failure);
166 }, MoreExecutors.directExecutor());
// Second half of canCommit(): sends CanCommitTransaction to the cohorts one after another and completes
// returnFuture with the outcome.
//   returnFuture - future created by canCommit(); receives the Boolean result or the failure of a request
171 private void finishCanCommit(final SettableFuture<Boolean> returnFuture) {
172 LOG.debug("Tx {} finishCanCommit", transactionId);
174 // For empty transactions return immediately
175 if (cohorts.size() == 0) {
176 LOG.debug("Tx {}: canCommit returning result true", transactionId);
177 returnFuture.set(Boolean.TRUE);
// NOTE(review): no return statement is visible after this branch; one is presumably present, since
// iterator.next() further down would fail on an empty list - confirm against the full file.
// Create and start the rate-limiting callback; it is kept in a field so that commit() can reuse it.
181 commitOperationCallback = new TransactionRateLimitingCallback(actorUtils);
182 commitOperationCallback.run();
// A single iterator is shared by every invocation of the handler below, so cohorts are visited in list order.
184 final Iterator<CohortInfo> iterator = cohorts.iterator();
// Reply handler for one CanCommitTransaction request; it passes itself on for the next cohort.
186 final OnComplete<Object> onComplete = new OnComplete<>() {
188 public void onComplete(final Throwable failure, final Object response) {
// A failed request fails canCommit as a whole and notifies the rate-limiting callback.
// NOTE(review): no return is visible at the end of this branch - presumably the handler stops here; confirm.
189 if (failure != null) {
190 LOG.debug("Tx {}: a canCommit cohort Future failed", transactionId, failure);
192 returnFuture.setException(failure);
193 commitOperationCallback.failure();
197 // Only the first call to pause takes effect - subsequent calls before resume are no-ops. So
198 // this means we'll only time the first transaction canCommit which should be fine.
199 commitOperationCallback.pause();
201 boolean result = true;
// The reply is decoded only when it is the serialized CanCommitTransactionReply type.
202 if (CanCommitTransactionReply.isSerializedType(response)) {
203 CanCommitTransactionReply reply = CanCommitTransactionReply.fromSerializable(response);
205 LOG.debug("Tx {}: received {}", transactionId, response);
207 if (!reply.getCanCommit()) {
// NOTE(review): the statement recording a negative vote and the start of the alternative branch are not
// visible in this view; the three lines below presumably handle a reply of an unrecognised type - confirm.
211 LOG.error("Unexpected response type {}", response.getClass());
212 returnFuture.setException(new IllegalArgumentException(
213 String.format("Unexpected response type %s", response.getClass())));
// Continue with the next cohort while cohorts remain and the result is still true.
217 if (iterator.hasNext() && result) {
218 sendCanCommitTransaction(iterator.next(), this);
// NOTE(review): presumably the alternative branch - publishes the final result to the caller.
220 LOG.debug("Tx {}: canCommit returning result: {}", transactionId, result);
221 returnFuture.set(result);
// Start the chain with the first cohort.
227 sendCanCommitTransaction(iterator.next(), onComplete);
// Sends a CanCommitTransaction request for this transaction to a single cohort.
//   toCohortInfo - target cohort; its actor must already be resolved, because getActorVersion() throws
//                  IllegalStateException otherwise
//   onComplete   - handler invoked on the client dispatcher with the reply or the failure
230 private void sendCanCommitTransaction(final CohortInfo toCohortInfo, final OnComplete<Object> onComplete) {
231 CanCommitTransaction message = new CanCommitTransaction(transactionId, toCohortInfo.getActorVersion());
233 LOG.debug("Tx {}: sending {} to {}", transactionId, message, toCohortInfo.getResolvedActor());
// The request goes out in serializable form, using the transaction commit operation timeout.
235 Future<Object> future = actorUtils.executeOperationAsync(toCohortInfo.getResolvedActor(),
236 message.toSerializable(), actorUtils.getTransactionCommitOperationTimeout());
237 future.onComplete(onComplete, actorUtils.getClientDispatcher());
// Sends the message produced by messageSupplier to every cohort and combines the reply Futures into one.
//   messageSupplier - builds the request for each cohort from the transaction id and the cohort's version
// Returns a Future of all replies, sequenced on the client dispatcher.
240 private Future<Iterable<Object>> invokeCohorts(final MessageSupplier messageSupplier) {
// Presized to the number of cohorts: one reply Future is added per cohort.
241 List<Future<Object>> futureList = new ArrayList<>(cohorts.size());
242 for (CohortInfo cohort : cohorts) {
// getActorVersion() requires the cohort's actor to be resolved already (it throws IllegalStateException if not).
243 Object message = messageSupplier.newMessage(transactionId, cohort.getActorVersion());
245 LOG.debug("Tx {}: Sending {} to cohort {}", transactionId, message , cohort.getResolvedActor());
247 futureList.add(actorUtils.executeOperationAsync(cohort.getResolvedActor(), message,
248 actorUtils.getTransactionCommitOperationTimeout()));
251 return akka.dispatch.Futures.sequence(futureList, actorUtils.getClientDispatcher());
// Pre-commit phase: sends nothing to the cohorts and returns Empty.immediateFuture().
255 public ListenableFuture<Empty> preCommit() {
256 // We don't need to do anything here - preCommit is done atomically with the commit phase by the shard.
257 return Empty.immediateFuture();
// Abort phase: runs operation() with the abort message supplier, expecting AbortTransactionReply replies.
// Because propagateException is false, the returned future completes with Empty even when the abort fails.
// NOTE(review): the final argument of the call (the OperationCallback) is cut off in this view.
261 public ListenableFuture<Empty> abort() {
262 // Note - we pass false for propagateException. In the front-end data broker, this method
263 // is called when one of the 3 phases fails with an exception. We'd rather have that
264 // original exception propagated to the client. If our abort fails and we propagate the
265 // exception then that exception will supersede and suppress the original exception. But
266 // it's the original exception that is the root cause and of more interest to the client.
268 return operation("abort", Empty.value(), ABORT_MESSAGE_SUPPLIER, AbortTransactionReply.class, false,
// Commit phase: runs operation() with the commit message supplier, expecting CommitTransactionReply replies
// and propagating failures to the returned future (propagateException is true).
273 public ListenableFuture<? extends CommitInfo> commit() {
// Reuse the callback created by finishCanCommit() when there is one.
// NOTE(review): the fallback operand of this conditional and the final argument of the call below are cut
// off in this view - confirm them against the full file.
274 OperationCallback operationCallback = commitOperationCallback != null ? commitOperationCallback :
277 return operation("commit", CommitInfo.empty(), COMMIT_MESSAGE_SUPPLIER, CommitTransactionReply.class, true,
// Helper used by operation() to decide whether it can call finishOperation() right away.
// NOTE(review): most of the body is missing from this view. Going by the name and the visible isDone()
// check and catch clause, it presumably returns true only for a future that is done and completed without an
// exception - confirm against the full file.
// The checkstyle suppression covers the broad catch of Exception below.
281 @SuppressWarnings("checkstyle:IllegalCatch")
282 private static boolean successfulFuture(final ListenableFuture<?> future) {
283 if (!future.isDone()) {
290 } catch (Exception e) {
// Shared driver for commit() and abort(): makes sure the cohort actors are resolved, then has
// finishOperation() send the phase message to every cohort.
//   operationName         - label used in log messages ("commit" or "abort")
//   futureValue           - value the result future is completed with on success, and on failure when
//                           propagateException is false
//   messageSupplier       - builds the per-cohort request
//   expectedResponseClass - class every cohort reply is expected to have
//   propagateException    - true to fail the result future on error, false to complete it with futureValue
//   callback              - OperationCallback forwarded to finishOperation()
// NOTE(review): the return statement is not visible in this view; presumably returnFuture is returned.
295 private <T> ListenableFuture<T> operation(final String operationName, final T futureValue,
296 final MessageSupplier messageSupplier, final Class<?> expectedResponseClass,
297 final boolean propagateException, final OperationCallback callback) {
298 LOG.debug("Tx {} {}", transactionId, operationName);
300 final SettableFuture<T> returnFuture = SettableFuture.create();
302 // The cohort actor list should already be built at this point by the canCommit phase but,
303 // if not for some reason, we'll try to build it here.
305 ListenableFuture<Empty> future = resolveCohorts();
// Cohorts already resolved: continue directly without registering a callback.
306 if (successfulFuture(future)) {
307 finishOperation(operationName, messageSupplier, expectedResponseClass, propagateException, returnFuture,
308 futureValue, callback);
// NOTE(review): presumably the alternative branch - wait for the resolution and continue from the callback,
// which directExecutor() runs on the thread that completes the future.
310 Futures.addCallback(future, new FutureCallback<>() {
312 public void onSuccess(final Empty result) {
313 finishOperation(operationName, messageSupplier, expectedResponseClass, propagateException,
314 returnFuture, futureValue, callback);
318 public void onFailure(final Throwable failure) {
319 LOG.debug("Tx {}: a {} cohort path Future failed", transactionId, operationName, failure);
// A resolution failure is surfaced to the caller only when propagation was requested; the other visible
// outcome completes the future with futureValue.
321 if (propagateException) {
322 returnFuture.setException(failure);
324 returnFuture.set(futureValue);
327 }, MoreExecutors.directExecutor());
// Sends the phase message to all cohorts via invokeCohorts() and completes returnFuture once every reply is in.
//   operationName         - label used in log messages
//   messageSupplier       - builds the per-cohort request
//   expectedResponseClass - every reply's class must equal this class exactly (getClass().equals)
//   propagateException    - true to fail returnFuture on error, false to complete it with futureValue
//   returnFuture          - future to complete
//   futureValue           - value used to complete returnFuture
//   callback              - operation callback supplied by the caller
// NOTE(review): `callback` is never referenced in the visible lines and the embedded numbering has gaps (for
// example after the first log statement and after each completion of returnFuture); calls on the callback
// are presumably among the missing lines - confirm against the full file.
333 private <T> void finishOperation(final String operationName, final MessageSupplier messageSupplier,
334 final Class<?> expectedResponseClass, final boolean propagateException,
335 final SettableFuture<T> returnFuture, final T futureValue,
336 final OperationCallback callback) {
337 LOG.debug("Tx {} finish {}", transactionId, operationName);
341 Future<Iterable<Object>> combinedFuture = invokeCohorts(messageSupplier);
// The handler is dispatched on the client dispatcher when the combined Future completes.
343 combinedFuture.onComplete(new OnComplete<Iterable<Object>>() {
345 public void onComplete(final Throwable failure, final Iterable<Object> responses) {
// Start from the transport failure, if any; otherwise look for a reply of an unexpected class.
346 Throwable exceptionToPropagate = failure;
347 if (exceptionToPropagate == null) {
348 for (Object response: responses) {
349 if (!response.getClass().equals(expectedResponseClass)) {
350 exceptionToPropagate = new IllegalArgumentException(
351 String.format("Unexpected response type %s", response.getClass()));
357 if (exceptionToPropagate != null) {
358 LOG.debug("Tx {}: a {} cohort Future failed", transactionId, operationName, exceptionToPropagate);
359 if (propagateException) {
360 // We don't log the exception here to avoid redundant logging since we're
361 // propagating to the caller in MD-SAL core who will log it.
362 returnFuture.setException(exceptionToPropagate);
364 // Since the caller doesn't want us to propagate the exception we'll also
365 // not log it normally. But it's usually not good to totally silence
366 // exceptions so we'll log it to debug level.
367 returnFuture.set(futureValue);
372 LOG.debug("Tx {}: {} succeeded", transactionId, operationName);
374 returnFuture.set(futureValue);
379 }, actorUtils.getClientDispatcher());
// Per-cohort state: the Future that yields the cohort's actor, a supplier of the cohort's version, and the
// actor itself once it has been resolved.
382 static class CohortInfo {
383 private final Future<ActorSelection> actorFuture;
384 private final Supplier<Short> actorVersionSupplier;
// Written by setResolvedActor() (called from resolveCohorts()) and read by the send paths.
386 private volatile ActorSelection resolvedActor;
// Stores both arguments as given; no validation is performed here.
388 CohortInfo(final Future<ActorSelection> actorFuture, final Supplier<Short> actorVersionSupplier) {
389 this.actorFuture = actorFuture;
390 this.actorVersionSupplier = actorVersionSupplier;
// NOTE(review): the body is not visible in this view; presumably returns actorFuture.
393 Future<ActorSelection> getActorFuture() {
// Returns the resolved actor, or null when setResolvedActor() has not been called with a non-null value yet.
397 ActorSelection getResolvedActor() {
398 return resolvedActor;
401 void setResolvedActor(final ActorSelection resolvedActor) {
402 this.resolvedActor = resolvedActor;
// Returns the version from the supplier; throws IllegalStateException (via checkState) while the actor is
// still unresolved.
405 short getActorVersion() {
406 checkState(resolvedActor != null, "getActorVersion cannot be called until the actor is resolved");
407 return actorVersionSupplier.get();
// Strategy for one commit phase: creates the request message and recognises the matching reply type.
411 private interface MessageSupplier {
// Creates the request to send to a cohort for the given transaction at the given cohort version.
412 Object newMessage(TransactionIdentifier transactionId, short version);
// Reports whether the reply is the serialized reply type for this phase.
// NOTE(review): no caller of this method is visible in this view; finishOperation() compares reply classes
// against expectedResponseClass instead - confirm whether this method is still used.
414 boolean isSerializedReplyType(Object reply);