2 * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.controller.cluster.datastore;
11 import akka.actor.ActorPath;
12 import akka.actor.ActorSelection;
13 import akka.dispatch.Futures;
14 import akka.dispatch.OnComplete;
16 import com.google.common.collect.Lists;
17 import com.google.common.util.concurrent.ListenableFuture;
18 import com.google.common.util.concurrent.SettableFuture;
20 import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction;
21 import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply;
22 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction;
23 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply;
24 import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction;
25 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
26 import org.opendaylight.controller.cluster.datastore.messages.PreCommitTransaction;
27 import org.opendaylight.controller.cluster.datastore.messages.PreCommitTransactionReply;
28 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
29 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
30 import org.slf4j.Logger;
31 import org.slf4j.LoggerFactory;
33 import scala.concurrent.Future;
35 import java.util.Collections;
36 import java.util.List;
39 * ThreePhaseCommitCohortProxy represents a set of remote cohort proxies
41 public class ThreePhaseCommitCohortProxy implements DOMStoreThreePhaseCommitCohort{
43 private static final Logger LOG = LoggerFactory.getLogger(DistributedDataStore.class);
45 private final ActorContext actorContext;
46 private final List<ActorPath> cohortPaths;
47 private final String transactionId;
49 public ThreePhaseCommitCohortProxy(ActorContext actorContext, List<ActorPath> cohortPaths,
50 String transactionId) {
51 this.actorContext = actorContext;
52 this.cohortPaths = cohortPaths;
53 this.transactionId = transactionId;
57 public ListenableFuture<Boolean> canCommit() {
58 LOG.debug("txn {} canCommit", transactionId);
60 Future<Iterable<Object>> combinedFuture =
61 invokeCohorts(new CanCommitTransaction().toSerializable());
63 final SettableFuture<Boolean> returnFuture = SettableFuture.create();
65 combinedFuture.onComplete(new OnComplete<Iterable<Object>>() {
67 public void onComplete(Throwable failure, Iterable<Object> responses) throws Throwable {
69 returnFuture.setException(failure);
73 boolean result = true;
74 for(Object response: responses) {
75 if (response.getClass().equals(CanCommitTransactionReply.SERIALIZABLE_CLASS)) {
76 CanCommitTransactionReply reply =
77 CanCommitTransactionReply.fromSerializable(response);
78 if (!reply.getCanCommit()) {
83 LOG.error("Unexpected response type {}", response.getClass());
84 returnFuture.setException(new IllegalArgumentException(
85 String.format("Unexpected response type {}", response.getClass())));
90 returnFuture.set(Boolean.valueOf(result));
92 }, actorContext.getActorSystem().dispatcher());
97 private Future<Iterable<Object>> invokeCohorts(Object message) {
98 List<Future<Object>> futureList = Lists.newArrayListWithCapacity(cohortPaths.size());
99 for(ActorPath actorPath : cohortPaths) {
101 LOG.debug("txn {} Sending {} to {}", transactionId, message, actorPath);
103 ActorSelection cohort = actorContext.actorSelection(actorPath);
105 futureList.add(actorContext.executeRemoteOperationAsync(cohort, message,
106 ActorContext.ASK_DURATION));
109 return Futures.sequence(futureList, actorContext.getActorSystem().dispatcher());
113 public ListenableFuture<Void> preCommit() {
114 LOG.debug("txn {} preCommit", transactionId);
115 return voidOperation(new PreCommitTransaction().toSerializable(),
116 PreCommitTransactionReply.SERIALIZABLE_CLASS, true);
120 public ListenableFuture<Void> abort() {
121 LOG.debug("txn {} abort", transactionId);
123 // Note - we pass false for propagateException. In the front-end data broker, this method
124 // is called when one of the 3 phases fails with an exception. We'd rather have that
125 // original exception propagated to the client. If our abort fails and we propagate the
126 // exception then that exception will supersede and suppress the original exception. But
127 // it's the original exception that is the root cause and of more interest to the client.
129 return voidOperation(new AbortTransaction().toSerializable(),
130 AbortTransactionReply.SERIALIZABLE_CLASS, false);
134 public ListenableFuture<Void> commit() {
135 LOG.debug("txn {} commit", transactionId);
136 return voidOperation(new CommitTransaction().toSerializable(),
137 CommitTransactionReply.SERIALIZABLE_CLASS, true);
140 private ListenableFuture<Void> voidOperation(final Object message,
141 final Class<?> expectedResponseClass, final boolean propagateException) {
143 Future<Iterable<Object>> combinedFuture = invokeCohorts(message);
145 final SettableFuture<Void> returnFuture = SettableFuture.create();
147 combinedFuture.onComplete(new OnComplete<Iterable<Object>>() {
149 public void onComplete(Throwable failure, Iterable<Object> responses) throws Throwable {
151 Throwable exceptionToPropagate = failure;
152 if(exceptionToPropagate == null) {
153 for(Object response: responses) {
154 if(!response.getClass().equals(expectedResponseClass)) {
155 exceptionToPropagate = new IllegalArgumentException(
156 String.format("Unexpected response type {}",
157 response.getClass()));
163 if(exceptionToPropagate != null) {
164 if(propagateException) {
165 // We don't log the exception here to avoid redundant logging since we're
166 // propagating to the caller in MD-SAL core who will log it.
167 returnFuture.setException(exceptionToPropagate);
169 // Since the caller doesn't want us to propagate the exception we'll also
170 // not log it normally. But it's usually not good to totally silence
171 // exceptions so we'll log it to debug level.
172 LOG.debug(String.format("%s failed", message.getClass().getSimpleName()),
173 exceptionToPropagate);
174 returnFuture.set(null);
177 returnFuture.set(null);
180 }, actorContext.getActorSystem().dispatcher());
185 public List<ActorPath> getCohortPaths() {
186 return Collections.unmodifiableList(this.cohortPaths);