Bug 1588 - OFConstants.java moved to openflowplugin-api module
[openflowplugin.git] / openflowplugin / src / main / java / org / opendaylight / openflowplugin / openflow / md / core / session / MessageDispatchServiceImpl.java
1 /**
2  * Copyright IBM Corporation, 2013.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.openflowplugin.openflow.md.core.session;
9
10 import com.google.common.base.Function;
11 import com.google.common.util.concurrent.Futures;
12 import com.google.common.util.concurrent.JdkFutureAdapters;
13 import com.google.common.util.concurrent.ListenableFuture;
14 import com.google.common.util.concurrent.SettableFuture;
15 import org.opendaylight.controller.sal.common.util.RpcErrors;
16 import org.opendaylight.controller.sal.common.util.Rpcs;
17 import org.opendaylight.openflowjava.protocol.api.connection.ConnectionAdapter;
18 import org.opendaylight.openflowplugin.ConnectionException;
19 import org.opendaylight.openflowplugin.api.OFConstants;
20 import org.opendaylight.openflowplugin.openflow.md.core.ConnectionConductor;
21 import org.opendaylight.openflowplugin.openflow.md.core.SwitchConnectionDistinguisher;
22 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.UpdateFlowOutput;
23 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.UpdateFlowOutputBuilder;
24 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.transaction.rev131103.TransactionId;
25 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.service.rev130918.UpdateGroupOutput;
26 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.service.rev130918.UpdateGroupOutputBuilder;
27 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.service.rev130918.UpdateMeterOutput;
28 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.service.rev130918.UpdateMeterOutputBuilder;
29 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.*;
30 import org.opendaylight.yang.gen.v1.urn.opendaylight.port.service.rev131107.UpdatePortOutput;
31 import org.opendaylight.yang.gen.v1.urn.opendaylight.port.service.rev131107.UpdatePortOutputBuilder;
32 import org.opendaylight.yangtools.yang.common.RpcError;
33 import org.opendaylight.yangtools.yang.common.RpcResult;
34 import org.slf4j.Logger;
35 import org.slf4j.LoggerFactory;
36
37 import java.math.BigInteger;
38 import java.util.ArrayList;
39 import java.util.List;
40 import java.util.concurrent.Future;
41
42 /**
43  * message dispatch service to send the message to switch.
44  *
45  * @author AnilGujele
46  */
47 public class MessageDispatchServiceImpl implements IMessageDispatchService {
48
49     private static final Logger LOG = LoggerFactory.getLogger(MessageDispatchServiceImpl.class);
50     private static final String CONNECTION_ERROR_MESSAGE = "Session for the cookie is invalid. Reason: "
51             + "the switch has been recently disconnected OR inventory provides outdated information.";
52
53     private SessionContext session;
54
55     /**
56      * constructor
57      *
58      * @param session - MessageDispatchService for this session
59      */
60     public MessageDispatchServiceImpl(SessionContext session) {
61         this.session = session;
62     }
63
64     /**
65      * get proper connection adapter to send the message to switch.
66      *
67      * @param cookie to identify the right connection, it can be null also.
68      * @return connectionAdapter associated with cookie, otherwise return best
69      * suitable connection.
70      */
71
72     private ConnectionAdapter getConnectionAdapter(SwitchConnectionDistinguisher cookie) throws ConnectionException {
73
74         if (!session.isValid()) {
75             LOG.warn("Session for the cookie {} is invalid.", cookie);
76             throw new ConnectionException(CONNECTION_ERROR_MESSAGE);
77         }
78         LOG.debug("finding connecton for cookie value {}. ", cookie);
79         // set main connection as default
80         ConnectionAdapter connectionAdapter = session.getPrimaryConductor().getConnectionAdapter();
81         if (null != cookie) {
82             ConnectionConductor conductor = session.getAuxiliaryConductor(cookie);
83             // check if auxiliary connection exist
84             if (null != conductor) {
85                 LOG.debug("found auxiliary connection for the cookie.");
86                 connectionAdapter = conductor.getConnectionAdapter();
87             }
88         } else {
89             // TODO: pick connection to utilize all the available connection.
90         }
91         return connectionAdapter;
92     }
93
94     @Override
95     public Future<RpcResult<BarrierOutput>> barrier(BarrierInput input, SwitchConnectionDistinguisher cookie) {
96         try {
97             return getConnectionAdapter(cookie).barrier(input);
98         } catch (ConnectionException e) {
99             return getRpcErrorFuture(e);
100         }
101     }
102
103     private <T> SettableFuture<RpcResult<T>> getRpcErrorFuture(ConnectionException e) {
104         List<RpcError> rpcErrorList = getConnectionErrorAsRpcErrors(e);
105         SettableFuture<RpcResult<T>> futureWithError = SettableFuture.create();
106         futureWithError.set(Rpcs.<T>getRpcResult(false, rpcErrorList));
107         return futureWithError;
108     }
109
110     private List<RpcError> getConnectionErrorAsRpcErrors(ConnectionException e) {
111         List<RpcError> rpcErrorList = new ArrayList<>();
112         rpcErrorList.add(RpcErrors.getRpcError(OFConstants.APPLICATION_TAG,
113                 OFConstants.ERROR_TAG_TIMEOUT,
114                 CONNECTION_ERROR_MESSAGE,
115                 RpcError.ErrorSeverity.WARNING,
116                 e.getMessage(),
117                 RpcError.ErrorType.TRANSPORT,
118                 e.getCause()));
119         return rpcErrorList;
120     }
121
122     @Override
123     public Future<RpcResult<Void>> experimenter(ExperimenterInput input, SwitchConnectionDistinguisher cookie) {
124         try {
125             return getConnectionAdapter(cookie).experimenter(input);
126         } catch (ConnectionException e) {
127             return getRpcErrorFuture(e);
128         }
129     }
130
131     @Override
132     public Future<RpcResult<UpdateFlowOutput>> flowMod(final FlowModInput input, SwitchConnectionDistinguisher cookie) {
133         LOG.debug("Calling OFLibrary flowMod");
134         Future<RpcResult<Void>> response = null;
135         try {
136             response = getConnectionAdapter(cookie).flowMod(input);
137         } catch (ConnectionException e) {
138             return getRpcErrorFuture(e);
139         }
140
141         // appending xid
142         ListenableFuture<RpcResult<UpdateFlowOutput>> xidResult = Futures.transform(
143                 JdkFutureAdapters.listenInPoolThread(response),
144                 new Function<RpcResult<Void>, RpcResult<UpdateFlowOutput>>() {
145
146                     @Override
147                     public RpcResult<UpdateFlowOutput> apply(final RpcResult<Void> inputArg) {
148                         UpdateFlowOutputBuilder flowModOutput = new UpdateFlowOutputBuilder();
149                         BigInteger bigIntXid = BigInteger.valueOf(input.getXid());
150                         flowModOutput.setTransactionId(new TransactionId(bigIntXid));
151
152                         UpdateFlowOutput result = flowModOutput.build();
153                         RpcResult<UpdateFlowOutput> rpcResult = Rpcs.getRpcResult(
154                                 inputArg.isSuccessful(), result, inputArg.getErrors());
155                         return rpcResult;
156                     }
157                 });
158
159         return xidResult;
160     }
161
162     @Override
163     public Future<RpcResult<GetAsyncOutput>> getAsync(GetAsyncInput input, SwitchConnectionDistinguisher cookie) {
164         try {
165             return getConnectionAdapter(cookie).getAsync(input);
166         } catch (ConnectionException e) {
167             return getRpcErrorFuture(e);
168         }
169     }
170
171     @Override
172     public Future<RpcResult<GetConfigOutput>> getConfig(GetConfigInput input, SwitchConnectionDistinguisher cookie) {
173         try {
174             return getConnectionAdapter(cookie).getConfig(input);
175         } catch (ConnectionException e) {
176             return getRpcErrorFuture(e);
177         }
178     }
179
180     @Override
181     public Future<RpcResult<GetFeaturesOutput>> getFeatures(GetFeaturesInput input, SwitchConnectionDistinguisher cookie) {
182         try {
183             return getConnectionAdapter(cookie).getFeatures(input);
184         } catch (ConnectionException e) {
185             return getRpcErrorFuture(e);
186         }
187     }
188
189     @Override
190     public Future<RpcResult<GetQueueConfigOutput>> getQueueConfig(GetQueueConfigInput input,
191                                                                   SwitchConnectionDistinguisher cookie) {
192         try {
193             return getConnectionAdapter(cookie).getQueueConfig(input);
194         } catch (ConnectionException e) {
195             return getRpcErrorFuture(e);
196         }
197     }
198
199     @Override
200     public Future<RpcResult<UpdateGroupOutput>> groupMod(final GroupModInput input, SwitchConnectionDistinguisher cookie) {
201         LOG.debug("Calling OFLibrary groupMod");
202         Future<RpcResult<Void>> response = null;
203         try {
204             response = getConnectionAdapter(cookie).groupMod(input);
205         } catch (ConnectionException e) {
206             return getRpcErrorFuture(e);
207         }
208
209         // appending xid
210         ListenableFuture<RpcResult<UpdateGroupOutput>> xidResult = Futures.transform(
211                 JdkFutureAdapters.listenInPoolThread(response),
212                 new Function<RpcResult<Void>, RpcResult<UpdateGroupOutput>>() {
213
214                     @Override
215                     public RpcResult<UpdateGroupOutput> apply(final RpcResult<Void> inputArg) {
216                         UpdateGroupOutputBuilder groupModOutput = new UpdateGroupOutputBuilder();
217                         BigInteger bigIntXid = BigInteger.valueOf(input.getXid());
218                         groupModOutput.setTransactionId(new TransactionId(bigIntXid));
219
220                         UpdateGroupOutput result = groupModOutput.build();
221                         RpcResult<UpdateGroupOutput> rpcResult = Rpcs.getRpcResult(
222                                 inputArg.isSuccessful(), result, inputArg.getErrors());
223                         return rpcResult;
224                     }
225                 });
226
227         return xidResult;
228     }
229
230     @Override
231     public Future<RpcResult<UpdateMeterOutput>> meterMod(final MeterModInput input, SwitchConnectionDistinguisher cookie) {
232         LOG.debug("Calling OFLibrary meterMod");
233         Future<RpcResult<Void>> response = null;
234         try {
235             response = getConnectionAdapter(cookie).meterMod(input);
236         } catch (ConnectionException e) {
237             return getRpcErrorFuture(e);
238         }
239
240         // appending xid
241         ListenableFuture<RpcResult<UpdateMeterOutput>> xidResult = Futures.transform(
242                 JdkFutureAdapters.listenInPoolThread(response),
243                 new Function<RpcResult<Void>, RpcResult<UpdateMeterOutput>>() {
244
245                     @Override
246                     public RpcResult<UpdateMeterOutput> apply(final RpcResult<Void> inputArg) {
247                         UpdateMeterOutputBuilder meterModOutput = new UpdateMeterOutputBuilder();
248                         BigInteger bigIntXid = BigInteger.valueOf(input.getXid());
249                         meterModOutput.setTransactionId(new TransactionId(bigIntXid));
250
251                         UpdateMeterOutput result = meterModOutput.build();
252                         RpcResult<UpdateMeterOutput> rpcResult = Rpcs.getRpcResult(
253                                 inputArg.isSuccessful(), result, inputArg.getErrors());
254                         return rpcResult;
255                     }
256                 });
257
258         return xidResult;
259     }
260
261     @Override
262     public Future<RpcResult<java.lang.Void>> multipartRequest(MultipartRequestInput input, SwitchConnectionDistinguisher cookie) {
263         try {
264             return getConnectionAdapter(cookie).multipartRequest(input);
265         } catch (ConnectionException e) {
266             return getRpcErrorFuture(e);
267         }
268     }
269
270     @Override
271     public Future<RpcResult<Void>> packetOut(PacketOutInput input, SwitchConnectionDistinguisher cookie) {
272         try {
273             return getConnectionAdapter(cookie).packetOut(input);
274         } catch (ConnectionException e) {
275             return getRpcErrorFuture(e);
276         }
277     }
278
279     @Override
280     public Future<RpcResult<UpdatePortOutput>> portMod(final PortModInput input, SwitchConnectionDistinguisher cookie) {
281         LOG.debug("Calling OFLibrary portMod");
282         Future<RpcResult<Void>> response = null;
283         try {
284             response = getConnectionAdapter(cookie).portMod(input);
285         } catch (ConnectionException e) {
286             return getRpcErrorFuture(e);
287         }
288
289         // appending xid
290         ListenableFuture<RpcResult<UpdatePortOutput>> xidResult = Futures.transform(
291                 JdkFutureAdapters.listenInPoolThread(response),
292                 new Function<RpcResult<Void>, RpcResult<UpdatePortOutput>>() {
293
294                     @Override
295                     public RpcResult<UpdatePortOutput> apply(final RpcResult<Void> inputArg) {
296                         UpdatePortOutputBuilder portModOutput = new UpdatePortOutputBuilder();
297                         BigInteger bigIntXid = BigInteger.valueOf(input.getXid());
298                         portModOutput.setTransactionId(new TransactionId(bigIntXid));
299
300                         UpdatePortOutput result = portModOutput.build();
301                         RpcResult<UpdatePortOutput> rpcResult = Rpcs.getRpcResult(
302                                 inputArg.isSuccessful(), result, inputArg.getErrors());
303                         return rpcResult;
304                     }
305                 });
306
307         return xidResult;
308     }
309
310     @Override
311     public Future<RpcResult<RoleRequestOutput>> roleRequest(RoleRequestInput input, SwitchConnectionDistinguisher cookie) {
312         try {
313             return getConnectionAdapter(cookie).roleRequest(input);
314         } catch (ConnectionException e) {
315             return getRpcErrorFuture(e);
316         }
317     }
318
319     @Override
320     public Future<RpcResult<Void>> setAsync(SetAsyncInput input, SwitchConnectionDistinguisher cookie) {
321         try {
322             return getConnectionAdapter(cookie).setAsync(input);
323         } catch (ConnectionException e) {
324             return getRpcErrorFuture(e);
325         }
326     }
327
328     @Override
329     public Future<RpcResult<Void>> setConfig(SetConfigInput input, SwitchConnectionDistinguisher cookie) {
330         try {
331             return getConnectionAdapter(cookie).setConfig(input);
332         } catch (ConnectionException e) {
333             return getRpcErrorFuture(e);
334         }
335     }
336
337     @Override
338     public Future<RpcResult<Void>> tableMod(TableModInput input, SwitchConnectionDistinguisher cookie) {
339         try {
340             return getConnectionAdapter(cookie).tableMod(input);
341         } catch (ConnectionException e) {
342             return getRpcErrorFuture(e);
343         }
344     }
345 }