2 * Copyright IBM Corporation, 2013. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.openflowplugin.openflow.md.core.session;
10 import com.google.common.base.Function;
11 import com.google.common.util.concurrent.Futures;
12 import com.google.common.util.concurrent.JdkFutureAdapters;
13 import com.google.common.util.concurrent.ListenableFuture;
14 import com.google.common.util.concurrent.SettableFuture;
15 import org.opendaylight.controller.sal.common.util.RpcErrors;
16 import org.opendaylight.controller.sal.common.util.Rpcs;
17 import org.opendaylight.openflowjava.protocol.api.connection.ConnectionAdapter;
18 import org.opendaylight.openflowplugin.ConnectionException;
19 import org.opendaylight.openflowplugin.openflow.md.OFConstants;
20 import org.opendaylight.openflowplugin.openflow.md.core.ConnectionConductor;
21 import org.opendaylight.openflowplugin.openflow.md.core.SwitchConnectionDistinguisher;
22 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.UpdateFlowOutput;
23 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.UpdateFlowOutputBuilder;
24 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.transaction.rev131103.TransactionId;
25 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.service.rev130918.UpdateGroupOutput;
26 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.service.rev130918.UpdateGroupOutputBuilder;
27 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.service.rev130918.UpdateMeterOutput;
28 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.service.rev130918.UpdateMeterOutputBuilder;
29 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.*;
30 import org.opendaylight.yang.gen.v1.urn.opendaylight.port.service.rev131107.UpdatePortOutput;
31 import org.opendaylight.yang.gen.v1.urn.opendaylight.port.service.rev131107.UpdatePortOutputBuilder;
32 import org.opendaylight.yangtools.yang.common.RpcError;
33 import org.opendaylight.yangtools.yang.common.RpcResult;
34 import org.slf4j.Logger;
35 import org.slf4j.LoggerFactory;
37 import java.math.BigInteger;
38 import java.util.ArrayList;
39 import java.util.List;
40 import java.util.concurrent.Future;
/**
 * Message dispatch service that sends messages to the switch.
 */
47 public class MessageDispatchServiceImpl implements IMessageDispatchService {
49 private static final Logger LOG = LoggerFactory.getLogger(MessageDispatchServiceImpl.class);
50 private static final String CONNECTION_ERROR_MESSAGE = "Session for the cookie is invalid. Reason: "
51 + "the switch has been recently disconnected OR inventory provides outdated information.";
53 private SessionContext session;
/**
 * Creates a dispatch service bound to a single session.
 *
 * @param session session whose connections this service uses to reach the switch
 */
public MessageDispatchServiceImpl(SessionContext session) {
    this.session = session;
}
65 * get proper connection adapter to send the message to switch.
67 * @param cookie to identify the right connection, it can be null also.
68 * @return connectionAdapter associated with cookie, otherwise return best
69 * suitable connection.
72 private ConnectionAdapter getConnectionAdapter(SwitchConnectionDistinguisher cookie) throws ConnectionException {
74 if (!session.isValid()) {
75 LOG.warn("Session for the cookie {} is invalid.", cookie);
76 throw new ConnectionException(CONNECTION_ERROR_MESSAGE);
78 LOG.debug("finding connecton for cookie value {}. ", cookie);
79 // set main connection as default
80 ConnectionAdapter connectionAdapter = session.getPrimaryConductor().getConnectionAdapter();
82 ConnectionConductor conductor = session.getAuxiliaryConductor(cookie);
83 // check if auxiliary connection exist
84 if (null != conductor) {
85 LOG.debug("found auxiliary connection for the cookie.");
86 connectionAdapter = conductor.getConnectionAdapter();
89 // TODO: pick connection to utilize all the available connection.
91 return connectionAdapter;
95 public Future<RpcResult<BarrierOutput>> barrier(BarrierInput input, SwitchConnectionDistinguisher cookie) {
97 return getConnectionAdapter(cookie).barrier(input);
98 } catch (ConnectionException e) {
99 return getRpcErrorFuture(e);
103 private <T> SettableFuture<RpcResult<T>> getRpcErrorFuture(ConnectionException e) {
104 List<RpcError> rpcErrorList = getConnectionErrorAsRpcErrors(e);
105 SettableFuture<RpcResult<T>> futureWithError = SettableFuture.create();
106 futureWithError.set(Rpcs.<T>getRpcResult(false, rpcErrorList));
107 return futureWithError;
/**
 * Builds the list of RPC errors used to report a connection failure.
 *
 * The visible code creates a new list and adds one RpcError obtained from
 * RpcErrors.getRpcError, passing OFConstants.APPLICATION_TAG,
 * OFConstants.ERROR_TAG_TIMEOUT, CONNECTION_ERROR_MESSAGE, severity WARNING
 * and error type TRANSPORT.
 *
 * NOTE(review): the embedded line numbers skip 116 and stop at 117, so part of
 * the getRpcError argument list, the end of that call and the return statement
 * are not visible here. Presumably the populated list is returned; confirm
 * against the complete file. The parameter e is not referenced in the visible lines.
 *
 * @param e connection failure being reported
 * @return presumably the list holding the single error (return not visible) - TODO confirm
 */
110 private List<RpcError> getConnectionErrorAsRpcErrors(ConnectionException e) {
111 List<RpcError> rpcErrorList = new ArrayList<>();
112 rpcErrorList.add(RpcErrors.getRpcError(OFConstants.APPLICATION_TAG,
113 OFConstants.ERROR_TAG_TIMEOUT,
114 CONNECTION_ERROR_MESSAGE,
115 RpcError.ErrorSeverity.WARNING,
117 RpcError.ErrorType.TRANSPORT,
123 public Future<RpcResult<Void>> experimenter(ExperimenterInput input, SwitchConnectionDistinguisher cookie) {
125 return getConnectionAdapter(cookie).experimenter(input);
126 } catch (ConnectionException e) {
127 return getRpcErrorFuture(e);
132 public Future<RpcResult<UpdateFlowOutput>> flowMod(final FlowModInput input, SwitchConnectionDistinguisher cookie) {
133 LOG.debug("Calling OFLibrary flowMod");
134 Future<RpcResult<Void>> response = null;
136 response = getConnectionAdapter(cookie).flowMod(input);
137 } catch (ConnectionException e) {
138 return getRpcErrorFuture(e);
142 ListenableFuture<RpcResult<UpdateFlowOutput>> xidResult = Futures.transform(
143 JdkFutureAdapters.listenInPoolThread(response),
144 new Function<RpcResult<Void>, RpcResult<UpdateFlowOutput>>() {
147 public RpcResult<UpdateFlowOutput> apply(final RpcResult<Void> inputArg) {
148 UpdateFlowOutputBuilder flowModOutput = new UpdateFlowOutputBuilder();
149 BigInteger bigIntXid = BigInteger.valueOf(input.getXid());
150 flowModOutput.setTransactionId(new TransactionId(bigIntXid));
152 UpdateFlowOutput result = flowModOutput.build();
153 RpcResult<UpdateFlowOutput> rpcResult = Rpcs.getRpcResult(
154 inputArg.isSuccessful(), result, inputArg.getErrors());
163 public Future<RpcResult<GetAsyncOutput>> getAsync(GetAsyncInput input, SwitchConnectionDistinguisher cookie) {
165 return getConnectionAdapter(cookie).getAsync(input);
166 } catch (ConnectionException e) {
167 return getRpcErrorFuture(e);
172 public Future<RpcResult<GetConfigOutput>> getConfig(GetConfigInput input, SwitchConnectionDistinguisher cookie) {
174 return getConnectionAdapter(cookie).getConfig(input);
175 } catch (ConnectionException e) {
176 return getRpcErrorFuture(e);
181 public Future<RpcResult<GetFeaturesOutput>> getFeatures(GetFeaturesInput input, SwitchConnectionDistinguisher cookie) {
183 return getConnectionAdapter(cookie).getFeatures(input);
184 } catch (ConnectionException e) {
185 return getRpcErrorFuture(e);
190 public Future<RpcResult<GetQueueConfigOutput>> getQueueConfig(GetQueueConfigInput input,
191 SwitchConnectionDistinguisher cookie) {
193 return getConnectionAdapter(cookie).getQueueConfig(input);
194 } catch (ConnectionException e) {
195 return getRpcErrorFuture(e);
200 public Future<RpcResult<UpdateGroupOutput>> groupMod(final GroupModInput input, SwitchConnectionDistinguisher cookie) {
201 LOG.debug("Calling OFLibrary groupMod");
202 Future<RpcResult<Void>> response = null;
204 response = getConnectionAdapter(cookie).groupMod(input);
205 } catch (ConnectionException e) {
206 return getRpcErrorFuture(e);
210 ListenableFuture<RpcResult<UpdateGroupOutput>> xidResult = Futures.transform(
211 JdkFutureAdapters.listenInPoolThread(response),
212 new Function<RpcResult<Void>, RpcResult<UpdateGroupOutput>>() {
215 public RpcResult<UpdateGroupOutput> apply(final RpcResult<Void> inputArg) {
216 UpdateGroupOutputBuilder groupModOutput = new UpdateGroupOutputBuilder();
217 BigInteger bigIntXid = BigInteger.valueOf(input.getXid());
218 groupModOutput.setTransactionId(new TransactionId(bigIntXid));
220 UpdateGroupOutput result = groupModOutput.build();
221 RpcResult<UpdateGroupOutput> rpcResult = Rpcs.getRpcResult(
222 inputArg.isSuccessful(), result, inputArg.getErrors());
231 public Future<RpcResult<UpdateMeterOutput>> meterMod(final MeterModInput input, SwitchConnectionDistinguisher cookie) {
232 LOG.debug("Calling OFLibrary meterMod");
233 Future<RpcResult<Void>> response = null;
235 response = getConnectionAdapter(cookie).meterMod(input);
236 } catch (ConnectionException e) {
237 return getRpcErrorFuture(e);
241 ListenableFuture<RpcResult<UpdateMeterOutput>> xidResult = Futures.transform(
242 JdkFutureAdapters.listenInPoolThread(response),
243 new Function<RpcResult<Void>, RpcResult<UpdateMeterOutput>>() {
246 public RpcResult<UpdateMeterOutput> apply(final RpcResult<Void> inputArg) {
247 UpdateMeterOutputBuilder meterModOutput = new UpdateMeterOutputBuilder();
248 BigInteger bigIntXid = BigInteger.valueOf(input.getXid());
249 meterModOutput.setTransactionId(new TransactionId(bigIntXid));
251 UpdateMeterOutput result = meterModOutput.build();
252 RpcResult<UpdateMeterOutput> rpcResult = Rpcs.getRpcResult(
253 inputArg.isSuccessful(), result, inputArg.getErrors());
262 public Future<RpcResult<java.lang.Void>> multipartRequest(MultipartRequestInput input, SwitchConnectionDistinguisher cookie) {
264 return getConnectionAdapter(cookie).multipartRequest(input);
265 } catch (ConnectionException e) {
266 return getRpcErrorFuture(e);
271 public Future<RpcResult<Void>> packetOut(PacketOutInput input, SwitchConnectionDistinguisher cookie) {
273 return getConnectionAdapter(cookie).packetOut(input);
274 } catch (ConnectionException e) {
275 return getRpcErrorFuture(e);
280 public Future<RpcResult<UpdatePortOutput>> portMod(final PortModInput input, SwitchConnectionDistinguisher cookie) {
281 LOG.debug("Calling OFLibrary portMod");
282 Future<RpcResult<Void>> response = null;
284 response = getConnectionAdapter(cookie).portMod(input);
285 } catch (ConnectionException e) {
286 return getRpcErrorFuture(e);
290 ListenableFuture<RpcResult<UpdatePortOutput>> xidResult = Futures.transform(
291 JdkFutureAdapters.listenInPoolThread(response),
292 new Function<RpcResult<Void>, RpcResult<UpdatePortOutput>>() {
295 public RpcResult<UpdatePortOutput> apply(final RpcResult<Void> inputArg) {
296 UpdatePortOutputBuilder portModOutput = new UpdatePortOutputBuilder();
297 BigInteger bigIntXid = BigInteger.valueOf(input.getXid());
298 portModOutput.setTransactionId(new TransactionId(bigIntXid));
300 UpdatePortOutput result = portModOutput.build();
301 RpcResult<UpdatePortOutput> rpcResult = Rpcs.getRpcResult(
302 inputArg.isSuccessful(), result, inputArg.getErrors());
311 public Future<RpcResult<RoleRequestOutput>> roleRequest(RoleRequestInput input, SwitchConnectionDistinguisher cookie) {
313 return getConnectionAdapter(cookie).roleRequest(input);
314 } catch (ConnectionException e) {
315 return getRpcErrorFuture(e);
320 public Future<RpcResult<Void>> setAsync(SetAsyncInput input, SwitchConnectionDistinguisher cookie) {
322 return getConnectionAdapter(cookie).setAsync(input);
323 } catch (ConnectionException e) {
324 return getRpcErrorFuture(e);
329 public Future<RpcResult<Void>> setConfig(SetConfigInput input, SwitchConnectionDistinguisher cookie) {
331 return getConnectionAdapter(cookie).setConfig(input);
332 } catch (ConnectionException e) {
333 return getRpcErrorFuture(e);
338 public Future<RpcResult<Void>> tableMod(TableModInput input, SwitchConnectionDistinguisher cookie) {
340 return getConnectionAdapter(cookie).tableMod(input);
341 } catch (ConnectionException e) {
342 return getRpcErrorFuture(e);