X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=openflowplugin%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fopenflowplugin%2Fopenflow%2Fmd%2Fcore%2Fsession%2FMessageDispatchServiceImpl.java;h=f3c122b723e16da42da360d701c3278c4b894376;hb=611180ac770b6038b526c54994701db16d1a8567;hp=7d8f43cdbbf2968e9bb3f343f4de99ba4170177e;hpb=7ee637c4449e773b1e65298e4b75065b0032ba9f;p=openflowplugin.git diff --git a/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/session/MessageDispatchServiceImpl.java b/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/session/MessageDispatchServiceImpl.java index 7d8f43cdbb..f3c122b723 100644 --- a/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/session/MessageDispatchServiceImpl.java +++ b/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/session/MessageDispatchServiceImpl.java @@ -1,52 +1,56 @@ +/** + * Copyright IBM Corporation, 2013. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ package org.opendaylight.openflowplugin.openflow.md.core.session; -import java.util.concurrent.Future; +import com.google.common.base.Function; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.JdkFutureAdapters; +import com.google.common.util.concurrent.ListenableFuture; import org.opendaylight.openflowjava.protocol.api.connection.ConnectionAdapter; -import org.opendaylight.openflowplugin.openflow.md.core.ConnectionConductor; -import org.opendaylight.openflowplugin.openflow.md.core.SwitchConnectionDistinguisher; -import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.BarrierInput; -import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.BarrierOutput; -import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.ExperimenterInput; -import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.FlowModInput; -import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GetAsyncInput; -import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GetAsyncOutput; -import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GetConfigInput; -import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GetConfigOutput; -import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GetFeaturesInput; -import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GetFeaturesOutput; -import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GetQueueConfigInput; -import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GetQueueConfigOutput; -import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GroupModInput; -import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.MeterModInput; -import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PacketOutInput; -import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PortModInput; -import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.RoleRequestInput; -import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.RoleRequestOutput; -import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.SetAsyncInput; -import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.SetConfigInput; -import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.TableModInput; +import org.opendaylight.openflowplugin.ConnectionException; +import org.opendaylight.openflowplugin.api.openflow.md.core.ConnectionConductor; +import org.opendaylight.openflowplugin.api.openflow.md.core.SwitchConnectionDistinguisher; +import org.opendaylight.openflowplugin.api.openflow.md.core.session.IMessageDispatchService; +import org.opendaylight.openflowplugin.api.openflow.md.core.session.SessionContext; +import org.opendaylight.openflowplugin.openflow.md.util.RpcResultUtil; +import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.UpdateFlowOutput; +import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.UpdateFlowOutputBuilder; +import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.transaction.rev150304.TransactionId; +import org.opendaylight.yang.gen.v1.urn.opendaylight.group.service.rev130918.UpdateGroupOutput; +import org.opendaylight.yang.gen.v1.urn.opendaylight.group.service.rev130918.UpdateGroupOutputBuilder; +import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.service.rev130918.UpdateMeterOutput; +import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.service.rev130918.UpdateMeterOutputBuilder; +import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.*; +import org.opendaylight.yang.gen.v1.urn.opendaylight.port.service.rev131107.UpdatePortOutput; +import org.opendaylight.yang.gen.v1.urn.opendaylight.port.service.rev131107.UpdatePortOutputBuilder; import org.opendaylight.yangtools.yang.common.RpcResult; +import org.opendaylight.yangtools.yang.common.RpcResultBuilder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.math.BigInteger; +import java.util.concurrent.Future; + /** * message dispatch service to send the message to switch. * * @author AnilGujele - * */ public class MessageDispatchServiceImpl implements IMessageDispatchService { private static final Logger LOG = LoggerFactory.getLogger(MessageDispatchServiceImpl.class); - private SessionContext session; /** * constructor * - * @param session - * - MessageDispatchService for this session + * @param session - MessageDispatchService for this session */ public MessageDispatchServiceImpl(SessionContext session) { this.session = session; @@ -55,17 +59,16 @@ public class MessageDispatchServiceImpl implements IMessageDispatchService { /** * get proper connection adapter to send the message to switch. * - * @param - cookie to identify the right connection, it can be null also. + * @param cookie to identify the right connection, it can be null also. * @return connectionAdapter associated with cookie, otherwise return best - * suitable connection. - * + * suitable connection. */ - private ConnectionAdapter getConnectionAdapter(SwitchConnectionDistinguisher cookie) { + private ConnectionAdapter getConnectionAdapter(SwitchConnectionDistinguisher cookie) throws ConnectionException { if (!session.isValid()) { - LOG.warn("Session for the cookie {} is invalid.", cookie); - throw new IllegalArgumentException("Session for the cookie is invalid."); + LOG.warn("No valid connection found for the node [datapath-id : {}]", session.getSessionKey().getId()); + throw new ConnectionException(CONNECTION_ERROR_MESSAGE); } LOG.debug("finding connecton for cookie value {}. ", cookie); // set main connection as default @@ -85,78 +88,239 @@ public class MessageDispatchServiceImpl implements IMessageDispatchService { @Override public Future> barrier(BarrierInput input, SwitchConnectionDistinguisher cookie) { - return getConnectionAdapter(cookie).barrier(input); + try { + return getConnectionAdapter(cookie).barrier(input); + } catch (ConnectionException e) { + return RpcResultUtil.getRpcErrorFuture(e); + } } @Override public Future> experimenter(ExperimenterInput input, SwitchConnectionDistinguisher cookie) { - return getConnectionAdapter(cookie).experimenter(input); + try { + return getConnectionAdapter(cookie).experimenter(input); + } catch (ConnectionException e) { + return RpcResultUtil.getRpcErrorFuture(e); + } } @Override - public Future> flowMod(FlowModInput input, SwitchConnectionDistinguisher cookie) { - return getConnectionAdapter(cookie).flowMod(input); + public Future> flowMod(final FlowModInput input, SwitchConnectionDistinguisher cookie) { + LOG.debug("Calling OFLibrary flowMod"); + Future> response = null; + try { + response = getConnectionAdapter(cookie).flowMod(input); + } catch (ConnectionException e) { + return RpcResultUtil.getRpcErrorFuture(e); + } + + // appending xid + ListenableFuture> xidResult = Futures.transform( + JdkFutureAdapters.listenInPoolThread(response), + new Function, RpcResult>() { + + @Override + public RpcResult apply(final RpcResult inputArg) { + UpdateFlowOutputBuilder flowModOutput = new UpdateFlowOutputBuilder(); + BigInteger bigIntXid = BigInteger.valueOf(input.getXid()); + flowModOutput.setTransactionId(new TransactionId(bigIntXid)); + + UpdateFlowOutput result = flowModOutput.build(); + RpcResult rpcResult = RpcResultBuilder + .status(inputArg.isSuccessful()) + .withResult(result).withRpcErrors(inputArg.getErrors()) + .build(); + return rpcResult; + } + }); + + return xidResult; } @Override public Future> getAsync(GetAsyncInput input, SwitchConnectionDistinguisher cookie) { - return getConnectionAdapter(cookie).getAsync(input); + try { + return getConnectionAdapter(cookie).getAsync(input); + } catch (ConnectionException e) { + return RpcResultUtil.getRpcErrorFuture(e); + } } @Override public Future> getConfig(GetConfigInput input, SwitchConnectionDistinguisher cookie) { - return getConnectionAdapter(cookie).getConfig(input); + try { + return getConnectionAdapter(cookie).getConfig(input); + } catch (ConnectionException e) { + return RpcResultUtil.getRpcErrorFuture(e); + } } @Override public Future> getFeatures(GetFeaturesInput input, SwitchConnectionDistinguisher cookie) { - return getConnectionAdapter(cookie).getFeatures(input); + try { + return getConnectionAdapter(cookie).getFeatures(input); + } catch (ConnectionException e) { + return RpcResultUtil.getRpcErrorFuture(e); + } } @Override public Future> getQueueConfig(GetQueueConfigInput input, - SwitchConnectionDistinguisher cookie) { - return getConnectionAdapter(cookie).getQueueConfig(input); + SwitchConnectionDistinguisher cookie) { + try { + return getConnectionAdapter(cookie).getQueueConfig(input); + } catch (ConnectionException e) { + return RpcResultUtil.getRpcErrorFuture(e); + } } @Override - public Future> groupMod(GroupModInput input, SwitchConnectionDistinguisher cookie) { - return getConnectionAdapter(cookie).groupMod(input); + public Future> groupMod(final GroupModInput input, SwitchConnectionDistinguisher cookie) { + LOG.debug("Calling OFLibrary groupMod"); + Future> response = null; + try { + response = getConnectionAdapter(cookie).groupMod(input); + } catch (ConnectionException e) { + return RpcResultUtil.getRpcErrorFuture(e); + } + + // appending xid + ListenableFuture> xidResult = Futures.transform( + JdkFutureAdapters.listenInPoolThread(response), + new Function, RpcResult>() { + + @Override + public RpcResult apply(final RpcResult inputArg) { + UpdateGroupOutputBuilder groupModOutput = new UpdateGroupOutputBuilder(); + BigInteger bigIntXid = BigInteger.valueOf(input.getXid()); + groupModOutput.setTransactionId(new TransactionId(bigIntXid)); + + UpdateGroupOutput result = groupModOutput.build(); + RpcResult rpcResult = RpcResultBuilder + .status(inputArg.isSuccessful()).withResult(result) + .withRpcErrors(inputArg.getErrors()).build(); + return rpcResult; + } + }); + + return xidResult; } @Override - public Future> meterMod(MeterModInput input, SwitchConnectionDistinguisher cookie) { - return getConnectionAdapter(cookie).meterMod(input); + public Future> meterMod(final MeterModInput input, SwitchConnectionDistinguisher cookie) { + LOG.debug("Calling OFLibrary meterMod"); + Future> response = null; + try { + response = getConnectionAdapter(cookie).meterMod(input); + } catch (ConnectionException e) { + return RpcResultUtil.getRpcErrorFuture(e); + } + + // appending xid + ListenableFuture> xidResult = Futures.transform( + JdkFutureAdapters.listenInPoolThread(response), + new Function, RpcResult>() { + + @Override + public RpcResult apply(final RpcResult inputArg) { + UpdateMeterOutputBuilder meterModOutput = new UpdateMeterOutputBuilder(); + BigInteger bigIntXid = BigInteger.valueOf(input.getXid()); + meterModOutput.setTransactionId(new TransactionId(bigIntXid)); + + UpdateMeterOutput result = meterModOutput.build(); + RpcResult rpcResult = RpcResultBuilder + .status(inputArg.isSuccessful()).withResult(result) + .withRpcErrors(inputArg.getErrors()).build(); + return rpcResult; + } + }); + + return xidResult; + } + + @Override + public Future> multipartRequest(MultipartRequestInput input, SwitchConnectionDistinguisher cookie) { + try { + return getConnectionAdapter(cookie).multipartRequest(input); + } catch (ConnectionException e) { + return RpcResultUtil.getRpcErrorFuture(e); + } } @Override public Future> packetOut(PacketOutInput input, SwitchConnectionDistinguisher cookie) { - return getConnectionAdapter(cookie).packetOut(input); + try { + return getConnectionAdapter(cookie).packetOut(input); + } catch (ConnectionException e) { + return RpcResultUtil.getRpcErrorFuture(e); + } } @Override - public Future> portMod(PortModInput input, SwitchConnectionDistinguisher cookie) { - return getConnectionAdapter(cookie).portMod(input); + public Future> portMod(final PortModInput input, SwitchConnectionDistinguisher cookie) { + LOG.debug("Calling OFLibrary portMod"); + Future> response = null; + try { + response = getConnectionAdapter(cookie).portMod(input); + } catch (ConnectionException e) { + return RpcResultUtil.getRpcErrorFuture(e); + } + + // appending xid + ListenableFuture> xidResult = Futures.transform( + JdkFutureAdapters.listenInPoolThread(response), + new Function, RpcResult>() { + + @Override + public RpcResult apply(final RpcResult inputArg) { + UpdatePortOutputBuilder portModOutput = new UpdatePortOutputBuilder(); + BigInteger bigIntXid = BigInteger.valueOf(input.getXid()); + portModOutput.setTransactionId(new TransactionId(bigIntXid)); + + UpdatePortOutput result = portModOutput.build(); + RpcResult rpcResult = RpcResultBuilder + .status(inputArg.isSuccessful()).withResult(result) + .withRpcErrors(inputArg.getErrors()).build(); + return rpcResult; + } + }); + + return xidResult; } @Override public Future> roleRequest(RoleRequestInput input, SwitchConnectionDistinguisher cookie) { - return getConnectionAdapter(cookie).roleRequest(input); + try { + return getConnectionAdapter(cookie).roleRequest(input); + } catch (ConnectionException e) { + return RpcResultUtil.getRpcErrorFuture(e); + } } @Override public Future> setAsync(SetAsyncInput input, SwitchConnectionDistinguisher cookie) { - return getConnectionAdapter(cookie).setAsync(input); + try { + return getConnectionAdapter(cookie).setAsync(input); + } catch (ConnectionException e) { + return RpcResultUtil.getRpcErrorFuture(e); + } } @Override public Future> setConfig(SetConfigInput input, SwitchConnectionDistinguisher cookie) { - return getConnectionAdapter(cookie).setConfig(input); + try { + return getConnectionAdapter(cookie).setConfig(input); + } catch (ConnectionException e) { + return RpcResultUtil.getRpcErrorFuture(e); + } } @Override public Future> tableMod(TableModInput input, SwitchConnectionDistinguisher cookie) { - return getConnectionAdapter(cookie).tableMod(input); + try { + return getConnectionAdapter(cookie).tableMod(input); + } catch (ConnectionException e) { + return RpcResultUtil.getRpcErrorFuture(e); + } } - }