Merge "BUG 932 - Swagger HTTP POST contains incorrect object"
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / ThreePhaseCommitCohortProxy.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.datastore;
10
11 import akka.actor.ActorPath;
12 import akka.actor.ActorSelection;
13 import akka.dispatch.Futures;
14 import akka.dispatch.OnComplete;
15
16 import com.google.common.collect.Lists;
17 import com.google.common.util.concurrent.ListenableFuture;
18 import com.google.common.util.concurrent.SettableFuture;
19
20 import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction;
21 import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply;
22 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction;
23 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply;
24 import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction;
25 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
26 import org.opendaylight.controller.cluster.datastore.messages.PreCommitTransaction;
27 import org.opendaylight.controller.cluster.datastore.messages.PreCommitTransactionReply;
28 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
29 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
30 import org.slf4j.Logger;
31 import org.slf4j.LoggerFactory;
32
33 import scala.concurrent.Future;
34
35 import java.util.Collections;
36 import java.util.List;
37
38 /**
39  * ThreePhaseCommitCohortProxy represents a set of remote cohort proxies
40  */
41 public class ThreePhaseCommitCohortProxy implements DOMStoreThreePhaseCommitCohort{
42
43     private static final Logger LOG = LoggerFactory.getLogger(DistributedDataStore.class);
44
45     private final ActorContext actorContext;
46     private final List<ActorPath> cohortPaths;
47     private final String transactionId;
48
49     public ThreePhaseCommitCohortProxy(ActorContext actorContext, List<ActorPath> cohortPaths,
50             String transactionId) {
51         this.actorContext = actorContext;
52         this.cohortPaths = cohortPaths;
53         this.transactionId = transactionId;
54     }
55
56     @Override
57     public ListenableFuture<Boolean> canCommit() {
58         LOG.debug("txn {} canCommit", transactionId);
59
60         Future<Iterable<Object>> combinedFuture =
61                 invokeCohorts(new CanCommitTransaction().toSerializable());
62
63         final SettableFuture<Boolean> returnFuture = SettableFuture.create();
64
65         combinedFuture.onComplete(new OnComplete<Iterable<Object>>() {
66             @Override
67             public void onComplete(Throwable failure, Iterable<Object> responses) throws Throwable {
68                 if(failure != null) {
69                     returnFuture.setException(failure);
70                     return;
71                 }
72
73                 boolean result = true;
74                 for(Object response: responses) {
75                     if (response.getClass().equals(CanCommitTransactionReply.SERIALIZABLE_CLASS)) {
76                         CanCommitTransactionReply reply =
77                                 CanCommitTransactionReply.fromSerializable(response);
78                         if (!reply.getCanCommit()) {
79                             result = false;
80                             break;
81                         }
82                     } else {
83                         LOG.error("Unexpected response type {}", response.getClass());
84                         returnFuture.setException(new IllegalArgumentException(
85                                 String.format("Unexpected response type {}", response.getClass())));
86                         return;
87                     }
88                 }
89
90                 returnFuture.set(Boolean.valueOf(result));
91             }
92         }, actorContext.getActorSystem().dispatcher());
93
94         return returnFuture;
95     }
96
97     private Future<Iterable<Object>> invokeCohorts(Object message) {
98         List<Future<Object>> futureList = Lists.newArrayListWithCapacity(cohortPaths.size());
99         for(ActorPath actorPath : cohortPaths) {
100
101             LOG.debug("txn {} Sending {} to {}", transactionId, message, actorPath);
102
103             ActorSelection cohort = actorContext.actorSelection(actorPath);
104
105             futureList.add(actorContext.executeRemoteOperationAsync(cohort, message,
106                     ActorContext.ASK_DURATION));
107         }
108
109         return Futures.sequence(futureList, actorContext.getActorSystem().dispatcher());
110     }
111
112     @Override
113     public ListenableFuture<Void> preCommit() {
114         LOG.debug("txn {} preCommit", transactionId);
115         return voidOperation(new PreCommitTransaction().toSerializable(),
116                 PreCommitTransactionReply.SERIALIZABLE_CLASS, true);
117     }
118
119     @Override
120     public ListenableFuture<Void> abort() {
121         LOG.debug("txn {} abort", transactionId);
122
123         // Note - we pass false for propagateException. In the front-end data broker, this method
124         // is called when one of the 3 phases fails with an exception. We'd rather have that
125         // original exception propagated to the client. If our abort fails and we propagate the
126         // exception then that exception will supersede and suppress the original exception. But
127         // it's the original exception that is the root cause and of more interest to the client.
128
129         return voidOperation(new AbortTransaction().toSerializable(),
130                 AbortTransactionReply.SERIALIZABLE_CLASS, false);
131     }
132
133     @Override
134     public ListenableFuture<Void> commit() {
135         LOG.debug("txn {} commit", transactionId);
136         return voidOperation(new CommitTransaction().toSerializable(),
137                 CommitTransactionReply.SERIALIZABLE_CLASS, true);
138     }
139
140     private ListenableFuture<Void> voidOperation(final Object message,
141             final Class<?> expectedResponseClass, final boolean propagateException) {
142
143         Future<Iterable<Object>> combinedFuture = invokeCohorts(message);
144
145         final SettableFuture<Void> returnFuture = SettableFuture.create();
146
147         combinedFuture.onComplete(new OnComplete<Iterable<Object>>() {
148             @Override
149             public void onComplete(Throwable failure, Iterable<Object> responses) throws Throwable {
150
151                 Throwable exceptionToPropagate = failure;
152                 if(exceptionToPropagate == null) {
153                     for(Object response: responses) {
154                         if(!response.getClass().equals(expectedResponseClass)) {
155                             exceptionToPropagate = new IllegalArgumentException(
156                                     String.format("Unexpected response type {}",
157                                             response.getClass()));
158                             break;
159                         }
160                     }
161                 }
162
163                 if(exceptionToPropagate != null) {
164                     if(propagateException) {
165                         // We don't log the exception here to avoid redundant logging since we're
166                         // propagating to the caller in MD-SAL core who will log it.
167                         returnFuture.setException(exceptionToPropagate);
168                     } else {
169                         // Since the caller doesn't want us to propagate the exception we'll also
170                         // not log it normally. But it's usually not good to totally silence
171                         // exceptions so we'll log it to debug level.
172                         LOG.debug(String.format("%s failed",  message.getClass().getSimpleName()),
173                                 exceptionToPropagate);
174                         returnFuture.set(null);
175                     }
176                 } else {
177                     returnFuture.set(null);
178                 }
179             }
180         }, actorContext.getActorSystem().dispatcher());
181
182         return returnFuture;
183     }
184
185     public List<ActorPath> getCohortPaths() {
186         return Collections.unmodifiableList(this.cohortPaths);
187     }
188 }