From: Michal Rehak Date: Wed, 7 May 2014 22:43:47 +0000 (+0000) Subject: Merge "additional fix fro BUG-782 unregistering switch providers" X-Git-Tag: release/helium~199 X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=commitdiff_plain;h=a1187cf3c66079f1578abf72f0a6849a1e7671e9;hp=8362ce3bbb113d86463d6a2bc9b86b62dc9b2ca1;p=openflowplugin.git Merge "additional fix fro BUG-782 unregistering switch providers" --- diff --git a/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/MDController.java b/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/MDController.java index 156c823af6..acd3814e7f 100644 --- a/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/MDController.java +++ b/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/MDController.java @@ -11,6 +11,7 @@ package org.opendaylight.openflowplugin.openflow.md.core; import com.google.common.collect.Lists; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; + import org.opendaylight.openflowjava.protocol.api.connection.ConnectionConfiguration; import org.opendaylight.openflowjava.protocol.spi.connection.SwitchConnectionProvider; import org.opendaylight.openflowplugin.openflow.md.OFConstants; @@ -83,6 +84,8 @@ import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; @@ -105,6 +108,7 @@ public class MDController implements IMDController, AutoCloseable { final private int OF13 = OFConstants.OFP_VERSION_1_3; private ErrorHandlerQueueImpl errorHandler; + private ExecutorService rpcPool; /** @@ -195,6 +199,11 @@ public class MDController implements IMDController, AutoCloseable { // Push the updated Listeners to Session Manager which will be then picked up by ConnectionConductor eventually OFSessionUtil.getSessionManager().setTranslatorMapping(messageTranslators); OFSessionUtil.getSessionManager().setPopListenerMapping(popListeners); + + // prepare worker pool for rpc + // TODO: get size from configSubsystem + OFSessionUtil.getSessionManager().setRpcPool(Executors.newFixedThreadPool(10)); + } /** diff --git a/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/sal/ModelDrivenSwitchImpl.java b/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/sal/ModelDrivenSwitchImpl.java index 0ffddbe2ba..4b36178d20 100644 --- a/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/sal/ModelDrivenSwitchImpl.java +++ b/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/sal/ModelDrivenSwitchImpl.java @@ -7,8 +7,14 @@ */ package org.opendaylight.openflowplugin.openflow.md.core.sal; -import com.google.common.base.Objects; -import com.google.common.util.concurrent.Futures; +import java.math.BigInteger; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; + import org.opendaylight.controller.sal.binding.api.NotificationProviderService; import org.opendaylight.controller.sal.common.util.Rpcs; import org.opendaylight.openflowjava.protocol.api.util.BinContent; @@ -29,16 +35,12 @@ import org.opendaylight.openflowplugin.openflow.md.util.FlowCreatorUtil; import org.opendaylight.openflowplugin.openflow.md.util.InventoryDataServiceUtil; import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.AddFlowInput; import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.AddFlowOutput; -import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.AddFlowOutputBuilder; -import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.FlowAddedBuilder; import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.FlowRemovedBuilder; -import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.FlowUpdatedBuilder; import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.RemoveFlowInput; import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.RemoveFlowOutput; import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.RemoveFlowOutputBuilder; import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.UpdateFlowInput; import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.UpdateFlowOutput; -import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.UpdateFlowOutputBuilder; import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.GetAggregateFlowStatisticsFromFlowTableForAllFlowsInput; import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.GetAggregateFlowStatisticsFromFlowTableForAllFlowsOutput; import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.GetAggregateFlowStatisticsFromFlowTableForAllFlowsOutputBuilder; @@ -177,12 +179,9 @@ import org.opendaylight.yangtools.yang.common.RpcError; import org.opendaylight.yangtools.yang.common.RpcResult; import org.slf4j.Logger; -import java.math.BigInteger; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.List; -import java.util.concurrent.Future; +import com.google.common.base.Objects; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.JdkFutureAdapters; /** * RPC implementation of MD-switch @@ -195,81 +194,46 @@ public class ModelDrivenSwitchImpl extends AbstractModelDrivenSwitch { private short version = 0; private final SessionContext session; NotificationProviderService rpcNotificationProviderService; - - protected ModelDrivenSwitchImpl(NodeId nodeId, InstanceIdentifier identifier, SessionContext context) { + private OFRpcTaskHelper rpcTaskHelper; + + // TODO:read timeout from configSubsystem + protected long maxTimeout = 1000; + protected TimeUnit maxTimeoutUnit = TimeUnit.MILLISECONDS; + + protected ModelDrivenSwitchImpl(final NodeId nodeId, final InstanceIdentifier identifier, final SessionContext context) { super(identifier, context); this.nodeId = nodeId; messageService = sessionContext.getMessageDispatchService(); version = context.getPrimaryConductor().getVersion(); this.session = context; rpcNotificationProviderService = OFSessionUtil.getSessionManager().getNotificationProviderService(); + rpcTaskHelper = new OFRpcTaskHelper(messageService, context, rpcNotificationProviderService); } @Override - public Future> addFlow(AddFlowInput input) { + public Future> addFlow(final AddFlowInput input) { LOG.debug("Calling the FlowMod RPC method on MessageDispatchService"); - Long xId = null; - // For Flow provisioning, the SwitchConnectionDistinguisher is set to - // null so - // the request can be routed through any connection to the switch - + // use primary connection SwitchConnectionDistinguisher cookie = null; - if (Objects.firstNonNull(input.isBarrier(), Boolean.FALSE)) { - xId = session.getNextXid(); - BarrierInputBuilder barrierInput = new BarrierInputBuilder(); - barrierInput.setVersion(version); - barrierInput.setXid(xId); - @SuppressWarnings("unused") - Future> barrierOFLib = messageService.barrier(barrierInput.build(), cookie); - } - - // Convert the AddFlowInput to FlowModInput - FlowModInputBuilder ofFlowModInput = FlowConvertor.toFlowModInput(input, version, this.getSessionContext() - .getFeatures().getDatapathId()); - xId = session.getNextXid(); - ofFlowModInput.setXid(xId); - - if (null != rpcNotificationProviderService) { - FlowAddedBuilder newFlow = new FlowAddedBuilder( - (org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.rev131026.Flow) input); - newFlow.setTransactionId(new TransactionId(BigInteger.valueOf(xId.intValue()))); - newFlow.setFlowRef(input.getFlowRef()); - rpcNotificationProviderService.publish(newFlow.build()); - } - - session.getbulkTransactionCache().put(new TransactionKey(xId), input); - Future> resultFromOFLib = messageService.flowMod(ofFlowModInput.build(), cookie); - RpcResult rpcResultFromOFLib = null; - - try { - rpcResultFromOFLib = resultFromOFLib.get(); - } catch (Exception ex) { - LOG.error(" Error while getting result for AddFlow RPC" + ex.getMessage()); - } - - UpdateFlowOutput updateFlowOutput = rpcResultFromOFLib.getResult(); - - AddFlowOutputBuilder addFlowOutput = new AddFlowOutputBuilder(); - addFlowOutput.setTransactionId(updateFlowOutput.getTransactionId()); - AddFlowOutput result = addFlowOutput.build(); - - Collection errors = rpcResultFromOFLib.getErrors(); - RpcResult rpcResult = Rpcs.getRpcResult(true, result, errors); - - LOG.debug("Returning the Add Flow RPC result to MD-SAL"); - return Futures.immediateFuture(rpcResult); + + OFRpcTask> task = + OFRpcTaskFactory.createAddFlowTask(maxTimeout, maxTimeoutUnit, rpcTaskHelper); + rpcTaskHelper.initTask(task, input, cookie); + OFSessionUtil.getSessionManager().getRpcPool().submit(task); + + return Futures.transform(JdkFutureAdapters.listenInPoolThread(task.getResult()), + OFRpcFutureResultTransformFactory.createForAddFlowOutput()); } + @Override - public Future> addGroup(AddGroupInput input) { + public Future> addGroup(final AddGroupInput input) { LOG.debug("Calling the GroupMod RPC method on MessageDispatchService"); Long xId = null; - // For Flow provisioning, the SwitchConnectionDistinguisher is set to - // null so - // the request can be routed through any connection to the switch - + // use primary connection SwitchConnectionDistinguisher cookie = null; + if (Objects.firstNonNull(input.isBarrier(), Boolean.FALSE)) { xId = session.getNextXid(); BarrierInputBuilder barrierInput = new BarrierInputBuilder(); @@ -317,7 +281,7 @@ public class ModelDrivenSwitchImpl extends AbstractModelDrivenSwitch { } @Override - public Future> addMeter(AddMeterInput input) { + public Future> addMeter(final AddMeterInput input) { LOG.debug("Calling the MeterMod RPC method on MessageDispatchService"); Long xId = null; // For Meter provisioning, the SwitchConnectionDistinguisher is set to @@ -372,7 +336,7 @@ public class ModelDrivenSwitchImpl extends AbstractModelDrivenSwitch { } @Override - public Future> removeFlow(RemoveFlowInput input) { + public Future> removeFlow(final RemoveFlowInput input) { LOG.debug("Calling the removeFlow RPC method on MessageDispatchService"); Long xId = null; // For Flow provisioning, the SwitchConnectionDistinguisher is set to @@ -428,7 +392,7 @@ public class ModelDrivenSwitchImpl extends AbstractModelDrivenSwitch { } @Override - public Future> removeGroup(RemoveGroupInput input) { + public Future> removeGroup(final RemoveGroupInput input) { LOG.debug("Calling the Remove Group RPC method on MessageDispatchService"); Long xId = null; @@ -485,7 +449,7 @@ public class ModelDrivenSwitchImpl extends AbstractModelDrivenSwitch { } @Override - public Future> removeMeter(RemoveMeterInput input) { + public Future> removeMeter(final RemoveMeterInput input) { LOG.debug("Calling the Remove MeterMod RPC method on MessageDispatchService"); Long xId = null; @@ -540,7 +504,7 @@ public class ModelDrivenSwitchImpl extends AbstractModelDrivenSwitch { } @Override - public Future> transmitPacket(TransmitPacketInput input) { + public Future> transmitPacket(final TransmitPacketInput input) { LOG.debug("TransmitPacket - {}", input); // Convert TransmitPacket to PacketOutInput PacketOutInput message = PacketOutConvertor.toPacketOutInput(input, version, sessionContext.getNextXid(), @@ -556,7 +520,7 @@ public class ModelDrivenSwitchImpl extends AbstractModelDrivenSwitch { return messageService.packetOut(message, cookie); } - private FlowModInputBuilder toFlowModInputBuilder(Flow source) { + private FlowModInputBuilder toFlowModInputBuilder(final Flow source) { FlowModInputBuilder target = new FlowModInputBuilder(); target.setCookie(source.getCookie().getValue()); target.setIdleTimeout(source.getIdleTimeout()); @@ -566,7 +530,7 @@ public class ModelDrivenSwitchImpl extends AbstractModelDrivenSwitch { return target; } - private Match toMatch(org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.rev131026.flow.Match match) { + private Match toMatch(final org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.rev131026.flow.Match match) { MatchBuilder target = new MatchBuilder(); target.setMatchEntries(toMatchEntries(match)); @@ -575,70 +539,29 @@ public class ModelDrivenSwitchImpl extends AbstractModelDrivenSwitch { } private List toMatchEntries( - org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.rev131026.flow.Match match) { + final org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.rev131026.flow.Match match) { List entries = new ArrayList<>(); return null; } @Override - public Future> updateFlow(UpdateFlowInput input) { + public Future> updateFlow(final UpdateFlowInput input) { LOG.debug("Calling the updateFlow RPC method on MessageDispatchService"); - Long xId = null; - // Call the RPC method on MessageDispatchService - - // For Flow provisioning, the SwitchConnectionDistinguisher is set to - // null so - // the request can be routed through any connection to the switch - + + // use primary connection SwitchConnectionDistinguisher cookie = null; - if (Objects.firstNonNull(input.getUpdatedFlow().isBarrier(), Boolean.FALSE)) { - BarrierInputBuilder barrierInput = new BarrierInputBuilder(); - xId = session.getNextXid(); - barrierInput.setVersion(version); - barrierInput.setXid(xId); - Future> barrierOFLib = messageService.barrier(barrierInput.build(), cookie); - } - - // Convert the UpdateFlowInput to FlowModInput - FlowModInputBuilder ofFlowModInput = FlowConvertor.toFlowModInput(input.getUpdatedFlow(), version, this - .getSessionContext().getFeatures().getDatapathId()); - xId = session.getNextXid(); - ofFlowModInput.setXid(xId); - - if (null != rpcNotificationProviderService) { - FlowUpdatedBuilder updateFlow = new FlowUpdatedBuilder(input.getUpdatedFlow()); - updateFlow.setTransactionId(new TransactionId(BigInteger.valueOf(xId.intValue()))); - updateFlow.setFlowRef(input.getFlowRef()); - rpcNotificationProviderService.publish(updateFlow.build()); - } - - session.getbulkTransactionCache().put(new TransactionKey(xId), input); - Future> resultFromOFLib = messageService.flowMod(ofFlowModInput.build(), cookie); - - RpcResult rpcResultFromOFLib = null; - - try { - rpcResultFromOFLib = resultFromOFLib.get(); - } catch (Exception ex) { - LOG.error(" Error while getting result for UpdateFlow RPC" + ex.getMessage()); - } - - UpdateFlowOutput updateFlowOutputOFLib = rpcResultFromOFLib.getResult(); - - UpdateFlowOutputBuilder updateFlowOutput = new UpdateFlowOutputBuilder(); - updateFlowOutput.setTransactionId(updateFlowOutputOFLib.getTransactionId()); - UpdateFlowOutput result = updateFlowOutput.build(); - - Collection errors = rpcResultFromOFLib.getErrors(); - RpcResult rpcResult = Rpcs.getRpcResult(true, result, errors); - - LOG.debug("Returning the Update Flow RPC result to MD-SAL"); - return Futures.immediateFuture(rpcResult); + + OFRpcTask> task = + OFRpcTaskFactory.createUpdateFlowTask(maxTimeout, maxTimeoutUnit, rpcTaskHelper); + rpcTaskHelper.initTask(task, input, cookie); + OFSessionUtil.getSessionManager().getRpcPool().submit(task); + + return task.getResult(); } @Override - public Future> updateGroup(UpdateGroupInput input) { + public Future> updateGroup(final UpdateGroupInput input) { LOG.debug("Calling the update Group Mod RPC method on MessageDispatchService"); Long xId = null; @@ -694,7 +617,7 @@ public class ModelDrivenSwitchImpl extends AbstractModelDrivenSwitch { } @Override - public Future> updateMeter(UpdateMeterInput input) { + public Future> updateMeter(final UpdateMeterInput input) { LOG.debug("Calling the MeterMod RPC method on MessageDispatchService"); Long xId = null; @@ -756,7 +679,7 @@ public class ModelDrivenSwitchImpl extends AbstractModelDrivenSwitch { * Methods for requesting statistics from switch */ @Override - public Future> getAllGroupStatistics(GetAllGroupStatisticsInput input) { + public Future> getAllGroupStatistics(final GetAllGroupStatisticsInput input) { GetAllGroupStatisticsOutputBuilder output = new GetAllGroupStatisticsOutputBuilder(); Collection errors = Collections.emptyList(); @@ -804,7 +727,7 @@ public class ModelDrivenSwitchImpl extends AbstractModelDrivenSwitch { } @Override - public Future> getGroupDescription(GetGroupDescriptionInput input) { + public Future> getGroupDescription(final GetGroupDescriptionInput input) { GetGroupDescriptionOutputBuilder output = new GetGroupDescriptionOutputBuilder(); Collection errors = Collections.emptyList(); @@ -850,7 +773,7 @@ public class ModelDrivenSwitchImpl extends AbstractModelDrivenSwitch { } @Override - public Future> getGroupFeatures(GetGroupFeaturesInput input) { + public Future> getGroupFeatures(final GetGroupFeaturesInput input) { GetGroupFeaturesOutputBuilder output = new GetGroupFeaturesOutputBuilder(); Collection errors = Collections.emptyList(); @@ -893,7 +816,7 @@ public class ModelDrivenSwitchImpl extends AbstractModelDrivenSwitch { } @Override - public Future> getGroupStatistics(GetGroupStatisticsInput input) { + public Future> getGroupStatistics(final GetGroupStatisticsInput input) { GetGroupStatisticsOutputBuilder output = new GetGroupStatisticsOutputBuilder(); Collection errors = Collections.emptyList(); @@ -942,7 +865,7 @@ public class ModelDrivenSwitchImpl extends AbstractModelDrivenSwitch { @Override public Future> getAllMeterConfigStatistics( - GetAllMeterConfigStatisticsInput input) { + final GetAllMeterConfigStatisticsInput input) { GetAllMeterConfigStatisticsOutputBuilder output = new GetAllMeterConfigStatisticsOutputBuilder(); Collection errors = Collections.emptyList(); @@ -990,7 +913,7 @@ public class ModelDrivenSwitchImpl extends AbstractModelDrivenSwitch { } @Override - public Future> getAllMeterStatistics(GetAllMeterStatisticsInput input) { + public Future> getAllMeterStatistics(final GetAllMeterStatisticsInput input) { GetAllMeterStatisticsOutputBuilder output = new GetAllMeterStatisticsOutputBuilder(); Collection errors = Collections.emptyList(); @@ -1037,7 +960,7 @@ public class ModelDrivenSwitchImpl extends AbstractModelDrivenSwitch { } @Override - public Future> getMeterFeatures(GetMeterFeaturesInput input) { + public Future> getMeterFeatures(final GetMeterFeaturesInput input) { GetMeterFeaturesOutputBuilder output = new GetMeterFeaturesOutputBuilder(); Collection errors = Collections.emptyList(); @@ -1080,7 +1003,7 @@ public class ModelDrivenSwitchImpl extends AbstractModelDrivenSwitch { } @Override - public Future> getMeterStatistics(GetMeterStatisticsInput input) { + public Future> getMeterStatistics(final GetMeterStatisticsInput input) { GetMeterStatisticsOutputBuilder output = new GetMeterStatisticsOutputBuilder(); Collection errors = Collections.emptyList(); @@ -1130,7 +1053,7 @@ public class ModelDrivenSwitchImpl extends AbstractModelDrivenSwitch { @Override public Future> getAllNodeConnectorsStatistics( - GetAllNodeConnectorsStatisticsInput arg0) { + final GetAllNodeConnectorsStatisticsInput arg0) { // Generate xid to associate it with the request Long xid = this.getSessionContext().getNextXid(); @@ -1171,7 +1094,7 @@ public class ModelDrivenSwitchImpl extends AbstractModelDrivenSwitch { @Override public Future> getNodeConnectorStatistics( - GetNodeConnectorStatisticsInput arg0) { + final GetNodeConnectorStatisticsInput arg0) { // Generate xid to associate it with the request Long xid = this.getSessionContext().getNextXid(); @@ -1211,7 +1134,7 @@ public class ModelDrivenSwitchImpl extends AbstractModelDrivenSwitch { return Futures.immediateFuture(rpcResult); } - private TransactionId generateTransactionId(Long xid) { + private TransactionId generateTransactionId(final Long xid) { String stringXid = xid.toString(); BigInteger bigIntXid = new BigInteger(stringXid); return new TransactionId(bigIntXid); @@ -1219,7 +1142,7 @@ public class ModelDrivenSwitchImpl extends AbstractModelDrivenSwitch { } @Override - public Future> updatePort(UpdatePortInput input) { + public Future> updatePort(final UpdatePortInput input) { PortModInput ofPortModInput = null; RpcResult rpcResultFromOFLib = null; @@ -1277,7 +1200,7 @@ public class ModelDrivenSwitchImpl extends AbstractModelDrivenSwitch { } @Override - public Future> updateTable(UpdateTableInput input) { + public Future> updateTable(final UpdateTableInput input) { // Get the Xid. The same Xid has to be sent in all the Multipart // requests @@ -1324,7 +1247,7 @@ public class ModelDrivenSwitchImpl extends AbstractModelDrivenSwitch { @Override public Future> getAllFlowStatisticsFromFlowTable( - GetAllFlowStatisticsFromFlowTableInput arg0) { + final GetAllFlowStatisticsFromFlowTableInput arg0) { // Generate xid to associate it with the request Long xid = this.getSessionContext().getNextXid(); @@ -1370,7 +1293,7 @@ public class ModelDrivenSwitchImpl extends AbstractModelDrivenSwitch { @Override public Future> getAllFlowsStatisticsFromAllFlowTables( - GetAllFlowsStatisticsFromAllFlowTablesInput arg0) { + final GetAllFlowsStatisticsFromAllFlowTablesInput arg0) { // Generate xid to associate it with the request Long xid = this.getSessionContext().getNextXid(); @@ -1418,7 +1341,7 @@ public class ModelDrivenSwitchImpl extends AbstractModelDrivenSwitch { @Override public Future> getFlowStatisticsFromFlowTable( - GetFlowStatisticsFromFlowTableInput arg0) { + final GetFlowStatisticsFromFlowTableInput arg0) { // Generate xid to associate it with the request Long xid = this.getSessionContext().getNextXid(); @@ -1481,7 +1404,7 @@ public class ModelDrivenSwitchImpl extends AbstractModelDrivenSwitch { @Override public Future> getAggregateFlowStatisticsFromFlowTableForAllFlows( - GetAggregateFlowStatisticsFromFlowTableForAllFlowsInput arg0) { + final GetAggregateFlowStatisticsFromFlowTableForAllFlowsInput arg0) { // Generate xid to associate it with the request Long xid = this.getSessionContext().getNextXid(); @@ -1527,7 +1450,7 @@ public class ModelDrivenSwitchImpl extends AbstractModelDrivenSwitch { @Override public Future> getAggregateFlowStatisticsFromFlowTableForGivenMatch( - GetAggregateFlowStatisticsFromFlowTableForGivenMatchInput arg0) { + final GetAggregateFlowStatisticsFromFlowTableForGivenMatchInput arg0) { // Generate xid to associate it with the request Long xid = this.getSessionContext().getNextXid(); @@ -1580,7 +1503,7 @@ public class ModelDrivenSwitchImpl extends AbstractModelDrivenSwitch { } @Override - public Future> getFlowTablesStatistics(GetFlowTablesStatisticsInput arg0) { + public Future> getFlowTablesStatistics(final GetFlowTablesStatisticsInput arg0) { // Generate xid to associate it with the request Long xid = this.getSessionContext().getNextXid(); @@ -1618,7 +1541,7 @@ public class ModelDrivenSwitchImpl extends AbstractModelDrivenSwitch { @Override public Future> getAllQueuesStatisticsFromAllPorts( - GetAllQueuesStatisticsFromAllPortsInput arg0) { + final GetAllQueuesStatisticsFromAllPortsInput arg0) { // Generate xid to associate it with the request Long xid = this.getSessionContext().getNextXid(); @@ -1663,7 +1586,7 @@ public class ModelDrivenSwitchImpl extends AbstractModelDrivenSwitch { @Override public Future> getAllQueuesStatisticsFromGivenPort( - GetAllQueuesStatisticsFromGivenPortInput arg0) { + final GetAllQueuesStatisticsFromGivenPortInput arg0) { // Generate xid to associate it with the request Long xid = this.getSessionContext().getNextXid(); @@ -1709,7 +1632,7 @@ public class ModelDrivenSwitchImpl extends AbstractModelDrivenSwitch { @Override public Future> getQueueStatisticsFromGivenPort( - GetQueueStatisticsFromGivenPortInput arg0) { + final GetQueueStatisticsFromGivenPortInput arg0) { // Generate xid to associate it with the request Long xid = this.getSessionContext().getNextXid(); @@ -1751,5 +1674,21 @@ public class ModelDrivenSwitchImpl extends AbstractModelDrivenSwitch { RpcResult rpcResult = Rpcs.getRpcResult(true, output.build(), errors); return Futures.immediateFuture(rpcResult); } + + /** + * @param input + * @param cookie + * @param session + * @param messageService + * @return barrier result + */ + public static Future> sendBarrier( + SwitchConnectionDistinguisher cookie, SessionContext session, + IMessageDispatchService messageService) { + BarrierInputBuilder barrierInput = new BarrierInputBuilder(); + barrierInput.setVersion(session.getFeatures().getVersion()); + barrierInput.setXid(session.getNextXid()); + return messageService.barrier(barrierInput.build(), cookie); + } } diff --git a/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/sal/OFRpcFutureResultTransformFactory.java b/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/sal/OFRpcFutureResultTransformFactory.java new file mode 100644 index 0000000000..a0bb177b25 --- /dev/null +++ b/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/sal/OFRpcFutureResultTransformFactory.java @@ -0,0 +1,65 @@ +/** + * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.openflowplugin.openflow.md.core.sal; + +import java.util.Collection; + +import org.opendaylight.controller.sal.common.util.Rpcs; +import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.AddFlowOutput; +import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.AddFlowOutputBuilder; +import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.UpdateFlowOutput; +import org.opendaylight.yangtools.yang.common.RpcError; +import org.opendaylight.yangtools.yang.common.RpcResult; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.base.Function; + +/** + * collection of transformation functions dedicated to rpc future results + */ +public abstract class OFRpcFutureResultTransformFactory { + + protected static Logger LOG = LoggerFactory + .getLogger(OFRpcFutureResultTransformFactory.class); + + /** + * @return translator from {@link UpdateFlowOutput} to {@link AddFlowOutput} + */ + public static Function,RpcResult> createForAddFlowOutput() { + return new Function,RpcResult>() { + + @Override + public RpcResult apply(final RpcResult input) { + + UpdateFlowOutput updateFlowOutput = input.getResult(); + + AddFlowOutputBuilder addFlowOutput = new AddFlowOutputBuilder(); + addFlowOutput.setTransactionId(updateFlowOutput.getTransactionId()); + AddFlowOutput result = addFlowOutput.build(); + + RpcResult rpcResult = assembleRpcResult(input, result); + LOG.debug("Returning the Add Flow RPC result to MD-SAL"); + return rpcResult; + } + + }; + } + + + /** + * @param input + * @param result + * @return + */ + protected static RpcResult assembleRpcResult(RpcResult input, E result) { + Collection errors = input.getErrors(); + RpcResult rpcResult = Rpcs.getRpcResult(input.isSuccessful(), result, errors); + return rpcResult; + } +} diff --git a/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/sal/OFRpcTask.java b/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/sal/OFRpcTask.java new file mode 100644 index 0000000000..b153fcc420 --- /dev/null +++ b/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/sal/OFRpcTask.java @@ -0,0 +1,121 @@ +/** + * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.openflowplugin.openflow.md.core.sal; + +import org.opendaylight.controller.sal.binding.api.NotificationProviderService; +import org.opendaylight.openflowplugin.openflow.md.core.SwitchConnectionDistinguisher; +import org.opendaylight.openflowplugin.openflow.md.core.session.IMessageDispatchService; +import org.opendaylight.openflowplugin.openflow.md.core.session.SessionContext; + +import com.google.common.util.concurrent.SettableFuture; + +/** + * @param input type + * @param future output type + */ +public abstract class OFRpcTask implements Runnable { + + private SwitchConnectionDistinguisher cookie; + private IMessageDispatchService messageService; + private SessionContext session; + private T input; + private SettableFuture result; + private NotificationProviderService rpcNotificationProviderService; + + /** + * @return the result + */ + public SettableFuture getResult() { + return result; + } + + /** + * @param result the result to set + */ + public void setResult(SettableFuture result) { + this.result = result; + } + + /** + * @return the cookie + */ + public SwitchConnectionDistinguisher getCookie() { + return cookie; + } + + /** + * @return the messageService + */ + public IMessageDispatchService getMessageService() { + return messageService; + } + + /** + * @return the session + */ + public SessionContext getSession() { + return session; + } + + /** + * @return protocol version + */ + public Short getVersion() { + return session.getFeatures().getVersion(); + } + + /** + * @param cookie the cookie to set + */ + public void setCookie(SwitchConnectionDistinguisher cookie) { + this.cookie = cookie; + } + + /** + * @param messageService the messageService to set + */ + public void setMessageService(IMessageDispatchService messageService) { + this.messageService = messageService; + } + + /** + * @param session the session to set + */ + public void setSession(SessionContext session) { + this.session = session; + } + + /** + * @return the input + */ + public T getInput() { + return input; + } + + /** + * @param input the input to set + */ + public void setInput(T input) { + this.input = input; + } + + /** + * @param rpcNotificationProviderService + */ + public void setRpcNotificationProviderService( + NotificationProviderService rpcNotificationProviderService) { + this.rpcNotificationProviderService = rpcNotificationProviderService; + } + + /** + * @return the rpcNotificationProviderService + */ + public NotificationProviderService getRpcNotificationProviderService() { + return rpcNotificationProviderService; + } +} diff --git a/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/sal/OFRpcTaskFactory.java b/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/sal/OFRpcTaskFactory.java new file mode 100644 index 0000000000..5bb1119138 --- /dev/null +++ b/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/sal/OFRpcTaskFactory.java @@ -0,0 +1,111 @@ +/** + * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.openflowplugin.openflow.md.core.sal; + +import java.math.BigInteger; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; + +import org.opendaylight.openflowplugin.openflow.md.core.sal.convertor.FlowConvertor; +import org.opendaylight.openflowplugin.openflow.md.core.session.TransactionKey; +import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.AddFlowInput; +import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.FlowAddedBuilder; +import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.UpdateFlowInput; +import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.UpdateFlowOutput; +import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.transaction.rev131103.TransactionId; +import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.FlowModInputBuilder; +import org.opendaylight.yangtools.yang.common.RpcResult; + +/** + * + */ +public abstract class OFRpcTaskFactory { + + /** + * @param maxTimeout + * @param maxTimeoutUnit + * @param helper + * @return UpdateFlow task + */ + public static OFRpcTask> createAddFlowTask( + final long maxTimeout, final TimeUnit maxTimeoutUnit, final OFRpcTaskHelper helper) { + OFRpcTask> task = + new OFRpcTask>() { + + @Override + public void run() { + helper.rawBarrierSend(maxTimeout, maxTimeoutUnit, getInput().isBarrier(), getCookie(), getResult()); + if (getResult().isDone()) { + return; + } + + // Convert the AddFlowInput to FlowModInput + FlowModInputBuilder ofFlowModInput = FlowConvertor.toFlowModInput(getInput(), + getVersion(), getSession().getFeatures().getDatapathId()); + Long xId = getSession().getNextXid(); + ofFlowModInput.setXid(xId); + + if (null != getRpcNotificationProviderService()) { + FlowAddedBuilder newFlow = new FlowAddedBuilder( + (org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.rev131026.Flow) getInput()); + newFlow.setTransactionId(new TransactionId(BigInteger.valueOf(xId.intValue()))); + newFlow.setFlowRef(getInput().getFlowRef()); + getRpcNotificationProviderService().publish(newFlow.build()); + } + + getSession().getbulkTransactionCache().put(new TransactionKey(xId), getInput()); + Future> resultFromOFLib = + getMessageService().flowMod(ofFlowModInput.build(), getCookie()); + OFRpcTaskHelper.chainFutures(resultFromOFLib, getResult()); + } + }; + return task; + } + + /** + * @param maxTimeout + * @param maxTimeoutUnit + * @param helper + * @return UpdateFlow task + */ + public static OFRpcTask> createUpdateFlowTask( + final long maxTimeout, final TimeUnit maxTimeoutUnit, final OFRpcTaskHelper helper) { + OFRpcTask> task = + new OFRpcTask>() { + + @Override + public void run() { + helper.rawBarrierSend(maxTimeout, maxTimeoutUnit, getInput().getUpdatedFlow().isBarrier(), getCookie(), getResult()); + if (getResult().isDone()) { + return; + } + + // Convert the AddFlowInput to FlowModInput + FlowModInputBuilder ofFlowModInput = FlowConvertor.toFlowModInput(getInput().getUpdatedFlow(), + getVersion(), getSession().getFeatures().getDatapathId()); + Long xId = getSession().getNextXid(); + ofFlowModInput.setXid(xId); + + if (null != getRpcNotificationProviderService()) { + FlowAddedBuilder newFlow = new FlowAddedBuilder( + (org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.rev131026.Flow) getInput()); + newFlow.setTransactionId(new TransactionId(BigInteger.valueOf(xId.intValue()))); + newFlow.setFlowRef(getInput().getFlowRef()); + getRpcNotificationProviderService().publish(newFlow.build()); + } + + getSession().getbulkTransactionCache().put(new TransactionKey(xId), getInput()); + Future> resultFromOFLib = + getMessageService().flowMod(ofFlowModInput.build(), getCookie()); + OFRpcTaskHelper.chainFutures(resultFromOFLib, getResult()); + } + }; + return task; + } + +} diff --git a/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/sal/OFRpcTaskHelper.java b/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/sal/OFRpcTaskHelper.java new file mode 100644 index 0000000000..d844a65b69 --- /dev/null +++ b/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/sal/OFRpcTaskHelper.java @@ -0,0 +1,109 @@ +/** + * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.openflowplugin.openflow.md.core.sal; + +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; + +import org.opendaylight.controller.sal.binding.api.NotificationProviderService; +import org.opendaylight.controller.sal.common.util.Rpcs; +import org.opendaylight.openflowplugin.openflow.md.core.SwitchConnectionDistinguisher; +import org.opendaylight.openflowplugin.openflow.md.core.session.IMessageDispatchService; +import org.opendaylight.openflowplugin.openflow.md.core.session.SessionContext; +import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.BarrierOutput; +import org.opendaylight.yangtools.yang.common.RpcResult; + +import com.google.common.base.Objects; +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.JdkFutureAdapters; +import com.google.common.util.concurrent.SettableFuture; + +/** + * + */ +public class OFRpcTaskHelper { + + private IMessageDispatchService messageService; + private SessionContext session; + private NotificationProviderService rpcNotificationProviderService; + /** + * @param cookie + * @param messageService + * @param session + * @param rpcNotificationProviderService + */ + public OFRpcTaskHelper(IMessageDispatchService messageService, SessionContext session, + NotificationProviderService rpcNotificationProviderService) { + this.messageService = messageService; + this.session = session; + this.rpcNotificationProviderService = rpcNotificationProviderService; + } + + + /** + * @param task + * @param input + * @param cookie + * @return inited task + */ + public OFRpcTask initTask(OFRpcTask task, T input, SwitchConnectionDistinguisher cookie) { + task.setMessageService(messageService); + task.setSession(session); + task.setRpcNotificationProviderService(rpcNotificationProviderService); + task.setResult(SettableFuture.create()); + task.setCookie(cookie); + task.setInput(input); + return task; + } + + /** + * @param intern + * @param wrapper + */ + public static void chainFutures(final Future intern, final SettableFuture wrapper) { + Futures.addCallback( + JdkFutureAdapters.listenInPoolThread(intern), + new FutureCallback() { + + @Override + public void onSuccess( + K result) { + wrapper.set(result); + } + + @Override + public void onFailure(Throwable t) { + wrapper.setException(t); + } + + }); + } + + /** + * @param maxTimeout + * @param maxTimeoutUnit + * @param isBarrier + * @param cookie + * @param result + */ + public void rawBarrierSend(final long maxTimeout, final TimeUnit maxTimeoutUnit, + Boolean isBarrier, SwitchConnectionDistinguisher cookie, SettableFuture> result) { + if (Objects.firstNonNull(isBarrier, Boolean.FALSE)) { + Future> barrierFuture = ModelDrivenSwitchImpl.sendBarrier(cookie, session, messageService); + try { + RpcResult barrierResult = barrierFuture.get(maxTimeout, maxTimeoutUnit); + if (!barrierResult.isSuccessful()) { + result.set(Rpcs.getRpcResult(false, barrierResult.getErrors())); + } + } catch (Exception e) { + result.setException(e); + } + } + } +} diff --git a/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/session/OFSessionUtil.java b/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/session/OFSessionUtil.java index 41b4fae906..e6f3df7644 100644 --- a/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/session/OFSessionUtil.java +++ b/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/session/OFSessionUtil.java @@ -170,7 +170,6 @@ public abstract class OFSessionUtil { * @return pop listener Map */ public static Map, Collection>> getPopListenerMapping() { - // TODO Auto-generated method stub return getSessionManager().getPopListenerMapping(); } diff --git a/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/session/SessionManager.java b/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/session/SessionManager.java index fd3bab7438..19c9180c8e 100644 --- a/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/session/SessionManager.java +++ b/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/session/SessionManager.java @@ -11,6 +11,7 @@ package org.opendaylight.openflowplugin.openflow.md.core.session; import java.util.Collection; import java.util.List; import java.util.Map; +import java.util.concurrent.ExecutorService; import org.opendaylight.controller.sal.binding.api.NotificationProviderService; import org.opendaylight.controller.sal.binding.api.data.DataProviderService; @@ -111,4 +112,14 @@ public interface SessionManager extends AutoCloseable { * @param popListenerMapping the popListenerMapping to set */ void setPopListenerMapping(Map, Collection>> popListenerMapping); + + /** + * @param newFixedThreadPool + */ + void setRpcPool(ExecutorService newFixedThreadPool); + + /** + * @return the rpcPool instance + */ + ExecutorService getRpcPool(); } diff --git a/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/session/SessionManagerOFImpl.java b/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/session/SessionManagerOFImpl.java index 9f4429dbc3..0af916d6dc 100644 --- a/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/session/SessionManagerOFImpl.java +++ b/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/session/SessionManagerOFImpl.java @@ -14,6 +14,7 @@ import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutorService; import org.opendaylight.controller.sal.binding.api.NotificationProviderService; import org.opendaylight.controller.sal.binding.api.data.DataProviderService; @@ -195,6 +196,7 @@ public class SessionManagerOFImpl implements SessionManager { } } }; + private ExecutorService rpcPool; @Override @@ -244,6 +246,18 @@ public class SessionManagerOFImpl implements SessionManager { for (SessionContext sessionContext : sessionLot.values()) { sessionContext.getPrimaryConductor().disconnect(); } + // TODO: handle timeouted shutdown + rpcPool.shutdown(); } } + + @Override + public void setRpcPool(ExecutorService rpcPool) { + this.rpcPool = rpcPool; + } + + @Override + public ExecutorService getRpcPool() { + return rpcPool; + } } diff --git a/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/translator/PacketInTranslator.java b/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/translator/PacketInTranslator.java index 245ecb5b09..f547040998 100644 --- a/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/translator/PacketInTranslator.java +++ b/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/translator/PacketInTranslator.java @@ -54,6 +54,8 @@ public class PacketInTranslator implements IMDMessageTranslator