2 * Copyright IBM Corporation, 2013. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.openflowplugin.openflow.md.core.session;
10 import com.google.common.base.Function;
11 import com.google.common.util.concurrent.Futures;
12 import com.google.common.util.concurrent.JdkFutureAdapters;
13 import com.google.common.util.concurrent.ListenableFuture;
14 import com.google.common.util.concurrent.SettableFuture;
15 import org.opendaylight.controller.sal.common.util.RpcErrors;
16 import org.opendaylight.controller.sal.common.util.Rpcs;
17 import org.opendaylight.openflowjava.protocol.api.connection.ConnectionAdapter;
18 import org.opendaylight.openflowplugin.ConnectionException;
19 import org.opendaylight.openflowplugin.api.OFConstants;
20 import org.opendaylight.openflowplugin.api.openflow.md.core.ConnectionConductor;
21 import org.opendaylight.openflowplugin.api.openflow.md.core.SwitchConnectionDistinguisher;
22 import org.opendaylight.openflowplugin.api.openflow.md.core.session.IMessageDispatchService;
23 import org.opendaylight.openflowplugin.api.openflow.md.core.session.SessionContext;
24 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.UpdateFlowOutput;
25 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.UpdateFlowOutputBuilder;
26 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.transaction.rev131103.TransactionId;
27 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.service.rev130918.UpdateGroupOutput;
28 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.service.rev130918.UpdateGroupOutputBuilder;
29 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.service.rev130918.UpdateMeterOutput;
30 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.service.rev130918.UpdateMeterOutputBuilder;
31 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.*;
32 import org.opendaylight.yang.gen.v1.urn.opendaylight.port.service.rev131107.UpdatePortOutput;
33 import org.opendaylight.yang.gen.v1.urn.opendaylight.port.service.rev131107.UpdatePortOutputBuilder;
34 import org.opendaylight.yangtools.yang.common.RpcError;
35 import org.opendaylight.yangtools.yang.common.RpcResult;
36 import org.slf4j.Logger;
37 import org.slf4j.LoggerFactory;
39 import java.math.BigInteger;
40 import java.util.ArrayList;
41 import java.util.List;
42 import java.util.concurrent.Future;
45 * message dispatch service to send the message to switch.
49 public class MessageDispatchServiceImpl implements IMessageDispatchService {
51 private static final Logger LOG = LoggerFactory.getLogger(MessageDispatchServiceImpl.class);
52 private static final String CONNECTION_ERROR_MESSAGE = "Session for the cookie is invalid. Reason: "
53 + "the switch has been recently disconnected OR inventory provides outdated information.";
55 private SessionContext session;
60 * @param session - MessageDispatchService for this session
/**
 * Creates a dispatch service bound to one session.
 *
 * @param session session context stored in this instance and used for later connection lookups
 */
public MessageDispatchServiceImpl(SessionContext session) {
    this.session = session;
}
67 * get proper connection adapter to send the message to switch.
69 * @param cookie to identify the right connection, it can be null also.
70 * @return connectionAdapter associated with cookie, otherwise return best
71 * suitable connection.
// Picks the ConnectionAdapter to use for the given cookie: the primary conductor's adapter by
// default, replaced by the auxiliary conductor's adapter when one is found for the cookie.
// Throws ConnectionException carrying CONNECTION_ERROR_MESSAGE when the session is not valid.
// NOTE(review): the embedded line numbering of this method skips 75, 79, 83, 89-90 and 92, so
// closing braces -- and possibly a guard/else around the auxiliary lookup -- are not visible in
// this view. Confirm against the complete file before changing the control flow.
74 private ConnectionAdapter getConnectionAdapter(SwitchConnectionDistinguisher cookie) throws ConnectionException {
// Fail fast: an invalid session is logged with its datapath id and reported to the caller.
76 if (!session.isValid()) {
77 LOG.warn("No valid connection found for the node [datapath-id : {}]", session.getSessionKey().getId());
78 throw new ConnectionException(CONNECTION_ERROR_MESSAGE);
// NOTE(review): "connecton" in the log text below is a typo for "connection"; it is runtime
// text, so it is left untouched here.
80 LOG.debug("finding connecton for cookie value {}. ", cookie);
81 // set main connection as default
82 ConnectionAdapter connectionAdapter = session.getPrimaryConductor().getConnectionAdapter();
// Look up an auxiliary conductor registered in the session for this cookie.
84 ConnectionConductor conductor = session.getAuxiliaryConductor(cookie);
85 // check if auxiliary connection exist
86 if (null != conductor) {
87 LOG.debug("found auxiliary connection for the cookie.");
// The auxiliary conductor's adapter overrides the primary default chosen above.
88 connectionAdapter = conductor.getConnectionAdapter();
91 // TODO: pick connection to utilize all the available connection.
93 return connectionAdapter;
97 public Future<RpcResult<BarrierOutput>> barrier(BarrierInput input, SwitchConnectionDistinguisher cookie) {
99 return getConnectionAdapter(cookie).barrier(input);
100 } catch (ConnectionException e) {
101 return getRpcErrorFuture(e);
105 private <T> SettableFuture<RpcResult<T>> getRpcErrorFuture(ConnectionException e) {
106 List<RpcError> rpcErrorList = getConnectionErrorAsRpcErrors(e);
107 SettableFuture<RpcResult<T>> futureWithError = SettableFuture.create();
108 futureWithError.set(Rpcs.<T>getRpcResult(false, rpcErrorList));
109 return futureWithError;
// Converts a connection failure into a list of RpcError entries.
// The visible code creates an empty ArrayList and adds one RpcError obtained from
// RpcErrors.getRpcError(...), passing the application tag, the timeout error tag,
// CONNECTION_ERROR_MESSAGE, WARNING severity and the TRANSPORT error type.
// NOTE(review): the embedded line numbering skips 118 and everything after 119, so at least one
// argument, the end of the call and the return statement are not visible in this view --
// confirm the full argument list against the complete file.
112 private List<RpcError> getConnectionErrorAsRpcErrors(ConnectionException e) {
113 List<RpcError> rpcErrorList = new ArrayList<>();
114 rpcErrorList.add(RpcErrors.getRpcError(OFConstants.APPLICATION_TAG,
115 OFConstants.ERROR_TAG_TIMEOUT,
116 CONNECTION_ERROR_MESSAGE,
117 RpcError.ErrorSeverity.WARNING,
119 RpcError.ErrorType.TRANSPORT,
125 public Future<RpcResult<Void>> experimenter(ExperimenterInput input, SwitchConnectionDistinguisher cookie) {
127 return getConnectionAdapter(cookie).experimenter(input);
128 } catch (ConnectionException e) {
129 return getRpcErrorFuture(e);
134 public Future<RpcResult<UpdateFlowOutput>> flowMod(final FlowModInput input, SwitchConnectionDistinguisher cookie) {
135 LOG.debug("Calling OFLibrary flowMod");
136 Future<RpcResult<Void>> response = null;
138 response = getConnectionAdapter(cookie).flowMod(input);
139 } catch (ConnectionException e) {
140 return getRpcErrorFuture(e);
144 ListenableFuture<RpcResult<UpdateFlowOutput>> xidResult = Futures.transform(
145 JdkFutureAdapters.listenInPoolThread(response),
146 new Function<RpcResult<Void>, RpcResult<UpdateFlowOutput>>() {
149 public RpcResult<UpdateFlowOutput> apply(final RpcResult<Void> inputArg) {
150 UpdateFlowOutputBuilder flowModOutput = new UpdateFlowOutputBuilder();
151 BigInteger bigIntXid = BigInteger.valueOf(input.getXid());
152 flowModOutput.setTransactionId(new TransactionId(bigIntXid));
154 UpdateFlowOutput result = flowModOutput.build();
155 RpcResult<UpdateFlowOutput> rpcResult = Rpcs.getRpcResult(
156 inputArg.isSuccessful(), result, inputArg.getErrors());
165 public Future<RpcResult<GetAsyncOutput>> getAsync(GetAsyncInput input, SwitchConnectionDistinguisher cookie) {
167 return getConnectionAdapter(cookie).getAsync(input);
168 } catch (ConnectionException e) {
169 return getRpcErrorFuture(e);
174 public Future<RpcResult<GetConfigOutput>> getConfig(GetConfigInput input, SwitchConnectionDistinguisher cookie) {
176 return getConnectionAdapter(cookie).getConfig(input);
177 } catch (ConnectionException e) {
178 return getRpcErrorFuture(e);
183 public Future<RpcResult<GetFeaturesOutput>> getFeatures(GetFeaturesInput input, SwitchConnectionDistinguisher cookie) {
185 return getConnectionAdapter(cookie).getFeatures(input);
186 } catch (ConnectionException e) {
187 return getRpcErrorFuture(e);
192 public Future<RpcResult<GetQueueConfigOutput>> getQueueConfig(GetQueueConfigInput input,
193 SwitchConnectionDistinguisher cookie) {
195 return getConnectionAdapter(cookie).getQueueConfig(input);
196 } catch (ConnectionException e) {
197 return getRpcErrorFuture(e);
202 public Future<RpcResult<UpdateGroupOutput>> groupMod(final GroupModInput input, SwitchConnectionDistinguisher cookie) {
203 LOG.debug("Calling OFLibrary groupMod");
204 Future<RpcResult<Void>> response = null;
206 response = getConnectionAdapter(cookie).groupMod(input);
207 } catch (ConnectionException e) {
208 return getRpcErrorFuture(e);
212 ListenableFuture<RpcResult<UpdateGroupOutput>> xidResult = Futures.transform(
213 JdkFutureAdapters.listenInPoolThread(response),
214 new Function<RpcResult<Void>, RpcResult<UpdateGroupOutput>>() {
217 public RpcResult<UpdateGroupOutput> apply(final RpcResult<Void> inputArg) {
218 UpdateGroupOutputBuilder groupModOutput = new UpdateGroupOutputBuilder();
219 BigInteger bigIntXid = BigInteger.valueOf(input.getXid());
220 groupModOutput.setTransactionId(new TransactionId(bigIntXid));
222 UpdateGroupOutput result = groupModOutput.build();
223 RpcResult<UpdateGroupOutput> rpcResult = Rpcs.getRpcResult(
224 inputArg.isSuccessful(), result, inputArg.getErrors());
233 public Future<RpcResult<UpdateMeterOutput>> meterMod(final MeterModInput input, SwitchConnectionDistinguisher cookie) {
234 LOG.debug("Calling OFLibrary meterMod");
235 Future<RpcResult<Void>> response = null;
237 response = getConnectionAdapter(cookie).meterMod(input);
238 } catch (ConnectionException e) {
239 return getRpcErrorFuture(e);
243 ListenableFuture<RpcResult<UpdateMeterOutput>> xidResult = Futures.transform(
244 JdkFutureAdapters.listenInPoolThread(response),
245 new Function<RpcResult<Void>, RpcResult<UpdateMeterOutput>>() {
248 public RpcResult<UpdateMeterOutput> apply(final RpcResult<Void> inputArg) {
249 UpdateMeterOutputBuilder meterModOutput = new UpdateMeterOutputBuilder();
250 BigInteger bigIntXid = BigInteger.valueOf(input.getXid());
251 meterModOutput.setTransactionId(new TransactionId(bigIntXid));
253 UpdateMeterOutput result = meterModOutput.build();
254 RpcResult<UpdateMeterOutput> rpcResult = Rpcs.getRpcResult(
255 inputArg.isSuccessful(), result, inputArg.getErrors());
264 public Future<RpcResult<java.lang.Void>> multipartRequest(MultipartRequestInput input, SwitchConnectionDistinguisher cookie) {
266 return getConnectionAdapter(cookie).multipartRequest(input);
267 } catch (ConnectionException e) {
268 return getRpcErrorFuture(e);
273 public Future<RpcResult<Void>> packetOut(PacketOutInput input, SwitchConnectionDistinguisher cookie) {
275 return getConnectionAdapter(cookie).packetOut(input);
276 } catch (ConnectionException e) {
277 return getRpcErrorFuture(e);
282 public Future<RpcResult<UpdatePortOutput>> portMod(final PortModInput input, SwitchConnectionDistinguisher cookie) {
283 LOG.debug("Calling OFLibrary portMod");
284 Future<RpcResult<Void>> response = null;
286 response = getConnectionAdapter(cookie).portMod(input);
287 } catch (ConnectionException e) {
288 return getRpcErrorFuture(e);
292 ListenableFuture<RpcResult<UpdatePortOutput>> xidResult = Futures.transform(
293 JdkFutureAdapters.listenInPoolThread(response),
294 new Function<RpcResult<Void>, RpcResult<UpdatePortOutput>>() {
297 public RpcResult<UpdatePortOutput> apply(final RpcResult<Void> inputArg) {
298 UpdatePortOutputBuilder portModOutput = new UpdatePortOutputBuilder();
299 BigInteger bigIntXid = BigInteger.valueOf(input.getXid());
300 portModOutput.setTransactionId(new TransactionId(bigIntXid));
302 UpdatePortOutput result = portModOutput.build();
303 RpcResult<UpdatePortOutput> rpcResult = Rpcs.getRpcResult(
304 inputArg.isSuccessful(), result, inputArg.getErrors());
313 public Future<RpcResult<RoleRequestOutput>> roleRequest(RoleRequestInput input, SwitchConnectionDistinguisher cookie) {
315 return getConnectionAdapter(cookie).roleRequest(input);
316 } catch (ConnectionException e) {
317 return getRpcErrorFuture(e);
322 public Future<RpcResult<Void>> setAsync(SetAsyncInput input, SwitchConnectionDistinguisher cookie) {
324 return getConnectionAdapter(cookie).setAsync(input);
325 } catch (ConnectionException e) {
326 return getRpcErrorFuture(e);
331 public Future<RpcResult<Void>> setConfig(SetConfigInput input, SwitchConnectionDistinguisher cookie) {
333 return getConnectionAdapter(cookie).setConfig(input);
334 } catch (ConnectionException e) {
335 return getRpcErrorFuture(e);
340 public Future<RpcResult<Void>> tableMod(TableModInput input, SwitchConnectionDistinguisher cookie) {
342 return getConnectionAdapter(cookie).tableMod(input);
343 } catch (ConnectionException e) {
344 return getRpcErrorFuture(e);