X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=openflowplugin%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fopenflowplugin%2Fopenflow%2Fmd%2Fcore%2FMDController.java;h=c425c08f375d8e5db93c6b97ddcfc865f7793c6b;hb=433f14f2bc9b3d3739861afd4a6329ad3431403f;hp=4e0d35f496ccbb347efee0371ba16aad5ef44205;hpb=9c6573de4d86bebcbbd2e623da0f09c93b4d9a24;p=openflowplugin.git diff --git a/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/MDController.java b/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/MDController.java index 4e0d35f496..c425c08f37 100644 --- a/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/MDController.java +++ b/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/MDController.java @@ -8,22 +8,32 @@ package org.opendaylight.openflowplugin.openflow.md.core; +import java.util.ArrayList; import java.util.Collection; import java.util.LinkedHashSet; import java.util.List; import java.util.Map; +import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.RejectedExecutionHandler; +import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; -import org.opendaylight.openflowjava.protocol.api.connection.ConnectionConfiguration; -import org.opendaylight.openflowjava.protocol.api.connection.SwitchConnectionHandler; import org.opendaylight.openflowjava.protocol.spi.connection.SwitchConnectionProvider; +import org.opendaylight.openflowplugin.api.OFConstants; +import org.opendaylight.openflowplugin.api.openflow.md.core.IMDMessageTranslator; +import org.opendaylight.openflowplugin.api.openflow.md.core.NotificationQueueWrapper; +import org.opendaylight.openflowplugin.api.openflow.md.core.TranslatorKey; +import org.opendaylight.openflowplugin.extension.api.core.extension.ExtensionConverterProvider; import org.opendaylight.openflowplugin.openflow.md.core.session.OFSessionUtil; import org.opendaylight.openflowplugin.openflow.md.core.translator.ErrorTranslator; +import org.opendaylight.openflowplugin.openflow.md.core.translator.ErrorV10Translator; import org.opendaylight.openflowplugin.openflow.md.core.translator.ExperimenterTranslator; import org.opendaylight.openflowplugin.openflow.md.core.translator.FeaturesV10ToNodeConnectorUpdatedTranslator; import org.opendaylight.openflowplugin.openflow.md.core.translator.FlowRemovedTranslator; @@ -31,23 +41,43 @@ import org.opendaylight.openflowplugin.openflow.md.core.translator.MultiPartMess import org.opendaylight.openflowplugin.openflow.md.core.translator.MultiPartReplyPortToNodeConnectorUpdatedTranslator; import org.opendaylight.openflowplugin.openflow.md.core.translator.MultipartReplyTableFeaturesToTableUpdatedTranslator; import org.opendaylight.openflowplugin.openflow.md.core.translator.MultipartReplyTranslator; +import org.opendaylight.openflowplugin.openflow.md.core.translator.NotificationPlainTranslator; import org.opendaylight.openflowplugin.openflow.md.core.translator.PacketInTranslator; +import org.opendaylight.openflowplugin.openflow.md.core.translator.PacketInV10Translator; import org.opendaylight.openflowplugin.openflow.md.core.translator.PortStatusMessageToNodeConnectorUpdatedTranslator; -import org.opendaylight.openflowplugin.openflow.md.lldp.LLDPSpeakerPopListener; -import org.opendaylight.openflowplugin.openflow.md.queue.PopListener; -import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeConnectorUpdated; -import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.SwitchFlowRemoved; +import org.opendaylight.openflowplugin.api.openflow.md.queue.PopListener; +import org.opendaylight.openflowplugin.openflow.md.util.OpenflowPortsUtil; +import org.opendaylight.openflowplugin.api.statistics.MessageSpy; import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.NodeErrorNotification; +import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.SwitchFlowRemoved; import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.AggregateFlowStatisticsUpdate; import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.FlowsStatisticsUpdate; import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.table.statistics.rev131215.FlowTableStatisticsUpdate; import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.GroupDescStatsUpdated; import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.GroupFeaturesUpdated; import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.GroupStatisticsUpdated; +import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeConnectorUpdated; +import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeConnectorRemoved; +import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRemoved; import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeUpdated; import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.MeterConfigStatsUpdated; import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.MeterFeaturesUpdated; import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.MeterStatisticsUpdated; +import org.opendaylight.yang.gen.v1.urn.opendaylight.node.error.service.rev140410.BadActionErrorNotification; +import org.opendaylight.yang.gen.v1.urn.opendaylight.node.error.service.rev140410.BadInstructionErrorNotification; +import org.opendaylight.yang.gen.v1.urn.opendaylight.node.error.service.rev140410.BadMatchErrorNotification; +import org.opendaylight.yang.gen.v1.urn.opendaylight.node.error.service.rev140410.BadRequestErrorNotification; +import org.opendaylight.yang.gen.v1.urn.opendaylight.node.error.service.rev140410.ExperimenterErrorNotification; +import org.opendaylight.yang.gen.v1.urn.opendaylight.node.error.service.rev140410.FlowModErrorNotification; +import org.opendaylight.yang.gen.v1.urn.opendaylight.node.error.service.rev140410.GroupModErrorNotification; +import org.opendaylight.yang.gen.v1.urn.opendaylight.node.error.service.rev140410.HelloFailedErrorNotification; +import org.opendaylight.yang.gen.v1.urn.opendaylight.node.error.service.rev140410.MeterModErrorNotification; +import org.opendaylight.yang.gen.v1.urn.opendaylight.node.error.service.rev140410.PortModErrorNotification; +import org.opendaylight.yang.gen.v1.urn.opendaylight.node.error.service.rev140410.QueueOpErrorNotification; +import org.opendaylight.yang.gen.v1.urn.opendaylight.node.error.service.rev140410.RoleRequestErrorNotification; +import org.opendaylight.yang.gen.v1.urn.opendaylight.node.error.service.rev140410.SwitchConfigErrorNotification; +import org.opendaylight.yang.gen.v1.urn.opendaylight.node.error.service.rev140410.TableFeaturesErrorNotification; +import org.opendaylight.yang.gen.v1.urn.opendaylight.node.error.service.rev140410.TableModErrorNotification; import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.ErrorMessage; import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.ExperimenterMessage; import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.FlowRemovedMessage; @@ -61,28 +91,36 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.packet.service.rev130709.Tr import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.NodeConnectorStatisticsUpdate; import org.opendaylight.yang.gen.v1.urn.opendaylight.queue.statistics.rev131216.QueueStatisticsUpdate; import org.opendaylight.yang.gen.v1.urn.opendaylight.table.service.rev131026.TableUpdated; +import org.opendaylight.yangtools.yang.binding.DataContainer; import org.opendaylight.yangtools.yang.binding.DataObject; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.google.common.collect.Lists; +import com.google.common.util.concurrent.ForwardingBlockingQueue; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.ListeningExecutorService; +import com.google.common.util.concurrent.MoreExecutors; /** - * @author mirehak * */ -public class MDController implements IMDController { +public class MDController implements IMDController, AutoCloseable { private static final Logger LOG = LoggerFactory.getLogger(MDController.class); - private SwitchConnectionProvider switchConnectionProvider; + private Collection switchConnectionProviders; private ConcurrentMap>>> messageTranslators; private Map, Collection>> popListeners; + private MessageSpy messageSpyCounter; + + final private int OF10 = OFConstants.OFP_VERSION_1_0; + final private int OF13 = OFConstants.OFP_VERSION_1_3; - final private int OF10 = 1; - final private int OF13 = 4; + private ErrorHandlerSimpleImpl errorHandler; + private ExtensionConverterProvider extensionConverterProvider; /** * @return translator mapping @@ -95,15 +133,18 @@ public class MDController implements IMDController { * provisioning of translator mapping */ public void init() { - LOG.debug("Initializing!"); + LOG.debug("init"); + + OpenflowPortsUtil.init(); + messageTranslators = new ConcurrentHashMap<>(); popListeners = new ConcurrentHashMap<>(); //TODO: move registration to factory - addMessageTranslator(ErrorMessage.class, OF10, new ErrorTranslator()); + addMessageTranslator(ErrorMessage.class, OF10, new ErrorV10Translator()); addMessageTranslator(ErrorMessage.class, OF13, new ErrorTranslator()); addMessageTranslator(FlowRemovedMessage.class, OF10, new FlowRemovedTranslator()); addMessageTranslator(FlowRemovedMessage.class, OF13, new FlowRemovedTranslator()); - addMessageTranslator(PacketInMessage.class,OF10, new PacketInTranslator()); + addMessageTranslator(PacketInMessage.class,OF10, new PacketInV10Translator()); addMessageTranslator(PacketInMessage.class,OF13, new PacketInTranslator()); addMessageTranslator(PortStatusMessage.class,OF10, new PortStatusMessageToNodeConnectorUpdatedTranslator()); addMessageTranslator(PortStatusMessage.class,OF13, new PortStatusMessageToNodeConnectorUpdatedTranslator()); @@ -115,22 +156,45 @@ public class MDController implements IMDController { addMessageTranslator(MultipartReplyMessage.class,OF13, new MultipartReplyTranslator()); addMessageTranslator(MultipartReplyMessage.class,OF13,new MultipartReplyTableFeaturesToTableUpdatedTranslator()); addMessageTranslator(GetFeaturesOutput.class,OF10, new FeaturesV10ToNodeConnectorUpdatedTranslator()); + addMessageTranslator(NotificationQueueWrapper.class, OF10, new NotificationPlainTranslator()); + addMessageTranslator(NotificationQueueWrapper.class, OF13, new NotificationPlainTranslator()); - //TODO: move registration to factory NotificationPopListener notificationPopListener = new NotificationPopListener(); + notificationPopListener.setNotificationProviderService( + OFSessionUtil.getSessionManager().getNotificationProviderService()); + notificationPopListener.setMessageSpy(messageSpyCounter); + + //TODO: move registration to factory addMessagePopListener(NodeErrorNotification.class, notificationPopListener); + addMessagePopListener(BadActionErrorNotification.class, notificationPopListener); + addMessagePopListener(BadInstructionErrorNotification.class, notificationPopListener); + addMessagePopListener(BadMatchErrorNotification.class, notificationPopListener); + addMessagePopListener(BadRequestErrorNotification.class, notificationPopListener); + addMessagePopListener(ExperimenterErrorNotification.class, notificationPopListener); + addMessagePopListener(FlowModErrorNotification.class, notificationPopListener); + addMessagePopListener(GroupModErrorNotification.class, notificationPopListener); + addMessagePopListener(HelloFailedErrorNotification.class, notificationPopListener); + addMessagePopListener(MeterModErrorNotification.class, notificationPopListener); + addMessagePopListener(PortModErrorNotification.class, notificationPopListener); + addMessagePopListener(QueueOpErrorNotification.class, notificationPopListener); + addMessagePopListener(RoleRequestErrorNotification.class, notificationPopListener); + addMessagePopListener(SwitchConfigErrorNotification.class, notificationPopListener); + addMessagePopListener(TableFeaturesErrorNotification.class, notificationPopListener); + addMessagePopListener(TableModErrorNotification.class, notificationPopListener); addMessagePopListener(NodeConnectorUpdated.class,notificationPopListener); + addMessagePopListener(NodeConnectorRemoved.class,notificationPopListener); addMessagePopListener(PacketReceived.class,notificationPopListener); addMessagePopListener(TransmitPacketInput.class, notificationPopListener); addMessagePopListener(NodeUpdated.class, notificationPopListener); + addMessagePopListener(NodeRemoved.class, notificationPopListener); addMessagePopListener(SwitchFlowRemoved.class, notificationPopListener); addMessagePopListener(TableUpdated.class, notificationPopListener); - + //Notification registration for flow statistics addMessagePopListener(FlowsStatisticsUpdate.class, notificationPopListener); addMessagePopListener(AggregateFlowStatisticsUpdate.class, notificationPopListener); - + //Notification registrations for group-statistics addMessagePopListener(GroupStatisticsUpdated.class, notificationPopListener); addMessagePopListener(GroupFeaturesUpdated.class, notificationPopListener); @@ -143,38 +207,73 @@ public class MDController implements IMDController { //Notification registration for port-statistics addMessagePopListener(NodeConnectorStatisticsUpdate.class, notificationPopListener); - + //Notification registration for flow-table statistics addMessagePopListener(FlowTableStatisticsUpdate.class, notificationPopListener); - + //Notification registration for queue-statistics addMessagePopListener(QueueStatisticsUpdate.class, notificationPopListener); - //Notification for LLDPSpeaker - LLDPSpeakerPopListener lldpPopListener = new LLDPSpeakerPopListener(); - addMessagePopListener(NodeConnectorUpdated.class,lldpPopListener); - // Push the updated Listeners to Session Manager which will be then picked up by ConnectionConductor eventually OFSessionUtil.getSessionManager().setTranslatorMapping(messageTranslators); OFSessionUtil.getSessionManager().setPopListenerMapping(popListeners); + OFSessionUtil.getSessionManager().setMessageSpy(messageSpyCounter); + + // prepare worker pool for rpc + // TODO: get size from configSubsystem + int rpcThreadLimit = 10; + ListeningExecutorService rpcPoolDelegator = createRpcPoolSpyDecorated(rpcThreadLimit, messageSpyCounter); + OFSessionUtil.getSessionManager().setRpcPool(rpcPoolDelegator); + OFSessionUtil.getSessionManager().setExtensionConverterProvider(extensionConverterProvider); + } /** - * @param switchConnectionProvider - * the switchConnectionProvider to set + * @param rpcThreadLimit + * @param messageSpy + * @return */ - public void setSwitchConnectionProvider(SwitchConnectionProvider switchConnectionProvider) { - this.switchConnectionProvider = switchConnectionProvider; + private static ListeningExecutorService createRpcPoolSpyDecorated(final int rpcThreadLimit, final MessageSpy messageSpy) { + final BlockingQueue delegate = new LinkedBlockingQueue<>(100000); + final BlockingQueue queue = new ForwardingBlockingQueue() { + @Override + protected BlockingQueue delegate() { + return delegate; + } + + @Override + public boolean offer(final Runnable r) { + // ThreadPoolExecutor will spawn a new thread after core size is reached only + // if the queue.offer returns false. + return false; + } + }; + + ThreadPoolLoggingExecutor rpcPool = new ThreadPoolLoggingExecutor(rpcThreadLimit, rpcThreadLimit, 0L, + TimeUnit.MILLISECONDS, queue, "OFRpc"); + rpcPool.setRejectedExecutionHandler(new RejectedExecutionHandler() { + @Override + public void rejectedExecution(final Runnable r, final ThreadPoolExecutor executor) { + try { + executor.getQueue().put(r); + } catch (InterruptedException e) { + throw new RejectedExecutionException("Interrupted while waiting on queue", e); + } + + } + }); + ListeningExecutorService listeningRpcPool = MoreExecutors.listeningDecorator(rpcPool); + RpcListeningExecutorService rpcPoolDecorated = new RpcListeningExecutorService(listeningRpcPool); + rpcPoolDecorated.setMessageSpy(messageSpy); + return rpcPoolDecorated; } /** - * @param switchConnectionProviderToUnset - * the switchConnectionProvider to unset + * @param switchConnectionProviders + * the switchConnectionProviders to set */ - public void unsetSwitchConnectionProvider(SwitchConnectionProvider switchConnectionProviderToUnset) { - if (this.switchConnectionProvider == switchConnectionProviderToUnset) { - this.switchConnectionProvider = null; - } + public void setSwitchConnectionProviders(final Collection switchConnectionProviders) { + this.switchConnectionProviders = switchConnectionProviders; } /** @@ -184,23 +283,24 @@ public class MDController implements IMDController { */ public void start() { LOG.debug("starting .."); - LOG.debug("switchConnectionProvider: " + switchConnectionProvider); + LOG.debug("switchConnectionProvider: " + switchConnectionProviders); // setup handler - SwitchConnectionHandler switchConnectionHandler = new SwitchConnectionHandlerImpl(); - switchConnectionProvider.setSwitchConnectionHandler(switchConnectionHandler); - // configure and startup library servers - switchConnectionProvider.configure(getConnectionConfiguration()); - Future> srvStarted = switchConnectionProvider.startup(); - } + SwitchConnectionHandlerImpl switchConnectionHandler = new SwitchConnectionHandlerImpl(); + switchConnectionHandler.setMessageSpy(messageSpyCounter); - /** - * @return wished connections configurations - */ - private static Collection getConnectionConfiguration() { - // TODO:: get config from state manager - ConnectionConfiguration configuration = ConnectionConfigurationFactory.getDefault(); - ConnectionConfiguration configurationLegacy = ConnectionConfigurationFactory.getLegacy(); - return Lists.newArrayList(configuration, configurationLegacy); + errorHandler = new ErrorHandlerSimpleImpl(); + + switchConnectionHandler.setErrorHandler(errorHandler); + switchConnectionHandler.init(); + + List> starterChain = new ArrayList<>(switchConnectionProviders.size()); + for (SwitchConnectionProvider switchConnectionPrv : switchConnectionProviders) { + switchConnectionPrv.setSwitchConnectionHandler(switchConnectionHandler); + ListenableFuture isOnlineFuture = switchConnectionPrv.startup(); + starterChain.add(isOnlineFuture); + } + + Future> srvStarted = Futures.allAsList(starterChain); } /** @@ -211,12 +311,17 @@ public class MDController implements IMDController { */ public void stop() { LOG.debug("stopping"); - Future> srvStopped = switchConnectionProvider.shutdown(); + List> stopChain = new ArrayList<>(switchConnectionProviders.size()); try { - srvStopped.get(5000, TimeUnit.MILLISECONDS); + for (SwitchConnectionProvider switchConnectionPrv : switchConnectionProviders) { + ListenableFuture shutdown = switchConnectionPrv.shutdown(); + stopChain.add(shutdown); + } + Futures.allAsList(stopChain).get(5000, TimeUnit.MILLISECONDS); } catch (InterruptedException | ExecutionException | TimeoutException e) { LOG.error(e.getMessage(), e); } + close(); } /** @@ -226,11 +331,11 @@ public class MDController implements IMDController { * */ public void destroy() { - // do nothing + close(); } @Override - public void addMessageTranslator(Class messageType, int version, IMDMessageTranslator> translator) { + public void addMessageTranslator(final Class messageType, final int version, final IMDMessageTranslator> translator) { TranslatorKey tKey = new TranslatorKey(version, messageType.getName()); Collection>> existingValues = messageTranslators.get(tKey); @@ -243,7 +348,7 @@ public class MDController implements IMDController { } @Override - public void removeMessageTranslator(Class messageType, int version, IMDMessageTranslator> translator) { + public void removeMessageTranslator(final Class messageType, final int version, final IMDMessageTranslator> translator) { TranslatorKey tKey = new TranslatorKey(version, messageType.getName()); Collection>> values = messageTranslators.get(tKey); if (values != null) { @@ -256,7 +361,7 @@ public class MDController implements IMDController { } @Override - public void addMessagePopListener(Class messageType, PopListener popListener) { + public void addMessagePopListener(final Class messageType, final PopListener popListener) { Collection> existingValues = popListeners.get(messageType); if (existingValues == null) { existingValues = new LinkedHashSet<>(); @@ -267,7 +372,7 @@ public class MDController implements IMDController { } @Override - public void removeMessagePopListener(Class messageType, PopListener popListener) { + public void removeMessagePopListener(final Class messageType, final PopListener popListener) { Collection> values = popListeners.get(messageType); if (values != null) { values.remove(popListener); @@ -278,5 +383,33 @@ public class MDController implements IMDController { } } + /** + * @param messageSpyCounter the messageSpyCounter to set + */ + public void setMessageSpyCounter( + final MessageSpy messageSpyCounter) { + this.messageSpyCounter = messageSpyCounter; + } + + @Override + public void close() { + LOG.debug("close"); + messageSpyCounter = null; + messageTranslators = null; + popListeners = null; + for (SwitchConnectionProvider switchConnectionPrv : switchConnectionProviders) { + switchConnectionPrv.setSwitchConnectionHandler(null); + } + switchConnectionProviders = null; + OpenflowPortsUtil.close(); + OFSessionUtil.releaseSessionManager(); + errorHandler = null; + } + /** + * @param extensionConverterProvider + */ + public void setExtensionConverterProvider(ExtensionConverterProvider extensionConverterProvider) { + this.extensionConverterProvider = extensionConverterProvider; + } }