2 * Copyright IBM Corporation, 2013. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.openflowplugin.openflow.md.core.session;
10 import com.google.common.base.Function;
11 import com.google.common.util.concurrent.Futures;
12 import com.google.common.util.concurrent.JdkFutureAdapters;
13 import com.google.common.util.concurrent.ListenableFuture;
15 import org.opendaylight.openflowjava.protocol.api.connection.ConnectionAdapter;
16 import org.opendaylight.openflowplugin.ConnectionException;
17 import org.opendaylight.openflowplugin.api.openflow.md.core.ConnectionConductor;
18 import org.opendaylight.openflowplugin.api.openflow.md.core.SwitchConnectionDistinguisher;
19 import org.opendaylight.openflowplugin.api.openflow.md.core.session.IMessageDispatchService;
20 import org.opendaylight.openflowplugin.api.openflow.md.core.session.SessionContext;
21 import org.opendaylight.openflowplugin.openflow.md.util.RpcResultUtil;
22 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.UpdateFlowOutput;
23 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.UpdateFlowOutputBuilder;
24 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.transaction.rev131103.TransactionId;
25 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.service.rev130918.UpdateGroupOutput;
26 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.service.rev130918.UpdateGroupOutputBuilder;
27 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.service.rev130918.UpdateMeterOutput;
28 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.service.rev130918.UpdateMeterOutputBuilder;
29 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.*;
30 import org.opendaylight.yang.gen.v1.urn.opendaylight.port.service.rev131107.UpdatePortOutput;
31 import org.opendaylight.yang.gen.v1.urn.opendaylight.port.service.rev131107.UpdatePortOutputBuilder;
32 import org.opendaylight.yangtools.yang.common.RpcResult;
33 import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
34 import org.slf4j.Logger;
35 import org.slf4j.LoggerFactory;
37 import java.math.BigInteger;
38 import java.util.concurrent.Future;
41 * message dispatch service to send the message to switch.
45 public class MessageDispatchServiceImpl implements IMessageDispatchService {
47 private static final Logger LOG = LoggerFactory.getLogger(MessageDispatchServiceImpl.class);
48 private SessionContext session;
53 * @param session - MessageDispatchService for this session
55 public MessageDispatchServiceImpl(SessionContext session) {
56 this.session = session;
60 * get proper connection adapter to send the message to switch.
62 * @param cookie to identify the right connection, it can be null also.
63 * @return connectionAdapter associated with cookie, otherwise return best
64 * suitable connection.
67 private ConnectionAdapter getConnectionAdapter(SwitchConnectionDistinguisher cookie) throws ConnectionException {
69 if (!session.isValid()) {
70 LOG.warn("No valid connection found for the node [datapath-id : {}]", session.getSessionKey().getId());
71 throw new ConnectionException(CONNECTION_ERROR_MESSAGE);
73 LOG.debug("finding connecton for cookie value {}. ", cookie);
74 // set main connection as default
75 ConnectionAdapter connectionAdapter = session.getPrimaryConductor().getConnectionAdapter();
77 ConnectionConductor conductor = session.getAuxiliaryConductor(cookie);
78 // check if auxiliary connection exist
79 if (null != conductor) {
80 LOG.debug("found auxiliary connection for the cookie.");
81 connectionAdapter = conductor.getConnectionAdapter();
84 // TODO: pick connection to utilize all the available connection.
86 return connectionAdapter;
90 public Future<RpcResult<BarrierOutput>> barrier(BarrierInput input, SwitchConnectionDistinguisher cookie) {
92 return getConnectionAdapter(cookie).barrier(input);
93 } catch (ConnectionException e) {
94 return RpcResultUtil.getRpcErrorFuture(e);
99 public Future<RpcResult<Void>> experimenter(ExperimenterInput input, SwitchConnectionDistinguisher cookie) {
101 return getConnectionAdapter(cookie).experimenter(input);
102 } catch (ConnectionException e) {
103 return RpcResultUtil.getRpcErrorFuture(e);
108 public Future<RpcResult<UpdateFlowOutput>> flowMod(final FlowModInput input, SwitchConnectionDistinguisher cookie) {
109 LOG.debug("Calling OFLibrary flowMod");
110 Future<RpcResult<Void>> response = null;
112 response = getConnectionAdapter(cookie).flowMod(input);
113 } catch (ConnectionException e) {
114 return RpcResultUtil.getRpcErrorFuture(e);
118 ListenableFuture<RpcResult<UpdateFlowOutput>> xidResult = Futures.transform(
119 JdkFutureAdapters.listenInPoolThread(response),
120 new Function<RpcResult<Void>, RpcResult<UpdateFlowOutput>>() {
123 public RpcResult<UpdateFlowOutput> apply(final RpcResult<Void> inputArg) {
124 UpdateFlowOutputBuilder flowModOutput = new UpdateFlowOutputBuilder();
125 BigInteger bigIntXid = BigInteger.valueOf(input.getXid());
126 flowModOutput.setTransactionId(new TransactionId(bigIntXid));
128 UpdateFlowOutput result = flowModOutput.build();
129 RpcResult<UpdateFlowOutput> rpcResult = RpcResultBuilder
130 .<UpdateFlowOutput>status(inputArg.isSuccessful())
131 .withResult(result).withRpcErrors(inputArg.getErrors())
141 public Future<RpcResult<GetAsyncOutput>> getAsync(GetAsyncInput input, SwitchConnectionDistinguisher cookie) {
143 return getConnectionAdapter(cookie).getAsync(input);
144 } catch (ConnectionException e) {
145 return RpcResultUtil.getRpcErrorFuture(e);
150 public Future<RpcResult<GetConfigOutput>> getConfig(GetConfigInput input, SwitchConnectionDistinguisher cookie) {
152 return getConnectionAdapter(cookie).getConfig(input);
153 } catch (ConnectionException e) {
154 return RpcResultUtil.getRpcErrorFuture(e);
159 public Future<RpcResult<GetFeaturesOutput>> getFeatures(GetFeaturesInput input, SwitchConnectionDistinguisher cookie) {
161 return getConnectionAdapter(cookie).getFeatures(input);
162 } catch (ConnectionException e) {
163 return RpcResultUtil.getRpcErrorFuture(e);
168 public Future<RpcResult<GetQueueConfigOutput>> getQueueConfig(GetQueueConfigInput input,
169 SwitchConnectionDistinguisher cookie) {
171 return getConnectionAdapter(cookie).getQueueConfig(input);
172 } catch (ConnectionException e) {
173 return RpcResultUtil.getRpcErrorFuture(e);
178 public Future<RpcResult<UpdateGroupOutput>> groupMod(final GroupModInput input, SwitchConnectionDistinguisher cookie) {
179 LOG.debug("Calling OFLibrary groupMod");
180 Future<RpcResult<Void>> response = null;
182 response = getConnectionAdapter(cookie).groupMod(input);
183 } catch (ConnectionException e) {
184 return RpcResultUtil.getRpcErrorFuture(e);
188 ListenableFuture<RpcResult<UpdateGroupOutput>> xidResult = Futures.transform(
189 JdkFutureAdapters.listenInPoolThread(response),
190 new Function<RpcResult<Void>, RpcResult<UpdateGroupOutput>>() {
193 public RpcResult<UpdateGroupOutput> apply(final RpcResult<Void> inputArg) {
194 UpdateGroupOutputBuilder groupModOutput = new UpdateGroupOutputBuilder();
195 BigInteger bigIntXid = BigInteger.valueOf(input.getXid());
196 groupModOutput.setTransactionId(new TransactionId(bigIntXid));
198 UpdateGroupOutput result = groupModOutput.build();
199 RpcResult<UpdateGroupOutput> rpcResult = RpcResultBuilder
200 .<UpdateGroupOutput>status(inputArg.isSuccessful()).withResult(result)
201 .withRpcErrors(inputArg.getErrors()).build();
210 public Future<RpcResult<UpdateMeterOutput>> meterMod(final MeterModInput input, SwitchConnectionDistinguisher cookie) {
211 LOG.debug("Calling OFLibrary meterMod");
212 Future<RpcResult<Void>> response = null;
214 response = getConnectionAdapter(cookie).meterMod(input);
215 } catch (ConnectionException e) {
216 return RpcResultUtil.getRpcErrorFuture(e);
220 ListenableFuture<RpcResult<UpdateMeterOutput>> xidResult = Futures.transform(
221 JdkFutureAdapters.listenInPoolThread(response),
222 new Function<RpcResult<Void>, RpcResult<UpdateMeterOutput>>() {
225 public RpcResult<UpdateMeterOutput> apply(final RpcResult<Void> inputArg) {
226 UpdateMeterOutputBuilder meterModOutput = new UpdateMeterOutputBuilder();
227 BigInteger bigIntXid = BigInteger.valueOf(input.getXid());
228 meterModOutput.setTransactionId(new TransactionId(bigIntXid));
230 UpdateMeterOutput result = meterModOutput.build();
231 RpcResult<UpdateMeterOutput> rpcResult = RpcResultBuilder
232 .<UpdateMeterOutput>status(inputArg.isSuccessful()).withResult(result)
233 .withRpcErrors(inputArg.getErrors()).build();
242 public Future<RpcResult<java.lang.Void>> multipartRequest(MultipartRequestInput input, SwitchConnectionDistinguisher cookie) {
244 return getConnectionAdapter(cookie).multipartRequest(input);
245 } catch (ConnectionException e) {
246 return RpcResultUtil.getRpcErrorFuture(e);
251 public Future<RpcResult<Void>> packetOut(PacketOutInput input, SwitchConnectionDistinguisher cookie) {
253 return getConnectionAdapter(cookie).packetOut(input);
254 } catch (ConnectionException e) {
255 return RpcResultUtil.getRpcErrorFuture(e);
260 public Future<RpcResult<UpdatePortOutput>> portMod(final PortModInput input, SwitchConnectionDistinguisher cookie) {
261 LOG.debug("Calling OFLibrary portMod");
262 Future<RpcResult<Void>> response = null;
264 response = getConnectionAdapter(cookie).portMod(input);
265 } catch (ConnectionException e) {
266 return RpcResultUtil.getRpcErrorFuture(e);
270 ListenableFuture<RpcResult<UpdatePortOutput>> xidResult = Futures.transform(
271 JdkFutureAdapters.listenInPoolThread(response),
272 new Function<RpcResult<Void>, RpcResult<UpdatePortOutput>>() {
275 public RpcResult<UpdatePortOutput> apply(final RpcResult<Void> inputArg) {
276 UpdatePortOutputBuilder portModOutput = new UpdatePortOutputBuilder();
277 BigInteger bigIntXid = BigInteger.valueOf(input.getXid());
278 portModOutput.setTransactionId(new TransactionId(bigIntXid));
280 UpdatePortOutput result = portModOutput.build();
281 RpcResult<UpdatePortOutput> rpcResult = RpcResultBuilder
282 .<UpdatePortOutput>status(inputArg.isSuccessful()).withResult(result)
283 .withRpcErrors(inputArg.getErrors()).build();
292 public Future<RpcResult<RoleRequestOutput>> roleRequest(RoleRequestInput input, SwitchConnectionDistinguisher cookie) {
294 return getConnectionAdapter(cookie).roleRequest(input);
295 } catch (ConnectionException e) {
296 return RpcResultUtil.getRpcErrorFuture(e);
301 public Future<RpcResult<Void>> setAsync(SetAsyncInput input, SwitchConnectionDistinguisher cookie) {
303 return getConnectionAdapter(cookie).setAsync(input);
304 } catch (ConnectionException e) {
305 return RpcResultUtil.getRpcErrorFuture(e);
310 public Future<RpcResult<Void>> setConfig(SetConfigInput input, SwitchConnectionDistinguisher cookie) {
312 return getConnectionAdapter(cookie).setConfig(input);
313 } catch (ConnectionException e) {
314 return RpcResultUtil.getRpcErrorFuture(e);
319 public Future<RpcResult<Void>> tableMod(TableModInput input, SwitchConnectionDistinguisher cookie) {
321 return getConnectionAdapter(cookie).tableMod(input);
322 } catch (ConnectionException e) {
323 return RpcResultUtil.getRpcErrorFuture(e);