5faabb495788c15a8ae8a77d6c85177861c140b8
[openflowplugin.git] / openflowplugin / src / main / java / org / opendaylight / openflowplugin / openflow / md / core / session / MessageDispatchServiceImpl.java
1 /**
2  * Copyright IBM Corporation, 2013.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.openflowplugin.openflow.md.core.session;
9
10 import com.google.common.base.Function;
11 import com.google.common.util.concurrent.Futures;
12 import com.google.common.util.concurrent.JdkFutureAdapters;
13 import com.google.common.util.concurrent.ListenableFuture;
14
15 import org.opendaylight.openflowjava.protocol.api.connection.ConnectionAdapter;
16 import org.opendaylight.openflowplugin.ConnectionException;
17 import org.opendaylight.openflowplugin.api.openflow.md.core.ConnectionConductor;
18 import org.opendaylight.openflowplugin.api.openflow.md.core.SwitchConnectionDistinguisher;
19 import org.opendaylight.openflowplugin.api.openflow.md.core.session.IMessageDispatchService;
20 import org.opendaylight.openflowplugin.api.openflow.md.core.session.SessionContext;
21 import org.opendaylight.openflowplugin.openflow.md.util.RpcResultUtil;
22 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.UpdateFlowOutput;
23 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.UpdateFlowOutputBuilder;
24 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.transaction.rev131103.TransactionId;
25 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.service.rev130918.UpdateGroupOutput;
26 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.service.rev130918.UpdateGroupOutputBuilder;
27 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.service.rev130918.UpdateMeterOutput;
28 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.service.rev130918.UpdateMeterOutputBuilder;
29 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.*;
30 import org.opendaylight.yang.gen.v1.urn.opendaylight.port.service.rev131107.UpdatePortOutput;
31 import org.opendaylight.yang.gen.v1.urn.opendaylight.port.service.rev131107.UpdatePortOutputBuilder;
32 import org.opendaylight.yangtools.yang.common.RpcResult;
33 import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
34 import org.slf4j.Logger;
35 import org.slf4j.LoggerFactory;
36
37 import java.math.BigInteger;
38 import java.util.concurrent.Future;
39
40 /**
41  * message dispatch service to send the message to switch.
42  *
43  * @author AnilGujele
44  */
45 public class MessageDispatchServiceImpl implements IMessageDispatchService {
46
47     private static final Logger LOG = LoggerFactory.getLogger(MessageDispatchServiceImpl.class);
48     private SessionContext session;
49
50     /**
51      * constructor
52      *
53      * @param session - MessageDispatchService for this session
54      */
55     public MessageDispatchServiceImpl(SessionContext session) {
56         this.session = session;
57     }
58
59     /**
60      * get proper connection adapter to send the message to switch.
61      *
62      * @param cookie to identify the right connection, it can be null also.
63      * @return connectionAdapter associated with cookie, otherwise return best
64      * suitable connection.
65      */
66
67     private ConnectionAdapter getConnectionAdapter(SwitchConnectionDistinguisher cookie) throws ConnectionException {
68
69         if (!session.isValid()) {
70             LOG.warn("No valid connection found for the node [datapath-id : {}]", session.getSessionKey().getId());
71             throw new ConnectionException(CONNECTION_ERROR_MESSAGE);
72         }
73         LOG.debug("finding connecton for cookie value {}. ", cookie);
74         // set main connection as default
75         ConnectionAdapter connectionAdapter = session.getPrimaryConductor().getConnectionAdapter();
76         if (null != cookie) {
77             ConnectionConductor conductor = session.getAuxiliaryConductor(cookie);
78             // check if auxiliary connection exist
79             if (null != conductor) {
80                 LOG.debug("found auxiliary connection for the cookie.");
81                 connectionAdapter = conductor.getConnectionAdapter();
82             }
83         } else {
84             // TODO: pick connection to utilize all the available connection.
85         }
86         return connectionAdapter;
87     }
88
89     @Override
90     public Future<RpcResult<BarrierOutput>> barrier(BarrierInput input, SwitchConnectionDistinguisher cookie) {
91         try {
92             return getConnectionAdapter(cookie).barrier(input);
93         } catch (ConnectionException e) {
94             return RpcResultUtil.getRpcErrorFuture(e);
95         }
96     }
97
98     @Override
99     public Future<RpcResult<Void>> experimenter(ExperimenterInput input, SwitchConnectionDistinguisher cookie) {
100         try {
101             return getConnectionAdapter(cookie).experimenter(input);
102         } catch (ConnectionException e) {
103             return RpcResultUtil.getRpcErrorFuture(e);
104         }
105     }
106
107     @Override
108     public Future<RpcResult<UpdateFlowOutput>> flowMod(final FlowModInput input, SwitchConnectionDistinguisher cookie) {
109         LOG.debug("Calling OFLibrary flowMod");
110         Future<RpcResult<Void>> response = null;
111         try {
112             response = getConnectionAdapter(cookie).flowMod(input);
113         } catch (ConnectionException e) {
114             return RpcResultUtil.getRpcErrorFuture(e);
115         }
116
117         // appending xid
118         ListenableFuture<RpcResult<UpdateFlowOutput>> xidResult = Futures.transform(
119                 JdkFutureAdapters.listenInPoolThread(response),
120                 new Function<RpcResult<Void>, RpcResult<UpdateFlowOutput>>() {
121
122                     @Override
123                     public RpcResult<UpdateFlowOutput> apply(final RpcResult<Void> inputArg) {
124                         UpdateFlowOutputBuilder flowModOutput = new UpdateFlowOutputBuilder();
125                         BigInteger bigIntXid = BigInteger.valueOf(input.getXid());
126                         flowModOutput.setTransactionId(new TransactionId(bigIntXid));
127
128                         UpdateFlowOutput result = flowModOutput.build();
129                         RpcResult<UpdateFlowOutput> rpcResult = RpcResultBuilder
130                                 .<UpdateFlowOutput>status(inputArg.isSuccessful())
131                                 .withResult(result).withRpcErrors(inputArg.getErrors())
132                                 .build();
133                         return rpcResult;
134                     }
135                 });
136
137         return xidResult;
138     }
139
140     @Override
141     public Future<RpcResult<GetAsyncOutput>> getAsync(GetAsyncInput input, SwitchConnectionDistinguisher cookie) {
142         try {
143             return getConnectionAdapter(cookie).getAsync(input);
144         } catch (ConnectionException e) {
145             return RpcResultUtil.getRpcErrorFuture(e);
146         }
147     }
148
149     @Override
150     public Future<RpcResult<GetConfigOutput>> getConfig(GetConfigInput input, SwitchConnectionDistinguisher cookie) {
151         try {
152             return getConnectionAdapter(cookie).getConfig(input);
153         } catch (ConnectionException e) {
154             return RpcResultUtil.getRpcErrorFuture(e);
155         }
156     }
157
158     @Override
159     public Future<RpcResult<GetFeaturesOutput>> getFeatures(GetFeaturesInput input, SwitchConnectionDistinguisher cookie) {
160         try {
161             return getConnectionAdapter(cookie).getFeatures(input);
162         } catch (ConnectionException e) {
163             return RpcResultUtil.getRpcErrorFuture(e);
164         }
165     }
166
167     @Override
168     public Future<RpcResult<GetQueueConfigOutput>> getQueueConfig(GetQueueConfigInput input,
169                                                                   SwitchConnectionDistinguisher cookie) {
170         try {
171             return getConnectionAdapter(cookie).getQueueConfig(input);
172         } catch (ConnectionException e) {
173             return RpcResultUtil.getRpcErrorFuture(e);
174         }
175     }
176
177     @Override
178     public Future<RpcResult<UpdateGroupOutput>> groupMod(final GroupModInput input, SwitchConnectionDistinguisher cookie) {
179         LOG.debug("Calling OFLibrary groupMod");
180         Future<RpcResult<Void>> response = null;
181         try {
182             response = getConnectionAdapter(cookie).groupMod(input);
183         } catch (ConnectionException e) {
184             return RpcResultUtil.getRpcErrorFuture(e);
185         }
186
187         // appending xid
188         ListenableFuture<RpcResult<UpdateGroupOutput>> xidResult = Futures.transform(
189                 JdkFutureAdapters.listenInPoolThread(response),
190                 new Function<RpcResult<Void>, RpcResult<UpdateGroupOutput>>() {
191
192                     @Override
193                     public RpcResult<UpdateGroupOutput> apply(final RpcResult<Void> inputArg) {
194                         UpdateGroupOutputBuilder groupModOutput = new UpdateGroupOutputBuilder();
195                         BigInteger bigIntXid = BigInteger.valueOf(input.getXid());
196                         groupModOutput.setTransactionId(new TransactionId(bigIntXid));
197
198                         UpdateGroupOutput result = groupModOutput.build();
199                         RpcResult<UpdateGroupOutput> rpcResult = RpcResultBuilder
200                                 .<UpdateGroupOutput>status(inputArg.isSuccessful()).withResult(result)
201                                 .withRpcErrors(inputArg.getErrors()).build();
202                         return rpcResult;
203                     }
204                 });
205
206         return xidResult;
207     }
208
209     @Override
210     public Future<RpcResult<UpdateMeterOutput>> meterMod(final MeterModInput input, SwitchConnectionDistinguisher cookie) {
211         LOG.debug("Calling OFLibrary meterMod");
212         Future<RpcResult<Void>> response = null;
213         try {
214             response = getConnectionAdapter(cookie).meterMod(input);
215         } catch (ConnectionException e) {
216             return RpcResultUtil.getRpcErrorFuture(e);
217         }
218
219         // appending xid
220         ListenableFuture<RpcResult<UpdateMeterOutput>> xidResult = Futures.transform(
221                 JdkFutureAdapters.listenInPoolThread(response),
222                 new Function<RpcResult<Void>, RpcResult<UpdateMeterOutput>>() {
223
224                     @Override
225                     public RpcResult<UpdateMeterOutput> apply(final RpcResult<Void> inputArg) {
226                         UpdateMeterOutputBuilder meterModOutput = new UpdateMeterOutputBuilder();
227                         BigInteger bigIntXid = BigInteger.valueOf(input.getXid());
228                         meterModOutput.setTransactionId(new TransactionId(bigIntXid));
229
230                         UpdateMeterOutput result = meterModOutput.build();
231                         RpcResult<UpdateMeterOutput> rpcResult = RpcResultBuilder
232                                 .<UpdateMeterOutput>status(inputArg.isSuccessful()).withResult(result)
233                                 .withRpcErrors(inputArg.getErrors()).build();
234                         return rpcResult;
235                     }
236                 });
237
238         return xidResult;
239     }
240
241     @Override
242     public Future<RpcResult<java.lang.Void>> multipartRequest(MultipartRequestInput input, SwitchConnectionDistinguisher cookie) {
243         try {
244             return getConnectionAdapter(cookie).multipartRequest(input);
245         } catch (ConnectionException e) {
246             return RpcResultUtil.getRpcErrorFuture(e);
247         }
248     }
249
250     @Override
251     public Future<RpcResult<Void>> packetOut(PacketOutInput input, SwitchConnectionDistinguisher cookie) {
252         try {
253             return getConnectionAdapter(cookie).packetOut(input);
254         } catch (ConnectionException e) {
255             return RpcResultUtil.getRpcErrorFuture(e);
256         }
257     }
258
259     @Override
260     public Future<RpcResult<UpdatePortOutput>> portMod(final PortModInput input, SwitchConnectionDistinguisher cookie) {
261         LOG.debug("Calling OFLibrary portMod");
262         Future<RpcResult<Void>> response = null;
263         try {
264             response = getConnectionAdapter(cookie).portMod(input);
265         } catch (ConnectionException e) {
266             return RpcResultUtil.getRpcErrorFuture(e);
267         }
268
269         // appending xid
270         ListenableFuture<RpcResult<UpdatePortOutput>> xidResult = Futures.transform(
271                 JdkFutureAdapters.listenInPoolThread(response),
272                 new Function<RpcResult<Void>, RpcResult<UpdatePortOutput>>() {
273
274                     @Override
275                     public RpcResult<UpdatePortOutput> apply(final RpcResult<Void> inputArg) {
276                         UpdatePortOutputBuilder portModOutput = new UpdatePortOutputBuilder();
277                         BigInteger bigIntXid = BigInteger.valueOf(input.getXid());
278                         portModOutput.setTransactionId(new TransactionId(bigIntXid));
279
280                         UpdatePortOutput result = portModOutput.build();
281                         RpcResult<UpdatePortOutput> rpcResult = RpcResultBuilder
282                                 .<UpdatePortOutput>status(inputArg.isSuccessful()).withResult(result)
283                                 .withRpcErrors(inputArg.getErrors()).build();
284                         return rpcResult;
285                     }
286                 });
287
288         return xidResult;
289     }
290
291     @Override
292     public Future<RpcResult<RoleRequestOutput>> roleRequest(RoleRequestInput input, SwitchConnectionDistinguisher cookie) {
293         try {
294             return getConnectionAdapter(cookie).roleRequest(input);
295         } catch (ConnectionException e) {
296             return RpcResultUtil.getRpcErrorFuture(e);
297         }
298     }
299
300     @Override
301     public Future<RpcResult<Void>> setAsync(SetAsyncInput input, SwitchConnectionDistinguisher cookie) {
302         try {
303             return getConnectionAdapter(cookie).setAsync(input);
304         } catch (ConnectionException e) {
305             return RpcResultUtil.getRpcErrorFuture(e);
306         }
307     }
308
309     @Override
310     public Future<RpcResult<Void>> setConfig(SetConfigInput input, SwitchConnectionDistinguisher cookie) {
311         try {
312             return getConnectionAdapter(cookie).setConfig(input);
313         } catch (ConnectionException e) {
314             return RpcResultUtil.getRpcErrorFuture(e);
315         }
316     }
317
318     @Override
319     public Future<RpcResult<Void>> tableMod(TableModInput input, SwitchConnectionDistinguisher cookie) {
320         try {
321             return getConnectionAdapter(cookie).tableMod(input);
322         } catch (ConnectionException e) {
323             return RpcResultUtil.getRpcErrorFuture(e);
324         }
325     }
326 }