Revert "Bug-2827: role switch proposal"
[openflowplugin.git] / openflowplugin / src / main / java / org / opendaylight / openflowplugin / openflow / md / core / session / MessageDispatchServiceImpl.java
1 /**
2  * Copyright IBM Corporation, 2013.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.openflowplugin.openflow.md.core.session;
9
10 import com.google.common.base.Function;
11 import com.google.common.util.concurrent.Futures;
12 import com.google.common.util.concurrent.JdkFutureAdapters;
13 import com.google.common.util.concurrent.ListenableFuture;
14 import com.google.common.util.concurrent.SettableFuture;
15 import org.opendaylight.controller.sal.common.util.RpcErrors;
16 import org.opendaylight.controller.sal.common.util.Rpcs;
17 import org.opendaylight.openflowjava.protocol.api.connection.ConnectionAdapter;
18 import org.opendaylight.openflowplugin.ConnectionException;
19 import org.opendaylight.openflowplugin.api.OFConstants;
20 import org.opendaylight.openflowplugin.api.openflow.md.core.ConnectionConductor;
21 import org.opendaylight.openflowplugin.api.openflow.md.core.SwitchConnectionDistinguisher;
22 import org.opendaylight.openflowplugin.api.openflow.md.core.session.IMessageDispatchService;
23 import org.opendaylight.openflowplugin.api.openflow.md.core.session.SessionContext;
24 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.UpdateFlowOutput;
25 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.UpdateFlowOutputBuilder;
26 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.transaction.rev131103.TransactionId;
27 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.service.rev130918.UpdateGroupOutput;
28 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.service.rev130918.UpdateGroupOutputBuilder;
29 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.service.rev130918.UpdateMeterOutput;
30 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.service.rev130918.UpdateMeterOutputBuilder;
31 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.*;
32 import org.opendaylight.yang.gen.v1.urn.opendaylight.port.service.rev131107.UpdatePortOutput;
33 import org.opendaylight.yang.gen.v1.urn.opendaylight.port.service.rev131107.UpdatePortOutputBuilder;
34 import org.opendaylight.yangtools.yang.common.RpcError;
35 import org.opendaylight.yangtools.yang.common.RpcResult;
36 import org.slf4j.Logger;
37 import org.slf4j.LoggerFactory;
38
39 import java.math.BigInteger;
40 import java.util.ArrayList;
41 import java.util.List;
42 import java.util.concurrent.Future;
43
44 /**
45  * message dispatch service to send the message to switch.
46  *
47  * @author AnilGujele
48  */
49 public class MessageDispatchServiceImpl implements IMessageDispatchService {
50
51     private static final Logger LOG = LoggerFactory.getLogger(MessageDispatchServiceImpl.class);
52     private static final String CONNECTION_ERROR_MESSAGE = "Session for the cookie is invalid. Reason: "
53             + "the switch has been recently disconnected OR inventory provides outdated information.";
54
55     private SessionContext session;
56
57     /**
58      * constructor
59      *
60      * @param session - MessageDispatchService for this session
61      */
62     public MessageDispatchServiceImpl(SessionContext session) {
63         this.session = session;
64     }
65
66     /**
67      * get proper connection adapter to send the message to switch.
68      *
69      * @param cookie to identify the right connection, it can be null also.
70      * @return connectionAdapter associated with cookie, otherwise return best
71      * suitable connection.
72      */
73
74     private ConnectionAdapter getConnectionAdapter(SwitchConnectionDistinguisher cookie) throws ConnectionException {
75
76         if (!session.isValid()) {
77             LOG.warn("No valid connection found for the node [datapath-id : {}]", session.getSessionKey().getId());
78             throw new ConnectionException(CONNECTION_ERROR_MESSAGE);
79         }
80         LOG.debug("finding connecton for cookie value {}. ", cookie);
81         // set main connection as default
82         ConnectionAdapter connectionAdapter = session.getPrimaryConductor().getConnectionAdapter();
83         if (null != cookie) {
84             ConnectionConductor conductor = session.getAuxiliaryConductor(cookie);
85             // check if auxiliary connection exist
86             if (null != conductor) {
87                 LOG.debug("found auxiliary connection for the cookie.");
88                 connectionAdapter = conductor.getConnectionAdapter();
89             }
90         } else {
91             // TODO: pick connection to utilize all the available connection.
92         }
93         return connectionAdapter;
94     }
95
96     @Override
97     public Future<RpcResult<BarrierOutput>> barrier(BarrierInput input, SwitchConnectionDistinguisher cookie) {
98         try {
99             return getConnectionAdapter(cookie).barrier(input);
100         } catch (ConnectionException e) {
101             return getRpcErrorFuture(e);
102         }
103     }
104
105     private <T> SettableFuture<RpcResult<T>> getRpcErrorFuture(ConnectionException e) {
106         List<RpcError> rpcErrorList = getConnectionErrorAsRpcErrors(e);
107         SettableFuture<RpcResult<T>> futureWithError = SettableFuture.create();
108         futureWithError.set(Rpcs.<T>getRpcResult(false, rpcErrorList));
109         return futureWithError;
110     }
111
112     private List<RpcError> getConnectionErrorAsRpcErrors(ConnectionException e) {
113         List<RpcError> rpcErrorList = new ArrayList<>();
114         rpcErrorList.add(RpcErrors.getRpcError(OFConstants.APPLICATION_TAG,
115                 OFConstants.ERROR_TAG_TIMEOUT,
116                 CONNECTION_ERROR_MESSAGE,
117                 RpcError.ErrorSeverity.WARNING,
118                 e.getMessage(),
119                 RpcError.ErrorType.TRANSPORT,
120                 e.getCause()));
121         return rpcErrorList;
122     }
123
124     @Override
125     public Future<RpcResult<Void>> experimenter(ExperimenterInput input, SwitchConnectionDistinguisher cookie) {
126         try {
127             return getConnectionAdapter(cookie).experimenter(input);
128         } catch (ConnectionException e) {
129             return getRpcErrorFuture(e);
130         }
131     }
132
133     @Override
134     public Future<RpcResult<UpdateFlowOutput>> flowMod(final FlowModInput input, SwitchConnectionDistinguisher cookie) {
135         LOG.debug("Calling OFLibrary flowMod");
136         Future<RpcResult<Void>> response = null;
137         try {
138             response = getConnectionAdapter(cookie).flowMod(input);
139         } catch (ConnectionException e) {
140             return getRpcErrorFuture(e);
141         }
142
143         // appending xid
144         ListenableFuture<RpcResult<UpdateFlowOutput>> xidResult = Futures.transform(
145                 JdkFutureAdapters.listenInPoolThread(response),
146                 new Function<RpcResult<Void>, RpcResult<UpdateFlowOutput>>() {
147
148                     @Override
149                     public RpcResult<UpdateFlowOutput> apply(final RpcResult<Void> inputArg) {
150                         UpdateFlowOutputBuilder flowModOutput = new UpdateFlowOutputBuilder();
151                         BigInteger bigIntXid = BigInteger.valueOf(input.getXid());
152                         flowModOutput.setTransactionId(new TransactionId(bigIntXid));
153
154                         UpdateFlowOutput result = flowModOutput.build();
155                         RpcResult<UpdateFlowOutput> rpcResult = Rpcs.getRpcResult(
156                                 inputArg.isSuccessful(), result, inputArg.getErrors());
157                         return rpcResult;
158                     }
159                 });
160
161         return xidResult;
162     }
163
164     @Override
165     public Future<RpcResult<GetAsyncOutput>> getAsync(GetAsyncInput input, SwitchConnectionDistinguisher cookie) {
166         try {
167             return getConnectionAdapter(cookie).getAsync(input);
168         } catch (ConnectionException e) {
169             return getRpcErrorFuture(e);
170         }
171     }
172
173     @Override
174     public Future<RpcResult<GetConfigOutput>> getConfig(GetConfigInput input, SwitchConnectionDistinguisher cookie) {
175         try {
176             return getConnectionAdapter(cookie).getConfig(input);
177         } catch (ConnectionException e) {
178             return getRpcErrorFuture(e);
179         }
180     }
181
182     @Override
183     public Future<RpcResult<GetFeaturesOutput>> getFeatures(GetFeaturesInput input, SwitchConnectionDistinguisher cookie) {
184         try {
185             return getConnectionAdapter(cookie).getFeatures(input);
186         } catch (ConnectionException e) {
187             return getRpcErrorFuture(e);
188         }
189     }
190
191     @Override
192     public Future<RpcResult<GetQueueConfigOutput>> getQueueConfig(GetQueueConfigInput input,
193                                                                   SwitchConnectionDistinguisher cookie) {
194         try {
195             return getConnectionAdapter(cookie).getQueueConfig(input);
196         } catch (ConnectionException e) {
197             return getRpcErrorFuture(e);
198         }
199     }
200
201     @Override
202     public Future<RpcResult<UpdateGroupOutput>> groupMod(final GroupModInput input, SwitchConnectionDistinguisher cookie) {
203         LOG.debug("Calling OFLibrary groupMod");
204         Future<RpcResult<Void>> response = null;
205         try {
206             response = getConnectionAdapter(cookie).groupMod(input);
207         } catch (ConnectionException e) {
208             return getRpcErrorFuture(e);
209         }
210
211         // appending xid
212         ListenableFuture<RpcResult<UpdateGroupOutput>> xidResult = Futures.transform(
213                 JdkFutureAdapters.listenInPoolThread(response),
214                 new Function<RpcResult<Void>, RpcResult<UpdateGroupOutput>>() {
215
216                     @Override
217                     public RpcResult<UpdateGroupOutput> apply(final RpcResult<Void> inputArg) {
218                         UpdateGroupOutputBuilder groupModOutput = new UpdateGroupOutputBuilder();
219                         BigInteger bigIntXid = BigInteger.valueOf(input.getXid());
220                         groupModOutput.setTransactionId(new TransactionId(bigIntXid));
221
222                         UpdateGroupOutput result = groupModOutput.build();
223                         RpcResult<UpdateGroupOutput> rpcResult = Rpcs.getRpcResult(
224                                 inputArg.isSuccessful(), result, inputArg.getErrors());
225                         return rpcResult;
226                     }
227                 });
228
229         return xidResult;
230     }
231
232     @Override
233     public Future<RpcResult<UpdateMeterOutput>> meterMod(final MeterModInput input, SwitchConnectionDistinguisher cookie) {
234         LOG.debug("Calling OFLibrary meterMod");
235         Future<RpcResult<Void>> response = null;
236         try {
237             response = getConnectionAdapter(cookie).meterMod(input);
238         } catch (ConnectionException e) {
239             return getRpcErrorFuture(e);
240         }
241
242         // appending xid
243         ListenableFuture<RpcResult<UpdateMeterOutput>> xidResult = Futures.transform(
244                 JdkFutureAdapters.listenInPoolThread(response),
245                 new Function<RpcResult<Void>, RpcResult<UpdateMeterOutput>>() {
246
247                     @Override
248                     public RpcResult<UpdateMeterOutput> apply(final RpcResult<Void> inputArg) {
249                         UpdateMeterOutputBuilder meterModOutput = new UpdateMeterOutputBuilder();
250                         BigInteger bigIntXid = BigInteger.valueOf(input.getXid());
251                         meterModOutput.setTransactionId(new TransactionId(bigIntXid));
252
253                         UpdateMeterOutput result = meterModOutput.build();
254                         RpcResult<UpdateMeterOutput> rpcResult = Rpcs.getRpcResult(
255                                 inputArg.isSuccessful(), result, inputArg.getErrors());
256                         return rpcResult;
257                     }
258                 });
259
260         return xidResult;
261     }
262
263     @Override
264     public Future<RpcResult<java.lang.Void>> multipartRequest(MultipartRequestInput input, SwitchConnectionDistinguisher cookie) {
265         try {
266             return getConnectionAdapter(cookie).multipartRequest(input);
267         } catch (ConnectionException e) {
268             return getRpcErrorFuture(e);
269         }
270     }
271
272     @Override
273     public Future<RpcResult<Void>> packetOut(PacketOutInput input, SwitchConnectionDistinguisher cookie) {
274         try {
275             return getConnectionAdapter(cookie).packetOut(input);
276         } catch (ConnectionException e) {
277             return getRpcErrorFuture(e);
278         }
279     }
280
281     @Override
282     public Future<RpcResult<UpdatePortOutput>> portMod(final PortModInput input, SwitchConnectionDistinguisher cookie) {
283         LOG.debug("Calling OFLibrary portMod");
284         Future<RpcResult<Void>> response = null;
285         try {
286             response = getConnectionAdapter(cookie).portMod(input);
287         } catch (ConnectionException e) {
288             return getRpcErrorFuture(e);
289         }
290
291         // appending xid
292         ListenableFuture<RpcResult<UpdatePortOutput>> xidResult = Futures.transform(
293                 JdkFutureAdapters.listenInPoolThread(response),
294                 new Function<RpcResult<Void>, RpcResult<UpdatePortOutput>>() {
295
296                     @Override
297                     public RpcResult<UpdatePortOutput> apply(final RpcResult<Void> inputArg) {
298                         UpdatePortOutputBuilder portModOutput = new UpdatePortOutputBuilder();
299                         BigInteger bigIntXid = BigInteger.valueOf(input.getXid());
300                         portModOutput.setTransactionId(new TransactionId(bigIntXid));
301
302                         UpdatePortOutput result = portModOutput.build();
303                         RpcResult<UpdatePortOutput> rpcResult = Rpcs.getRpcResult(
304                                 inputArg.isSuccessful(), result, inputArg.getErrors());
305                         return rpcResult;
306                     }
307                 });
308
309         return xidResult;
310     }
311
312     @Override
313     public Future<RpcResult<RoleRequestOutput>> roleRequest(RoleRequestInput input, SwitchConnectionDistinguisher cookie) {
314         try {
315             return getConnectionAdapter(cookie).roleRequest(input);
316         } catch (ConnectionException e) {
317             return getRpcErrorFuture(e);
318         }
319     }
320
321     @Override
322     public Future<RpcResult<Void>> setAsync(SetAsyncInput input, SwitchConnectionDistinguisher cookie) {
323         try {
324             return getConnectionAdapter(cookie).setAsync(input);
325         } catch (ConnectionException e) {
326             return getRpcErrorFuture(e);
327         }
328     }
329
330     @Override
331     public Future<RpcResult<Void>> setConfig(SetConfigInput input, SwitchConnectionDistinguisher cookie) {
332         try {
333             return getConnectionAdapter(cookie).setConfig(input);
334         } catch (ConnectionException e) {
335             return getRpcErrorFuture(e);
336         }
337     }
338
339     @Override
340     public Future<RpcResult<Void>> tableMod(TableModInput input, SwitchConnectionDistinguisher cookie) {
341         try {
342             return getConnectionAdapter(cookie).tableMod(input);
343         } catch (ConnectionException e) {
344             return getRpcErrorFuture(e);
345         }
346     }
347 }