Merge "BUG-1958 Fix leafref path in config-yang module."
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / ThreePhaseCommitCohortProxy.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.datastore;
10
11 import akka.actor.ActorPath;
12 import akka.actor.ActorSelection;
13 import akka.dispatch.Futures;
14 import akka.dispatch.OnComplete;
15
16 import com.google.common.annotations.VisibleForTesting;
17 import com.google.common.collect.Lists;
18 import com.google.common.util.concurrent.ListenableFuture;
19 import com.google.common.util.concurrent.SettableFuture;
20
21 import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction;
22 import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply;
23 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction;
24 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply;
25 import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction;
26 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
27 import org.opendaylight.controller.cluster.datastore.messages.PreCommitTransaction;
28 import org.opendaylight.controller.cluster.datastore.messages.PreCommitTransactionReply;
29 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
30 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
31 import org.slf4j.Logger;
32 import org.slf4j.LoggerFactory;
33
34 import scala.concurrent.Future;
35 import scala.runtime.AbstractFunction1;
36
37 import java.util.Collections;
38 import java.util.List;
39
40 /**
41  * ThreePhaseCommitCohortProxy represents a set of remote cohort proxies
42  */
43 public class ThreePhaseCommitCohortProxy implements DOMStoreThreePhaseCommitCohort{
44
45     private static final Logger LOG = LoggerFactory.getLogger(ThreePhaseCommitCohortProxy.class);
46
47     private final ActorContext actorContext;
48     private final List<Future<ActorPath>> cohortPathFutures;
49     private volatile List<ActorPath> cohortPaths;
50     private final String transactionId;
51
52     public ThreePhaseCommitCohortProxy(ActorContext actorContext,
53             List<Future<ActorPath>> cohortPathFutures, String transactionId) {
54         this.actorContext = actorContext;
55         this.cohortPathFutures = cohortPathFutures;
56         this.transactionId = transactionId;
57     }
58
59     private Future<Void> buildCohortPathsList() {
60
61         Future<Iterable<ActorPath>> combinedFutures = Futures.sequence(cohortPathFutures,
62                 actorContext.getActorSystem().dispatcher());
63
64         return combinedFutures.transform(new AbstractFunction1<Iterable<ActorPath>, Void>() {
65             @Override
66             public Void apply(Iterable<ActorPath> paths) {
67                 cohortPaths = Lists.newArrayList(paths);
68                 if(LOG.isDebugEnabled()) {
69                     LOG.debug("Tx {} successfully built cohort path list: {}",
70                         transactionId, cohortPaths);
71                 }
72                 return null;
73             }
74         }, TransactionProxy.SAME_FAILURE_TRANSFORMER, actorContext.getActorSystem().dispatcher());
75     }
76
77     @Override
78     public ListenableFuture<Boolean> canCommit() {
79         if(LOG.isDebugEnabled()) {
80             LOG.debug("Tx {} canCommit", transactionId);
81         }
82         final SettableFuture<Boolean> returnFuture = SettableFuture.create();
83
84         // The first phase of canCommit is to gather the list of cohort actor paths that will
85         // participate in the commit. buildCohortPathsList combines the cohort path Futures into
86         // one Future which we wait on asynchronously here. The cohort actor paths are
87         // extracted from ReadyTransactionReply messages by the Futures that were obtained earlier
88         // and passed to us from upstream processing. If any one fails then  we'll fail canCommit.
89
90         buildCohortPathsList().onComplete(new OnComplete<Void>() {
91             @Override
92             public void onComplete(Throwable failure, Void notUsed) throws Throwable {
93                 if(failure != null) {
94                     if(LOG.isDebugEnabled()) {
95                         LOG.debug("Tx {}: a cohort path Future failed: {}", transactionId, failure);
96                     }
97                     returnFuture.setException(failure);
98                 } else {
99                     finishCanCommit(returnFuture);
100                 }
101             }
102         }, actorContext.getActorSystem().dispatcher());
103
104         return returnFuture;
105     }
106
107     private void finishCanCommit(final SettableFuture<Boolean> returnFuture) {
108         if(LOG.isDebugEnabled()) {
109             LOG.debug("Tx {} finishCanCommit", transactionId);
110         }
111         // The last phase of canCommit is to invoke all the cohort actors asynchronously to perform
112         // their canCommit processing. If any one fails then we'll fail canCommit.
113
114         Future<Iterable<Object>> combinedFuture =
115                 invokeCohorts(new CanCommitTransaction().toSerializable());
116
117         combinedFuture.onComplete(new OnComplete<Iterable<Object>>() {
118             @Override
119             public void onComplete(Throwable failure, Iterable<Object> responses) throws Throwable {
120                 if(failure != null) {
121                     if(LOG.isDebugEnabled()) {
122                         LOG.debug("Tx {}: a canCommit cohort Future failed: {}", transactionId, failure);
123                     }
124                     returnFuture.setException(failure);
125                     return;
126                 }
127
128                 boolean result = true;
129                 for(Object response: responses) {
130                     if (response.getClass().equals(CanCommitTransactionReply.SERIALIZABLE_CLASS)) {
131                         CanCommitTransactionReply reply =
132                                 CanCommitTransactionReply.fromSerializable(response);
133                         if (!reply.getCanCommit()) {
134                             result = false;
135                             break;
136                         }
137                     } else {
138                         LOG.error("Unexpected response type {}", response.getClass());
139                         returnFuture.setException(new IllegalArgumentException(
140                                 String.format("Unexpected response type {}", response.getClass())));
141                         return;
142                     }
143                 }
144                 if(LOG.isDebugEnabled()) {
145                     LOG.debug("Tx {}: canCommit returning result: {}", transactionId, result);
146                 }
147                 returnFuture.set(Boolean.valueOf(result));
148             }
149         }, actorContext.getActorSystem().dispatcher());
150     }
151
152     private Future<Iterable<Object>> invokeCohorts(Object message) {
153         List<Future<Object>> futureList = Lists.newArrayListWithCapacity(cohortPaths.size());
154         for(ActorPath actorPath : cohortPaths) {
155             if(LOG.isDebugEnabled()) {
156                 LOG.debug("Tx {}: Sending {} to cohort {}", transactionId, message, actorPath);
157             }
158             ActorSelection cohort = actorContext.actorSelection(actorPath);
159
160             futureList.add(actorContext.executeOperationAsync(cohort, message));
161         }
162
163         return Futures.sequence(futureList, actorContext.getActorSystem().dispatcher());
164     }
165
166     @Override
167     public ListenableFuture<Void> preCommit() {
168         return voidOperation("preCommit",  new PreCommitTransaction().toSerializable(),
169                 PreCommitTransactionReply.SERIALIZABLE_CLASS, true);
170     }
171
172     @Override
173     public ListenableFuture<Void> abort() {
174         // Note - we pass false for propagateException. In the front-end data broker, this method
175         // is called when one of the 3 phases fails with an exception. We'd rather have that
176         // original exception propagated to the client. If our abort fails and we propagate the
177         // exception then that exception will supersede and suppress the original exception. But
178         // it's the original exception that is the root cause and of more interest to the client.
179
180         return voidOperation("abort", new AbortTransaction().toSerializable(),
181                 AbortTransactionReply.SERIALIZABLE_CLASS, false);
182     }
183
184     @Override
185     public ListenableFuture<Void> commit() {
186         return voidOperation("commit",  new CommitTransaction().toSerializable(),
187                 CommitTransactionReply.SERIALIZABLE_CLASS, true);
188     }
189
190     private ListenableFuture<Void> voidOperation(final String operationName, final Object message,
191             final Class<?> expectedResponseClass, final boolean propagateException) {
192
193         if(LOG.isDebugEnabled()) {
194             LOG.debug("Tx {} {}", transactionId, operationName);
195         }
196         final SettableFuture<Void> returnFuture = SettableFuture.create();
197
198         // The cohort actor list should already be built at this point by the canCommit phase but,
199         // if not for some reason, we'll try to build it here.
200
201         if(cohortPaths != null) {
202             finishVoidOperation(operationName, message, expectedResponseClass, propagateException,
203                     returnFuture);
204         } else {
205             buildCohortPathsList().onComplete(new OnComplete<Void>() {
206                 @Override
207                 public void onComplete(Throwable failure, Void notUsed) throws Throwable {
208                     if(failure != null) {
209                         if(LOG.isDebugEnabled()) {
210                             LOG.debug("Tx {}: a {} cohort path Future failed: {}", transactionId,
211                                 operationName, failure);
212                         }
213                         if(propagateException) {
214                             returnFuture.setException(failure);
215                         } else {
216                             returnFuture.set(null);
217                         }
218                     } else {
219                         finishVoidOperation(operationName, message, expectedResponseClass,
220                                 propagateException, returnFuture);
221                     }
222                 }
223             }, actorContext.getActorSystem().dispatcher());
224         }
225
226         return returnFuture;
227     }
228
229     private void finishVoidOperation(final String operationName, final Object message,
230             final Class<?> expectedResponseClass, final boolean propagateException,
231             final SettableFuture<Void> returnFuture) {
232         if(LOG.isDebugEnabled()) {
233             LOG.debug("Tx {} finish {}", transactionId, operationName);
234         }
235         Future<Iterable<Object>> combinedFuture = invokeCohorts(message);
236
237         combinedFuture.onComplete(new OnComplete<Iterable<Object>>() {
238             @Override
239             public void onComplete(Throwable failure, Iterable<Object> responses) throws Throwable {
240
241                 Throwable exceptionToPropagate = failure;
242                 if(exceptionToPropagate == null) {
243                     for(Object response: responses) {
244                         if(!response.getClass().equals(expectedResponseClass)) {
245                             exceptionToPropagate = new IllegalArgumentException(
246                                     String.format("Unexpected response type {}",
247                                             response.getClass()));
248                             break;
249                         }
250                     }
251                 }
252
253                 if(exceptionToPropagate != null) {
254                     if(LOG.isDebugEnabled()) {
255                         LOG.debug("Tx {}: a {} cohort Future failed: {}", transactionId,
256                             operationName, exceptionToPropagate);
257                     }
258                     if(propagateException) {
259                         // We don't log the exception here to avoid redundant logging since we're
260                         // propagating to the caller in MD-SAL core who will log it.
261                         returnFuture.setException(exceptionToPropagate);
262                     } else {
263                         // Since the caller doesn't want us to propagate the exception we'll also
264                         // not log it normally. But it's usually not good to totally silence
265                         // exceptions so we'll log it to debug level.
266                         if(LOG.isDebugEnabled()) {
267                             LOG.debug(String.format("%s failed", message.getClass().getSimpleName()),
268                                 exceptionToPropagate);
269                         }
270                         returnFuture.set(null);
271                     }
272                 } else {
273                     if(LOG.isDebugEnabled()) {
274                         LOG.debug("Tx {}: {} succeeded", transactionId, operationName);
275                     }
276                     returnFuture.set(null);
277                 }
278             }
279         }, actorContext.getActorSystem().dispatcher());
280     }
281
282     @VisibleForTesting
283     List<Future<ActorPath>> getCohortPathFutures() {
284         return Collections.unmodifiableList(cohortPathFutures);
285     }
286 }