BUG-956 deadlock by rpc invocation - phase2
[openflowplugin.git] / openflowplugin / src / main / java / org / opendaylight / openflowplugin / openflow / md / core / session / MessageDispatchServiceImpl.java
1 /**
2  * Copyright IBM Corporation, 2013.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.openflowplugin.openflow.md.core.session;
9
10 import java.math.BigInteger;
11 import java.util.concurrent.Future;
12
13 import org.opendaylight.controller.sal.common.util.Rpcs;
14 import org.opendaylight.openflowjava.protocol.api.connection.ConnectionAdapter;
15 import org.opendaylight.openflowplugin.openflow.md.core.ConnectionConductor;
16 import org.opendaylight.openflowplugin.openflow.md.core.SwitchConnectionDistinguisher;
17 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.UpdateFlowOutput;
18 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.UpdateFlowOutputBuilder;
19 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.transaction.rev131103.TransactionId;
20 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.service.rev130918.UpdateGroupOutput;
21 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.service.rev130918.UpdateGroupOutputBuilder;
22 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.service.rev130918.UpdateMeterOutput;
23 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.service.rev130918.UpdateMeterOutputBuilder;
24 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.BarrierInput;
25 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.BarrierOutput;
26 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.ExperimenterInput;
27 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.FlowModInput;
28 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GetAsyncInput;
29 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GetAsyncOutput;
30 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GetConfigInput;
31 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GetConfigOutput;
32 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GetFeaturesInput;
33 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GetFeaturesOutput;
34 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GetQueueConfigInput;
35 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GetQueueConfigOutput;
36 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GroupModInput;
37 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.MeterModInput;
38 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.MultipartRequestInput;
39 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PacketOutInput;
40 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PortModInput;
41 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.RoleRequestInput;
42 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.RoleRequestOutput;
43 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.SetAsyncInput;
44 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.SetConfigInput;
45 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.TableModInput;
46 import org.opendaylight.yang.gen.v1.urn.opendaylight.port.service.rev131107.UpdatePortOutput;
47 import org.opendaylight.yang.gen.v1.urn.opendaylight.port.service.rev131107.UpdatePortOutputBuilder;
48 import org.opendaylight.yangtools.yang.common.RpcResult;
49 import org.slf4j.Logger;
50 import org.slf4j.LoggerFactory;
51
52 import com.google.common.base.Function;
53 import com.google.common.util.concurrent.Futures;
54 import com.google.common.util.concurrent.JdkFutureAdapters;
55 import com.google.common.util.concurrent.ListenableFuture;
56
57 /**
58  * message dispatch service to send the message to switch.
59  *
60  * @author AnilGujele
61  *
62  */
63 public class MessageDispatchServiceImpl implements IMessageDispatchService {
64
65     private static final Logger LOG = LoggerFactory.getLogger(MessageDispatchServiceImpl.class);
66
67     private SessionContext session;    
68
69     /**
70      * constructor
71      *
72      * @param session
73      *            - MessageDispatchService for this session
74      */
75     public MessageDispatchServiceImpl(SessionContext session) {
76         this.session = session;        
77     }
78
79     /**
80      * get proper connection adapter to send the message to switch.
81      *
82      * @param cookie to identify the right connection, it can be null also.
83      * @return connectionAdapter associated with cookie, otherwise return best
84      *         suitable connection.
85      *
86      */
87
88     private ConnectionAdapter getConnectionAdapter(SwitchConnectionDistinguisher cookie) {
89
90         if (!session.isValid()) {
91             LOG.warn("Session for the cookie {} is invalid.", cookie);
92             throw new IllegalArgumentException("Session for the cookie is invalid. Reason: "
93                     + "the switch has been recently disconnected OR inventory provides outdated information.");
94         }
95         LOG.debug("finding connecton for cookie value {}. ", cookie);
96         // set main connection as default
97         ConnectionAdapter connectionAdapter = session.getPrimaryConductor().getConnectionAdapter();
98         if (null != cookie) {
99             ConnectionConductor conductor = session.getAuxiliaryConductor(cookie);
100             // check if auxiliary connection exist
101             if (null != conductor) {
102                 LOG.debug("found auxiliary connection for the cookie.");
103                 connectionAdapter = conductor.getConnectionAdapter();
104             }
105         } else {
106             // TODO: pick connection to utilize all the available connection.
107         }
108         return connectionAdapter;
109     }
110
111     @Override
112     public Future<RpcResult<BarrierOutput>> barrier(BarrierInput input, SwitchConnectionDistinguisher cookie) {  
113         return getConnectionAdapter(cookie).barrier(input);
114     }
115
116     @Override
117     public Future<RpcResult<Void>> experimenter(ExperimenterInput input, SwitchConnectionDistinguisher cookie) {
118         return getConnectionAdapter(cookie).experimenter(input);
119     }
120
121     @Override
122     public Future<RpcResult<UpdateFlowOutput>> flowMod(final FlowModInput input, SwitchConnectionDistinguisher cookie) {
123         LOG.debug("Calling OFLibrary flowMod");
124         Future<RpcResult<Void>> response = getConnectionAdapter(cookie).flowMod(input);
125
126         // appending xid
127         ListenableFuture<RpcResult<UpdateFlowOutput>> xidResult = Futures.transform(
128                 JdkFutureAdapters.listenInPoolThread(response), 
129                 new Function<RpcResult<Void>,RpcResult<UpdateFlowOutput>>() {
130
131             @Override
132             public RpcResult<UpdateFlowOutput> apply(final RpcResult<Void> inputArg) {
133                 UpdateFlowOutputBuilder flowModOutput = new UpdateFlowOutputBuilder();        
134                 BigInteger bigIntXid = BigInteger.valueOf(input.getXid()) ;
135                 flowModOutput.setTransactionId(new TransactionId(bigIntXid));
136
137                 UpdateFlowOutput result = flowModOutput.build();
138                 RpcResult<UpdateFlowOutput> rpcResult = Rpcs.getRpcResult(
139                         inputArg.isSuccessful(), result, inputArg.getErrors());
140                 return rpcResult;
141             }
142         });
143         
144         return xidResult;
145     }
146
147     @Override
148     public Future<RpcResult<GetAsyncOutput>> getAsync(GetAsyncInput input, SwitchConnectionDistinguisher cookie) {
149         return getConnectionAdapter(cookie).getAsync(input);
150     }
151
152     @Override
153     public Future<RpcResult<GetConfigOutput>> getConfig(GetConfigInput input, SwitchConnectionDistinguisher cookie) {
154         return getConnectionAdapter(cookie).getConfig(input);
155     }
156
157     @Override
158     public Future<RpcResult<GetFeaturesOutput>> getFeatures(GetFeaturesInput input, SwitchConnectionDistinguisher cookie) {
159         return getConnectionAdapter(cookie).getFeatures(input);
160     }
161
162     @Override
163     public Future<RpcResult<GetQueueConfigOutput>> getQueueConfig(GetQueueConfigInput input,
164             SwitchConnectionDistinguisher cookie) {
165         return getConnectionAdapter(cookie).getQueueConfig(input);
166     }
167
168     @Override
169     public Future<RpcResult<UpdateGroupOutput>> groupMod(final GroupModInput input, SwitchConnectionDistinguisher cookie) {        
170         LOG.debug("Calling OFLibrary groupMod");
171         Future<RpcResult<Void>> response = getConnectionAdapter(cookie).groupMod(input);
172
173         // appending xid
174         ListenableFuture<RpcResult<UpdateGroupOutput>> xidResult = Futures.transform(
175                 JdkFutureAdapters.listenInPoolThread(response), 
176                 new Function<RpcResult<Void>,RpcResult<UpdateGroupOutput>>() {
177
178             @Override
179             public RpcResult<UpdateGroupOutput> apply(final RpcResult<Void> inputArg) {
180                 UpdateGroupOutputBuilder groupModOutput = new UpdateGroupOutputBuilder();      
181                 BigInteger bigIntXid = BigInteger.valueOf(input.getXid());
182                 groupModOutput.setTransactionId(new TransactionId(bigIntXid));
183
184                 UpdateGroupOutput result = groupModOutput.build();
185                 RpcResult<UpdateGroupOutput> rpcResult = Rpcs.getRpcResult(
186                         inputArg.isSuccessful(), result, inputArg.getErrors());
187                 return rpcResult;
188             }
189         });
190         
191         return xidResult;
192     }
193
194     @Override
195     public Future<RpcResult<UpdateMeterOutput>> meterMod(final MeterModInput input, SwitchConnectionDistinguisher cookie) {
196         LOG.debug("Calling OFLibrary meterMod");
197         Future<RpcResult<Void>> response = getConnectionAdapter(cookie).meterMod(input);
198
199         // appending xid
200         ListenableFuture<RpcResult<UpdateMeterOutput>> xidResult = Futures.transform(
201                 JdkFutureAdapters.listenInPoolThread(response), 
202                 new Function<RpcResult<Void>,RpcResult<UpdateMeterOutput>>() {
203
204             @Override
205             public RpcResult<UpdateMeterOutput> apply(final RpcResult<Void> inputArg) {
206                 UpdateMeterOutputBuilder meterModOutput = new UpdateMeterOutputBuilder();       
207                 BigInteger bigIntXid = BigInteger.valueOf(input.getXid());
208                 meterModOutput.setTransactionId(new TransactionId(bigIntXid));
209                 
210                 UpdateMeterOutput result = meterModOutput.build();
211                 RpcResult<UpdateMeterOutput> rpcResult = Rpcs.getRpcResult(
212                         inputArg.isSuccessful(), result, inputArg.getErrors());
213                 return rpcResult;
214             }
215         });
216         
217         return xidResult;
218     }
219
220     @Override
221     public Future<RpcResult<java.lang.Void>> multipartRequest(MultipartRequestInput input, SwitchConnectionDistinguisher cookie) {
222         return getConnectionAdapter(cookie).multipartRequest(input);
223     }
224
225     @Override
226     public Future<RpcResult<Void>> packetOut(PacketOutInput input, SwitchConnectionDistinguisher cookie) {
227         return getConnectionAdapter(cookie).packetOut(input);
228     }
229
230     @Override
231     public Future<RpcResult<UpdatePortOutput>> portMod(final PortModInput input, SwitchConnectionDistinguisher cookie) {
232         LOG.debug("Calling OFLibrary portMod");
233         Future<RpcResult<Void>> response = getConnectionAdapter(cookie).portMod(input);
234         
235         // appending xid
236         ListenableFuture<RpcResult<UpdatePortOutput>> xidResult = Futures.transform(
237                 JdkFutureAdapters.listenInPoolThread(response), 
238                 new Function<RpcResult<Void>,RpcResult<UpdatePortOutput>>() {
239
240             @Override
241             public RpcResult<UpdatePortOutput> apply(final RpcResult<Void> inputArg) {
242                 UpdatePortOutputBuilder portModOutput = new UpdatePortOutputBuilder();
243                 BigInteger bigIntXid = BigInteger.valueOf(input.getXid());
244                 portModOutput.setTransactionId(new TransactionId(bigIntXid));
245                 
246                 UpdatePortOutput result = portModOutput.build();
247                 RpcResult<UpdatePortOutput> rpcResult = Rpcs.getRpcResult(
248                         inputArg.isSuccessful(), result, inputArg.getErrors());
249                 return rpcResult;
250             }
251         });
252         
253         return xidResult;
254     }
255
256     @Override
257     public Future<RpcResult<RoleRequestOutput>> roleRequest(RoleRequestInput input, SwitchConnectionDistinguisher cookie) {
258         return getConnectionAdapter(cookie).roleRequest(input);
259     }
260
261     @Override
262     public Future<RpcResult<Void>> setAsync(SetAsyncInput input, SwitchConnectionDistinguisher cookie) {
263         return getConnectionAdapter(cookie).setAsync(input);
264     }
265
266     @Override
267     public Future<RpcResult<Void>> setConfig(SetConfigInput input, SwitchConnectionDistinguisher cookie) {
268         return getConnectionAdapter(cookie).setConfig(input);
269     }
270
271     @Override
272     public Future<RpcResult<Void>> tableMod(TableModInput input, SwitchConnectionDistinguisher cookie) {
273         return getConnectionAdapter(cookie).tableMod(input);
274     }
275 }