Merge "Make sure write transaction cancellation is propagated"
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / ThreePhaseCommitCohortProxy.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.datastore;
10
11 import akka.actor.ActorSelection;
12 import akka.dispatch.Futures;
13 import akka.dispatch.OnComplete;
14 import com.google.common.annotations.VisibleForTesting;
15 import com.google.common.collect.Lists;
16 import com.google.common.util.concurrent.ListenableFuture;
17 import com.google.common.util.concurrent.SettableFuture;
18 import java.util.Collections;
19 import java.util.List;
20 import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction;
21 import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply;
22 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction;
23 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply;
24 import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction;
25 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
26 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
27 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
28 import org.slf4j.Logger;
29 import org.slf4j.LoggerFactory;
30 import scala.concurrent.Future;
31 import scala.runtime.AbstractFunction1;
32
33 /**
34  * ThreePhaseCommitCohortProxy represents a set of remote cohort proxies
35  */
36 public class ThreePhaseCommitCohortProxy implements DOMStoreThreePhaseCommitCohort{
37
38     private static final Logger LOG = LoggerFactory.getLogger(ThreePhaseCommitCohortProxy.class);
39
40     private static final ListenableFuture<Void> IMMEDIATE_SUCCESS =
41             com.google.common.util.concurrent.Futures.immediateFuture(null);
42
43     private final ActorContext actorContext;
44     private final List<Future<ActorSelection>> cohortFutures;
45     private volatile List<ActorSelection> cohorts;
46     private final String transactionId;
47
48     public ThreePhaseCommitCohortProxy(ActorContext actorContext,
49             List<Future<ActorSelection>> cohortFutures, String transactionId) {
50         this.actorContext = actorContext;
51         this.cohortFutures = cohortFutures;
52         this.transactionId = transactionId;
53     }
54
55     private Future<Void> buildCohortList() {
56
57         Future<Iterable<ActorSelection>> combinedFutures = Futures.sequence(cohortFutures,
58                 actorContext.getActorSystem().dispatcher());
59
60         return combinedFutures.transform(new AbstractFunction1<Iterable<ActorSelection>, Void>() {
61             @Override
62             public Void apply(Iterable<ActorSelection> actorSelections) {
63                 cohorts = Lists.newArrayList(actorSelections);
64                 if(LOG.isDebugEnabled()) {
65                     LOG.debug("Tx {} successfully built cohort path list: {}",
66                         transactionId, cohorts);
67                 }
68                 return null;
69             }
70         }, TransactionProxy.SAME_FAILURE_TRANSFORMER, actorContext.getActorSystem().dispatcher());
71     }
72
73     @Override
74     public ListenableFuture<Boolean> canCommit() {
75         if(LOG.isDebugEnabled()) {
76             LOG.debug("Tx {} canCommit", transactionId);
77         }
78         final SettableFuture<Boolean> returnFuture = SettableFuture.create();
79
80         // The first phase of canCommit is to gather the list of cohort actor paths that will
81         // participate in the commit. buildCohortPathsList combines the cohort path Futures into
82         // one Future which we wait on asynchronously here. The cohort actor paths are
83         // extracted from ReadyTransactionReply messages by the Futures that were obtained earlier
84         // and passed to us from upstream processing. If any one fails then  we'll fail canCommit.
85
86         buildCohortList().onComplete(new OnComplete<Void>() {
87             @Override
88             public void onComplete(Throwable failure, Void notUsed) throws Throwable {
89                 if(failure != null) {
90                     if(LOG.isDebugEnabled()) {
91                         LOG.debug("Tx {}: a cohort Future failed: {}", transactionId, failure);
92                     }
93                     returnFuture.setException(failure);
94                 } else {
95                     finishCanCommit(returnFuture);
96                 }
97             }
98         }, actorContext.getActorSystem().dispatcher());
99
100         return returnFuture;
101     }
102
103     private void finishCanCommit(final SettableFuture<Boolean> returnFuture) {
104         if(LOG.isDebugEnabled()) {
105             LOG.debug("Tx {} finishCanCommit", transactionId);
106         }
107         // The last phase of canCommit is to invoke all the cohort actors asynchronously to perform
108         // their canCommit processing. If any one fails then we'll fail canCommit.
109
110         Future<Iterable<Object>> combinedFuture =
111                 invokeCohorts(new CanCommitTransaction(transactionId).toSerializable());
112
113         combinedFuture.onComplete(new OnComplete<Iterable<Object>>() {
114             @Override
115             public void onComplete(Throwable failure, Iterable<Object> responses) throws Throwable {
116                 if(failure != null) {
117                     if(LOG.isDebugEnabled()) {
118                         LOG.debug("Tx {}: a canCommit cohort Future failed: {}", transactionId, failure);
119                     }
120                     returnFuture.setException(failure);
121                     return;
122                 }
123
124                 boolean result = true;
125                 for(Object response: responses) {
126                     if (response.getClass().equals(CanCommitTransactionReply.SERIALIZABLE_CLASS)) {
127                         CanCommitTransactionReply reply =
128                                 CanCommitTransactionReply.fromSerializable(response);
129                         if (!reply.getCanCommit()) {
130                             result = false;
131                             break;
132                         }
133                     } else {
134                         LOG.error("Unexpected response type {}", response.getClass());
135                         returnFuture.setException(new IllegalArgumentException(
136                                 String.format("Unexpected response type %s", response.getClass())));
137                         return;
138                     }
139                 }
140                 if(LOG.isDebugEnabled()) {
141                     LOG.debug("Tx {}: canCommit returning result: {}", transactionId, result);
142                 }
143                 returnFuture.set(Boolean.valueOf(result));
144             }
145         }, actorContext.getActorSystem().dispatcher());
146     }
147
148     private Future<Iterable<Object>> invokeCohorts(Object message) {
149         List<Future<Object>> futureList = Lists.newArrayListWithCapacity(cohorts.size());
150         for(ActorSelection cohort : cohorts) {
151             if(LOG.isDebugEnabled()) {
152                 LOG.debug("Tx {}: Sending {} to cohort {}", transactionId, message, cohort);
153             }
154
155             futureList.add(actorContext.executeOperationAsync(cohort, message));
156         }
157
158         return Futures.sequence(futureList, actorContext.getActorSystem().dispatcher());
159     }
160
161     @Override
162     public ListenableFuture<Void> preCommit() {
163         // We don't need to do anything here - preCommit is done atomically with the commit phase
164         // by the shard.
165         return IMMEDIATE_SUCCESS;
166     }
167
168     @Override
169     public ListenableFuture<Void> abort() {
170         // Note - we pass false for propagateException. In the front-end data broker, this method
171         // is called when one of the 3 phases fails with an exception. We'd rather have that
172         // original exception propagated to the client. If our abort fails and we propagate the
173         // exception then that exception will supersede and suppress the original exception. But
174         // it's the original exception that is the root cause and of more interest to the client.
175
176         return voidOperation("abort", new AbortTransaction(transactionId).toSerializable(),
177                 AbortTransactionReply.SERIALIZABLE_CLASS, false);
178     }
179
180     @Override
181     public ListenableFuture<Void> commit() {
182         return voidOperation("commit",  new CommitTransaction(transactionId).toSerializable(),
183                 CommitTransactionReply.SERIALIZABLE_CLASS, true);
184     }
185
186     private ListenableFuture<Void> voidOperation(final String operationName, final Object message,
187             final Class<?> expectedResponseClass, final boolean propagateException) {
188
189         if(LOG.isDebugEnabled()) {
190             LOG.debug("Tx {} {}", transactionId, operationName);
191         }
192         final SettableFuture<Void> returnFuture = SettableFuture.create();
193
194         // The cohort actor list should already be built at this point by the canCommit phase but,
195         // if not for some reason, we'll try to build it here.
196
197         if(cohorts != null) {
198             finishVoidOperation(operationName, message, expectedResponseClass, propagateException,
199                     returnFuture);
200         } else {
201             buildCohortList().onComplete(new OnComplete<Void>() {
202                 @Override
203                 public void onComplete(Throwable failure, Void notUsed) throws Throwable {
204                     if(failure != null) {
205                         if(LOG.isDebugEnabled()) {
206                             LOG.debug("Tx {}: a {} cohort path Future failed: {}", transactionId,
207                                 operationName, failure);
208                         }
209                         if(propagateException) {
210                             returnFuture.setException(failure);
211                         } else {
212                             returnFuture.set(null);
213                         }
214                     } else {
215                         finishVoidOperation(operationName, message, expectedResponseClass,
216                                 propagateException, returnFuture);
217                     }
218                 }
219             }, actorContext.getActorSystem().dispatcher());
220         }
221
222         return returnFuture;
223     }
224
225     private void finishVoidOperation(final String operationName, final Object message,
226             final Class<?> expectedResponseClass, final boolean propagateException,
227             final SettableFuture<Void> returnFuture) {
228         if(LOG.isDebugEnabled()) {
229             LOG.debug("Tx {} finish {}", transactionId, operationName);
230         }
231         Future<Iterable<Object>> combinedFuture = invokeCohorts(message);
232
233         combinedFuture.onComplete(new OnComplete<Iterable<Object>>() {
234             @Override
235             public void onComplete(Throwable failure, Iterable<Object> responses) throws Throwable {
236
237                 Throwable exceptionToPropagate = failure;
238                 if(exceptionToPropagate == null) {
239                     for(Object response: responses) {
240                         if(!response.getClass().equals(expectedResponseClass)) {
241                             exceptionToPropagate = new IllegalArgumentException(
242                                     String.format("Unexpected response type %s",
243                                             response.getClass()));
244                             break;
245                         }
246                     }
247                 }
248
249                 if(exceptionToPropagate != null) {
250                     if(LOG.isDebugEnabled()) {
251                         LOG.debug("Tx {}: a {} cohort Future failed: {}", transactionId,
252                             operationName, exceptionToPropagate);
253                     }
254                     if(propagateException) {
255                         // We don't log the exception here to avoid redundant logging since we're
256                         // propagating to the caller in MD-SAL core who will log it.
257                         returnFuture.setException(exceptionToPropagate);
258                     } else {
259                         // Since the caller doesn't want us to propagate the exception we'll also
260                         // not log it normally. But it's usually not good to totally silence
261                         // exceptions so we'll log it to debug level.
262                         if(LOG.isDebugEnabled()) {
263                             LOG.debug(String.format("%s failed", message.getClass().getSimpleName()),
264                                 exceptionToPropagate);
265                         }
266                         returnFuture.set(null);
267                     }
268                 } else {
269                     if(LOG.isDebugEnabled()) {
270                         LOG.debug("Tx {}: {} succeeded", transactionId, operationName);
271                     }
272                     returnFuture.set(null);
273                 }
274             }
275         }, actorContext.getActorSystem().dispatcher());
276     }
277
278     @VisibleForTesting
279     List<Future<ActorSelection>> getCohortFutures() {
280         return Collections.unmodifiableList(cohortFutures);
281     }
282 }