X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=openflowplugin%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fopenflowplugin%2Fopenflow%2Fmd%2Fcore%2Fsession%2FMessageDispatchServiceImpl.java;h=f3c122b723e16da42da360d701c3278c4b894376;hb=611180ac770b6038b526c54994701db16d1a8567;hp=7f1239b1be4457db3c57dd28935089a0f143fb3b;hpb=81f49492b1b4dcfe2d674902b71c6b0cd5e8bec3;p=openflowplugin.git diff --git a/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/session/MessageDispatchServiceImpl.java b/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/session/MessageDispatchServiceImpl.java index 7f1239b1be..f3c122b723 100644 --- a/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/session/MessageDispatchServiceImpl.java +++ b/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/session/MessageDispatchServiceImpl.java @@ -1,77 +1,59 @@ +/** + * Copyright IBM Corporation, 2013. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ package org.opendaylight.openflowplugin.openflow.md.core.session; -import java.math.BigInteger; -import java.util.Collection; -import java.util.Collections; -import java.util.concurrent.Future; +import com.google.common.base.Function; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.JdkFutureAdapters; +import com.google.common.util.concurrent.ListenableFuture; -import org.opendaylight.controller.sal.binding.api.NotificationProviderService; - import org.opendaylight.controller.sal.common.util.Rpcs; import org.opendaylight.openflowjava.protocol.api.connection.ConnectionAdapter; -import org.opendaylight.openflowplugin.openflow.md.core.ConnectionConductor; -import org.opendaylight.openflowplugin.openflow.md.core.SwitchConnectionDistinguisher; +import org.opendaylight.openflowplugin.ConnectionException; +import org.opendaylight.openflowplugin.api.openflow.md.core.ConnectionConductor; +import org.opendaylight.openflowplugin.api.openflow.md.core.SwitchConnectionDistinguisher; +import org.opendaylight.openflowplugin.api.openflow.md.core.session.IMessageDispatchService; +import org.opendaylight.openflowplugin.api.openflow.md.core.session.SessionContext; +import org.opendaylight.openflowplugin.openflow.md.util.RpcResultUtil; import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.UpdateFlowOutput; import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.UpdateFlowOutputBuilder; -import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.transaction.rev131103.TransactionId; +import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.transaction.rev150304.TransactionId; import org.opendaylight.yang.gen.v1.urn.opendaylight.group.service.rev130918.UpdateGroupOutput; import org.opendaylight.yang.gen.v1.urn.opendaylight.group.service.rev130918.UpdateGroupOutputBuilder; import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.service.rev130918.UpdateMeterOutput; import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.service.rev130918.UpdateMeterOutputBuilder; -import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.BarrierInput; -import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.BarrierInputBuilder; -import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.BarrierOutput; -import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.ExperimenterInput; -import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.FlowModInput; -import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.FlowModInputBuilder; -import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GetAsyncInput; -import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GetAsyncOutput; -import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GetConfigInput; -import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GetConfigOutput; -import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GetFeaturesInput; -import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GetFeaturesOutput; -import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GetQueueConfigInput; -import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GetQueueConfigOutput; -import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GroupModInput; -import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GroupModInputBuilder; -import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.MeterModInput; -import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.MeterModInputBuilder; -import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.MultipartRequestInput; -import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PacketOutInput; -import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PortModInput; -import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.RoleRequestInput; -import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.RoleRequestOutput; -import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.SetAsyncInput; -import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.SetConfigInput; -import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.TableModInput; +import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.*; import org.opendaylight.yang.gen.v1.urn.opendaylight.port.service.rev131107.UpdatePortOutput; import org.opendaylight.yang.gen.v1.urn.opendaylight.port.service.rev131107.UpdatePortOutputBuilder; -import org.opendaylight.yangtools.yang.common.RpcError; import org.opendaylight.yangtools.yang.common.RpcResult; +import org.opendaylight.yangtools.yang.common.RpcResultBuilder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.google.common.util.concurrent.Futures; +import java.math.BigInteger; +import java.util.concurrent.Future; /** * message dispatch service to send the message to switch. * * @author AnilGujele - * */ public class MessageDispatchServiceImpl implements IMessageDispatchService { private static final Logger LOG = LoggerFactory.getLogger(MessageDispatchServiceImpl.class); - - private SessionContext session; + private SessionContext session; /** * constructor * - * @param session - * - MessageDispatchService for this session + * @param session - MessageDispatchService for this session */ public MessageDispatchServiceImpl(SessionContext session) { - this.session = session; + this.session = session; } /** @@ -79,15 +61,14 @@ public class MessageDispatchServiceImpl implements IMessageDispatchService { * * @param cookie to identify the right connection, it can be null also. * @return connectionAdapter associated with cookie, otherwise return best - * suitable connection. - * + * suitable connection. */ - private ConnectionAdapter getConnectionAdapter(SwitchConnectionDistinguisher cookie) { + private ConnectionAdapter getConnectionAdapter(SwitchConnectionDistinguisher cookie) throws ConnectionException { if (!session.isValid()) { - LOG.warn("Session for the cookie {} is invalid.", cookie); - throw new IllegalArgumentException("Session for the cookie is invalid."); + LOG.warn("No valid connection found for the node [datapath-id : {}]", session.getSessionKey().getId()); + throw new ConnectionException(CONNECTION_ERROR_MESSAGE); } LOG.debug("finding connecton for cookie value {}. ", cookie); // set main connection as default @@ -106,145 +87,240 @@ public class MessageDispatchServiceImpl implements IMessageDispatchService { } @Override - public Future> barrier(BarrierInput input, SwitchConnectionDistinguisher cookie) { - return getConnectionAdapter(cookie).barrier(input); + public Future> barrier(BarrierInput input, SwitchConnectionDistinguisher cookie) { + try { + return getConnectionAdapter(cookie).barrier(input); + } catch (ConnectionException e) { + return RpcResultUtil.getRpcErrorFuture(e); + } } @Override public Future> experimenter(ExperimenterInput input, SwitchConnectionDistinguisher cookie) { - return getConnectionAdapter(cookie).experimenter(input); + try { + return getConnectionAdapter(cookie).experimenter(input); + } catch (ConnectionException e) { + return RpcResultUtil.getRpcErrorFuture(e); + } } @Override - public Future> flowMod(FlowModInput input, SwitchConnectionDistinguisher cookie) { + public Future> flowMod(final FlowModInput input, SwitchConnectionDistinguisher cookie) { LOG.debug("Calling OFLibrary flowMod"); - Future> response = getConnectionAdapter(cookie).flowMod(input); - - // Send the same Xid back to caller - MessageDrivenSwitch - UpdateFlowOutputBuilder flowModOutput = new UpdateFlowOutputBuilder(); - BigInteger bigIntXid = BigInteger.valueOf(input.getXid()) ; - flowModOutput.setTransactionId(new TransactionId(bigIntXid)); - - UpdateFlowOutput result = flowModOutput.build(); - Collection errors = Collections.emptyList(); - RpcResult rpcResult = Rpcs.getRpcResult(true, result, errors); - - // solution 1: sending directly and hooking listener to get error - // hookup listener to catch the possible error with no reference to returned future-object - LOG.debug("Returning to ModelDrivenSwitch for flowMod RPC"); - return Futures.immediateFuture(rpcResult); + Future> response = null; + try { + response = getConnectionAdapter(cookie).flowMod(input); + } catch (ConnectionException e) { + return RpcResultUtil.getRpcErrorFuture(e); + } + // appending xid + ListenableFuture> xidResult = Futures.transform( + JdkFutureAdapters.listenInPoolThread(response), + new Function, RpcResult>() { + + @Override + public RpcResult apply(final RpcResult inputArg) { + UpdateFlowOutputBuilder flowModOutput = new UpdateFlowOutputBuilder(); + BigInteger bigIntXid = BigInteger.valueOf(input.getXid()); + flowModOutput.setTransactionId(new TransactionId(bigIntXid)); + + UpdateFlowOutput result = flowModOutput.build(); + RpcResult rpcResult = RpcResultBuilder + .status(inputArg.isSuccessful()) + .withResult(result).withRpcErrors(inputArg.getErrors()) + .build(); + return rpcResult; + } + }); + + return xidResult; } @Override public Future> getAsync(GetAsyncInput input, SwitchConnectionDistinguisher cookie) { - return getConnectionAdapter(cookie).getAsync(input); + try { + return getConnectionAdapter(cookie).getAsync(input); + } catch (ConnectionException e) { + return RpcResultUtil.getRpcErrorFuture(e); + } } @Override public Future> getConfig(GetConfigInput input, SwitchConnectionDistinguisher cookie) { - return getConnectionAdapter(cookie).getConfig(input); + try { + return getConnectionAdapter(cookie).getConfig(input); + } catch (ConnectionException e) { + return RpcResultUtil.getRpcErrorFuture(e); + } } @Override public Future> getFeatures(GetFeaturesInput input, SwitchConnectionDistinguisher cookie) { - return getConnectionAdapter(cookie).getFeatures(input); + try { + return getConnectionAdapter(cookie).getFeatures(input); + } catch (ConnectionException e) { + return RpcResultUtil.getRpcErrorFuture(e); + } } @Override public Future> getQueueConfig(GetQueueConfigInput input, - SwitchConnectionDistinguisher cookie) { - return getConnectionAdapter(cookie).getQueueConfig(input); + SwitchConnectionDistinguisher cookie) { + try { + return getConnectionAdapter(cookie).getQueueConfig(input); + } catch (ConnectionException e) { + return RpcResultUtil.getRpcErrorFuture(e); + } } @Override - public Future> groupMod(GroupModInput input, SwitchConnectionDistinguisher cookie) { + public Future> groupMod(final GroupModInput input, SwitchConnectionDistinguisher cookie) { LOG.debug("Calling OFLibrary groupMod"); - Future> response = getConnectionAdapter(cookie).groupMod(input); - - // Send the same Xid back to caller - MessageDrivenSwitch - UpdateGroupOutputBuilder groupModOutput = new UpdateGroupOutputBuilder(); - BigInteger bigIntXid = BigInteger.valueOf(input.getXid()); - groupModOutput.setTransactionId(new TransactionId(bigIntXid)); - - UpdateGroupOutput result = groupModOutput.build(); - Collection errors = Collections.emptyList(); - RpcResult rpcResult = Rpcs.getRpcResult(true, result, errors); - - // solution 1: sending directly and hooking listener to get error - // hookup listener to catch the possible error with no reference to returned future-object - LOG.debug("Returning to ModelDrivenSwitch for groupMod RPC"); - return Futures.immediateFuture(rpcResult); + Future> response = null; + try { + response = getConnectionAdapter(cookie).groupMod(input); + } catch (ConnectionException e) { + return RpcResultUtil.getRpcErrorFuture(e); + } + // appending xid + ListenableFuture> xidResult = Futures.transform( + JdkFutureAdapters.listenInPoolThread(response), + new Function, RpcResult>() { + + @Override + public RpcResult apply(final RpcResult inputArg) { + UpdateGroupOutputBuilder groupModOutput = new UpdateGroupOutputBuilder(); + BigInteger bigIntXid = BigInteger.valueOf(input.getXid()); + groupModOutput.setTransactionId(new TransactionId(bigIntXid)); + + UpdateGroupOutput result = groupModOutput.build(); + RpcResult rpcResult = RpcResultBuilder + .status(inputArg.isSuccessful()).withResult(result) + .withRpcErrors(inputArg.getErrors()).build(); + return rpcResult; + } + }); + + return xidResult; } @Override - public Future> meterMod(MeterModInput input, SwitchConnectionDistinguisher cookie) { + public Future> meterMod(final MeterModInput input, SwitchConnectionDistinguisher cookie) { LOG.debug("Calling OFLibrary meterMod"); - Future> response = getConnectionAdapter(cookie).meterMod(input); - - // Send the same Xid back to caller - MessageDrivenSwitch - UpdateMeterOutputBuilder meterModOutput = new UpdateMeterOutputBuilder(); - BigInteger bigIntXid =BigInteger.valueOf(input.getXid()); - meterModOutput.setTransactionId(new TransactionId(bigIntXid)); - - UpdateMeterOutput result = meterModOutput.build(); - Collection errors = Collections.emptyList(); - RpcResult rpcResult = Rpcs.getRpcResult(true, result, errors); - - // solution 1: sending directly and hooking listener to get error - // hookup listener to catch the possible error with no reference to returned future-object - LOG.debug("Returning to ModelDrivenSwitch for meterMod RPC"); - return Futures.immediateFuture(rpcResult); + Future> response = null; + try { + response = getConnectionAdapter(cookie).meterMod(input); + } catch (ConnectionException e) { + return RpcResultUtil.getRpcErrorFuture(e); + } + // appending xid + ListenableFuture> xidResult = Futures.transform( + JdkFutureAdapters.listenInPoolThread(response), + new Function, RpcResult>() { + + @Override + public RpcResult apply(final RpcResult inputArg) { + UpdateMeterOutputBuilder meterModOutput = new UpdateMeterOutputBuilder(); + BigInteger bigIntXid = BigInteger.valueOf(input.getXid()); + meterModOutput.setTransactionId(new TransactionId(bigIntXid)); + + UpdateMeterOutput result = meterModOutput.build(); + RpcResult rpcResult = RpcResultBuilder + .status(inputArg.isSuccessful()).withResult(result) + .withRpcErrors(inputArg.getErrors()).build(); + return rpcResult; + } + }); + + return xidResult; } @Override public Future> multipartRequest(MultipartRequestInput input, SwitchConnectionDistinguisher cookie) { - return getConnectionAdapter(cookie).multipartRequest(input); + try { + return getConnectionAdapter(cookie).multipartRequest(input); + } catch (ConnectionException e) { + return RpcResultUtil.getRpcErrorFuture(e); + } } @Override public Future> packetOut(PacketOutInput input, SwitchConnectionDistinguisher cookie) { - return getConnectionAdapter(cookie).packetOut(input); + try { + return getConnectionAdapter(cookie).packetOut(input); + } catch (ConnectionException e) { + return RpcResultUtil.getRpcErrorFuture(e); + } } @Override - public Future> portMod(PortModInput input, SwitchConnectionDistinguisher cookie) { - + public Future> portMod(final PortModInput input, SwitchConnectionDistinguisher cookie) { LOG.debug("Calling OFLibrary portMod"); - Future> response = getConnectionAdapter(cookie).portMod(input); - - // Send the same Xid back to caller - ModelDrivenSwitch - UpdatePortOutputBuilder portModOutput = new UpdatePortOutputBuilder(); - String stringXid =input.getXid().toString(); - BigInteger bigIntXid = new BigInteger( stringXid ); - portModOutput.setTransactionId(new TransactionId(bigIntXid)); - UpdatePortOutput result = portModOutput.build(); - Collection errors = Collections.emptyList(); - RpcResult rpcResult = Rpcs.getRpcResult(true, result, errors); + Future> response = null; + try { + response = getConnectionAdapter(cookie).portMod(input); + } catch (ConnectionException e) { + return RpcResultUtil.getRpcErrorFuture(e); + } - LOG.debug("Returning to ModelDrivenSwitch for portMod RPC"); - return Futures.immediateFuture(rpcResult); + // appending xid + ListenableFuture> xidResult = Futures.transform( + JdkFutureAdapters.listenInPoolThread(response), + new Function, RpcResult>() { + + @Override + public RpcResult apply(final RpcResult inputArg) { + UpdatePortOutputBuilder portModOutput = new UpdatePortOutputBuilder(); + BigInteger bigIntXid = BigInteger.valueOf(input.getXid()); + portModOutput.setTransactionId(new TransactionId(bigIntXid)); + + UpdatePortOutput result = portModOutput.build(); + RpcResult rpcResult = RpcResultBuilder + .status(inputArg.isSuccessful()).withResult(result) + .withRpcErrors(inputArg.getErrors()).build(); + return rpcResult; + } + }); + + return xidResult; } @Override public Future> roleRequest(RoleRequestInput input, SwitchConnectionDistinguisher cookie) { - return getConnectionAdapter(cookie).roleRequest(input); + try { + return getConnectionAdapter(cookie).roleRequest(input); + } catch (ConnectionException e) { + return RpcResultUtil.getRpcErrorFuture(e); + } } @Override public Future> setAsync(SetAsyncInput input, SwitchConnectionDistinguisher cookie) { - return getConnectionAdapter(cookie).setAsync(input); + try { + return getConnectionAdapter(cookie).setAsync(input); + } catch (ConnectionException e) { + return RpcResultUtil.getRpcErrorFuture(e); + } } @Override public Future> setConfig(SetConfigInput input, SwitchConnectionDistinguisher cookie) { - return getConnectionAdapter(cookie).setConfig(input); + try { + return getConnectionAdapter(cookie).setConfig(input); + } catch (ConnectionException e) { + return RpcResultUtil.getRpcErrorFuture(e); + } } @Override public Future> tableMod(TableModInput input, SwitchConnectionDistinguisher cookie) { - return getConnectionAdapter(cookie).tableMod(input); + try { + return getConnectionAdapter(cookie).tableMod(input); + } catch (ConnectionException e) { + return RpcResultUtil.getRpcErrorFuture(e); + } } }