Merge "Bug 1965: Fixed DataChangedReply sent to deadletters"
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / ThreePhaseCommitCohortProxy.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.datastore;
10
11 import akka.actor.ActorSelection;
12 import akka.dispatch.Futures;
13 import akka.dispatch.OnComplete;
14 import com.google.common.annotations.VisibleForTesting;
15 import com.google.common.collect.Lists;
16 import com.google.common.util.concurrent.ListenableFuture;
17 import com.google.common.util.concurrent.SettableFuture;
18 import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction;
19 import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply;
20 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction;
21 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply;
22 import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction;
23 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
24 import org.opendaylight.controller.cluster.datastore.messages.PreCommitTransaction;
25 import org.opendaylight.controller.cluster.datastore.messages.PreCommitTransactionReply;
26 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
27 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
28 import org.slf4j.Logger;
29 import org.slf4j.LoggerFactory;
30 import scala.concurrent.Future;
31 import scala.runtime.AbstractFunction1;
32
33 import java.util.Collections;
34 import java.util.List;
35
36 /**
37  * ThreePhaseCommitCohortProxy represents a set of remote cohort proxies
38  */
39 public class ThreePhaseCommitCohortProxy implements DOMStoreThreePhaseCommitCohort{
40
41     private static final Logger LOG = LoggerFactory.getLogger(ThreePhaseCommitCohortProxy.class);
42
43     private final ActorContext actorContext;
44     private final List<Future<ActorSelection>> cohortFutures;
45     private volatile List<ActorSelection> cohorts;
46     private final String transactionId;
47
48     public ThreePhaseCommitCohortProxy(ActorContext actorContext,
49             List<Future<ActorSelection>> cohortFutures, String transactionId) {
50         this.actorContext = actorContext;
51         this.cohortFutures = cohortFutures;
52         this.transactionId = transactionId;
53     }
54
55     private Future<Void> buildCohortList() {
56
57         Future<Iterable<ActorSelection>> combinedFutures = Futures.sequence(cohortFutures,
58                 actorContext.getActorSystem().dispatcher());
59
60         return combinedFutures.transform(new AbstractFunction1<Iterable<ActorSelection>, Void>() {
61             @Override
62             public Void apply(Iterable<ActorSelection> actorSelections) {
63                 cohorts = Lists.newArrayList(actorSelections);
64                 if(LOG.isDebugEnabled()) {
65                     LOG.debug("Tx {} successfully built cohort path list: {}",
66                         transactionId, cohorts);
67                 }
68                 return null;
69             }
70         }, TransactionProxy.SAME_FAILURE_TRANSFORMER, actorContext.getActorSystem().dispatcher());
71     }
72
73     @Override
74     public ListenableFuture<Boolean> canCommit() {
75         if(LOG.isDebugEnabled()) {
76             LOG.debug("Tx {} canCommit", transactionId);
77         }
78         final SettableFuture<Boolean> returnFuture = SettableFuture.create();
79
80         // The first phase of canCommit is to gather the list of cohort actor paths that will
81         // participate in the commit. buildCohortPathsList combines the cohort path Futures into
82         // one Future which we wait on asynchronously here. The cohort actor paths are
83         // extracted from ReadyTransactionReply messages by the Futures that were obtained earlier
84         // and passed to us from upstream processing. If any one fails then  we'll fail canCommit.
85
86         buildCohortList().onComplete(new OnComplete<Void>() {
87             @Override
88             public void onComplete(Throwable failure, Void notUsed) throws Throwable {
89                 if(failure != null) {
90                     if(LOG.isDebugEnabled()) {
91                         LOG.debug("Tx {}: a cohort Future failed: {}", transactionId, failure);
92                     }
93                     returnFuture.setException(failure);
94                 } else {
95                     finishCanCommit(returnFuture);
96                 }
97             }
98         }, actorContext.getActorSystem().dispatcher());
99
100         return returnFuture;
101     }
102
103     private void finishCanCommit(final SettableFuture<Boolean> returnFuture) {
104         if(LOG.isDebugEnabled()) {
105             LOG.debug("Tx {} finishCanCommit", transactionId);
106         }
107         // The last phase of canCommit is to invoke all the cohort actors asynchronously to perform
108         // their canCommit processing. If any one fails then we'll fail canCommit.
109
110         Future<Iterable<Object>> combinedFuture =
111                 invokeCohorts(new CanCommitTransaction().toSerializable());
112
113         combinedFuture.onComplete(new OnComplete<Iterable<Object>>() {
114             @Override
115             public void onComplete(Throwable failure, Iterable<Object> responses) throws Throwable {
116                 if(failure != null) {
117                     if(LOG.isDebugEnabled()) {
118                         LOG.debug("Tx {}: a canCommit cohort Future failed: {}", transactionId, failure);
119                     }
120                     returnFuture.setException(failure);
121                     return;
122                 }
123
124                 boolean result = true;
125                 for(Object response: responses) {
126                     if (response.getClass().equals(CanCommitTransactionReply.SERIALIZABLE_CLASS)) {
127                         CanCommitTransactionReply reply =
128                                 CanCommitTransactionReply.fromSerializable(response);
129                         if (!reply.getCanCommit()) {
130                             result = false;
131                             break;
132                         }
133                     } else {
134                         LOG.error("Unexpected response type {}", response.getClass());
135                         returnFuture.setException(new IllegalArgumentException(
136                                 String.format("Unexpected response type {}", response.getClass())));
137                         return;
138                     }
139                 }
140                 if(LOG.isDebugEnabled()) {
141                     LOG.debug("Tx {}: canCommit returning result: {}", transactionId, result);
142                 }
143                 returnFuture.set(Boolean.valueOf(result));
144             }
145         }, actorContext.getActorSystem().dispatcher());
146     }
147
148     private Future<Iterable<Object>> invokeCohorts(Object message) {
149         List<Future<Object>> futureList = Lists.newArrayListWithCapacity(cohorts.size());
150         for(ActorSelection cohort : cohorts) {
151             if(LOG.isDebugEnabled()) {
152                 LOG.debug("Tx {}: Sending {} to cohort {}", transactionId, message, cohort);
153             }
154
155             futureList.add(actorContext.executeOperationAsync(cohort, message));
156         }
157
158         return Futures.sequence(futureList, actorContext.getActorSystem().dispatcher());
159     }
160
161     @Override
162     public ListenableFuture<Void> preCommit() {
163         return voidOperation("preCommit",  new PreCommitTransaction().toSerializable(),
164                 PreCommitTransactionReply.SERIALIZABLE_CLASS, true);
165     }
166
167     @Override
168     public ListenableFuture<Void> abort() {
169         // Note - we pass false for propagateException. In the front-end data broker, this method
170         // is called when one of the 3 phases fails with an exception. We'd rather have that
171         // original exception propagated to the client. If our abort fails and we propagate the
172         // exception then that exception will supersede and suppress the original exception. But
173         // it's the original exception that is the root cause and of more interest to the client.
174
175         return voidOperation("abort", new AbortTransaction().toSerializable(),
176                 AbortTransactionReply.SERIALIZABLE_CLASS, false);
177     }
178
179     @Override
180     public ListenableFuture<Void> commit() {
181         return voidOperation("commit",  new CommitTransaction().toSerializable(),
182                 CommitTransactionReply.SERIALIZABLE_CLASS, true);
183     }
184
185     private ListenableFuture<Void> voidOperation(final String operationName, final Object message,
186             final Class<?> expectedResponseClass, final boolean propagateException) {
187
188         if(LOG.isDebugEnabled()) {
189             LOG.debug("Tx {} {}", transactionId, operationName);
190         }
191         final SettableFuture<Void> returnFuture = SettableFuture.create();
192
193         // The cohort actor list should already be built at this point by the canCommit phase but,
194         // if not for some reason, we'll try to build it here.
195
196         if(cohorts != null) {
197             finishVoidOperation(operationName, message, expectedResponseClass, propagateException,
198                     returnFuture);
199         } else {
200             buildCohortList().onComplete(new OnComplete<Void>() {
201                 @Override
202                 public void onComplete(Throwable failure, Void notUsed) throws Throwable {
203                     if(failure != null) {
204                         if(LOG.isDebugEnabled()) {
205                             LOG.debug("Tx {}: a {} cohort path Future failed: {}", transactionId,
206                                 operationName, failure);
207                         }
208                         if(propagateException) {
209                             returnFuture.setException(failure);
210                         } else {
211                             returnFuture.set(null);
212                         }
213                     } else {
214                         finishVoidOperation(operationName, message, expectedResponseClass,
215                                 propagateException, returnFuture);
216                     }
217                 }
218             }, actorContext.getActorSystem().dispatcher());
219         }
220
221         return returnFuture;
222     }
223
224     private void finishVoidOperation(final String operationName, final Object message,
225             final Class<?> expectedResponseClass, final boolean propagateException,
226             final SettableFuture<Void> returnFuture) {
227         if(LOG.isDebugEnabled()) {
228             LOG.debug("Tx {} finish {}", transactionId, operationName);
229         }
230         Future<Iterable<Object>> combinedFuture = invokeCohorts(message);
231
232         combinedFuture.onComplete(new OnComplete<Iterable<Object>>() {
233             @Override
234             public void onComplete(Throwable failure, Iterable<Object> responses) throws Throwable {
235
236                 Throwable exceptionToPropagate = failure;
237                 if(exceptionToPropagate == null) {
238                     for(Object response: responses) {
239                         if(!response.getClass().equals(expectedResponseClass)) {
240                             exceptionToPropagate = new IllegalArgumentException(
241                                     String.format("Unexpected response type {}",
242                                             response.getClass()));
243                             break;
244                         }
245                     }
246                 }
247
248                 if(exceptionToPropagate != null) {
249                     if(LOG.isDebugEnabled()) {
250                         LOG.debug("Tx {}: a {} cohort Future failed: {}", transactionId,
251                             operationName, exceptionToPropagate);
252                     }
253                     if(propagateException) {
254                         // We don't log the exception here to avoid redundant logging since we're
255                         // propagating to the caller in MD-SAL core who will log it.
256                         returnFuture.setException(exceptionToPropagate);
257                     } else {
258                         // Since the caller doesn't want us to propagate the exception we'll also
259                         // not log it normally. But it's usually not good to totally silence
260                         // exceptions so we'll log it to debug level.
261                         if(LOG.isDebugEnabled()) {
262                             LOG.debug(String.format("%s failed", message.getClass().getSimpleName()),
263                                 exceptionToPropagate);
264                         }
265                         returnFuture.set(null);
266                     }
267                 } else {
268                     if(LOG.isDebugEnabled()) {
269                         LOG.debug("Tx {}: {} succeeded", transactionId, operationName);
270                     }
271                     returnFuture.set(null);
272                 }
273             }
274         }, actorContext.getActorSystem().dispatcher());
275     }
276
277     @VisibleForTesting
278     List<Future<ActorSelection>> getCohortFutures() {
279         return Collections.unmodifiableList(cohortFutures);
280     }
281 }