X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;ds=sidebyside;f=openflowplugin%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fopenflowplugin%2Fopenflow%2Fmd%2Fcore%2FMDController.java;h=acd3814e7f192c9b1cb479794ae8fea1ff8bd6b8;hb=d4b81f43deae62aaea2f3ed2eb10652eabaa1a5c;hp=0a9fcfe12b39ab8114a55e3d2ca34022801c6e74;hpb=e1ffff6642108f5de645e1c38cff955f284c72c5;p=openflowplugin.git diff --git a/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/MDController.java b/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/MDController.java index 0a9fcfe12b..acd3814e7f 100644 --- a/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/MDController.java +++ b/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/MDController.java @@ -8,48 +8,113 @@ package org.opendaylight.openflowplugin.openflow.md.core; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.List; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.Future; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; +import com.google.common.collect.Lists; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; import org.opendaylight.openflowjava.protocol.api.connection.ConnectionConfiguration; -import org.opendaylight.openflowjava.protocol.api.connection.SwitchConnectionHandler; import org.opendaylight.openflowjava.protocol.spi.connection.SwitchConnectionProvider; +import org.opendaylight.openflowplugin.openflow.md.OFConstants; import org.opendaylight.openflowplugin.openflow.md.core.session.OFSessionUtil; import org.opendaylight.openflowplugin.openflow.md.core.translator.ErrorTranslator; +import org.opendaylight.openflowplugin.openflow.md.core.translator.ErrorV10Translator; +import org.opendaylight.openflowplugin.openflow.md.core.translator.ExperimenterTranslator; +import org.opendaylight.openflowplugin.openflow.md.core.translator.FeaturesV10ToNodeConnectorUpdatedTranslator; +import org.opendaylight.openflowplugin.openflow.md.core.translator.FlowRemovedTranslator; +import org.opendaylight.openflowplugin.openflow.md.core.translator.MultiPartMessageDescToNodeUpdatedTranslator; +import org.opendaylight.openflowplugin.openflow.md.core.translator.MultiPartReplyPortToNodeConnectorUpdatedTranslator; +import org.opendaylight.openflowplugin.openflow.md.core.translator.MultipartReplyTableFeaturesToTableUpdatedTranslator; +import org.opendaylight.openflowplugin.openflow.md.core.translator.MultipartReplyTranslator; +import org.opendaylight.openflowplugin.openflow.md.core.translator.PacketInTranslator; +import org.opendaylight.openflowplugin.openflow.md.core.translator.PacketInV10Translator; +import org.opendaylight.openflowplugin.openflow.md.core.translator.PortStatusMessageToNodeConnectorUpdatedTranslator; +import org.opendaylight.openflowplugin.openflow.md.lldp.LLDPSpeakerPopListener; +import org.opendaylight.openflowplugin.openflow.md.queue.MessageSpy; +import org.opendaylight.openflowplugin.openflow.md.queue.PopListener; +import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.NodeErrorNotification; +import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.SwitchFlowRemoved; +import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.AggregateFlowStatisticsUpdate; +import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.FlowsStatisticsUpdate; +import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.table.statistics.rev131215.FlowTableStatisticsUpdate; +import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.GroupDescStatsUpdated; +import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.GroupFeaturesUpdated; +import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.GroupStatisticsUpdated; +import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeConnectorUpdated; +import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeUpdated; +import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.MeterConfigStatsUpdated; +import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.MeterFeaturesUpdated; +import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.MeterStatisticsUpdated; +import org.opendaylight.yang.gen.v1.urn.opendaylight.node.error.service.rev140410.BadActionErrorNotification; +import org.opendaylight.yang.gen.v1.urn.opendaylight.node.error.service.rev140410.BadInstructionErrorNotification; +import org.opendaylight.yang.gen.v1.urn.opendaylight.node.error.service.rev140410.BadMatchErrorNotification; +import org.opendaylight.yang.gen.v1.urn.opendaylight.node.error.service.rev140410.BadRequestErrorNotification; +import org.opendaylight.yang.gen.v1.urn.opendaylight.node.error.service.rev140410.ExperimenterErrorNotification; +import org.opendaylight.yang.gen.v1.urn.opendaylight.node.error.service.rev140410.FlowModErrorNotification; +import org.opendaylight.yang.gen.v1.urn.opendaylight.node.error.service.rev140410.GroupModErrorNotification; +import org.opendaylight.yang.gen.v1.urn.opendaylight.node.error.service.rev140410.HelloFailedErrorNotification; +import org.opendaylight.yang.gen.v1.urn.opendaylight.node.error.service.rev140410.MeterModErrorNotification; +import org.opendaylight.yang.gen.v1.urn.opendaylight.node.error.service.rev140410.PortModErrorNotification; +import org.opendaylight.yang.gen.v1.urn.opendaylight.node.error.service.rev140410.QueueOpErrorNotification; +import org.opendaylight.yang.gen.v1.urn.opendaylight.node.error.service.rev140410.RoleRequestErrorNotification; +import org.opendaylight.yang.gen.v1.urn.opendaylight.node.error.service.rev140410.SwitchConfigErrorNotification; +import org.opendaylight.yang.gen.v1.urn.opendaylight.node.error.service.rev140410.TableFeaturesErrorNotification; +import org.opendaylight.yang.gen.v1.urn.opendaylight.node.error.service.rev140410.TableModErrorNotification; import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.ErrorMessage; +import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.ExperimenterMessage; +import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.FlowRemovedMessage; +import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GetFeaturesOutput; +import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.MultipartReplyMessage; import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader; +import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PacketInMessage; +import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PortStatusMessage; +import org.opendaylight.yang.gen.v1.urn.opendaylight.packet.service.rev130709.PacketReceived; +import org.opendaylight.yang.gen.v1.urn.opendaylight.packet.service.rev130709.TransmitPacketInput; +import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.NodeConnectorStatisticsUpdate; +import org.opendaylight.yang.gen.v1.urn.opendaylight.queue.statistics.rev131216.QueueStatisticsUpdate; +import org.opendaylight.yang.gen.v1.urn.opendaylight.table.service.rev131026.TableUpdated; import org.opendaylight.yangtools.yang.binding.DataObject; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.google.common.collect.Collections2; -import com.google.common.collect.Lists; +import java.util.ArrayList; +import java.util.Collection; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; /** * @author mirehak * */ -public class MDController implements IMDController { +public class MDController implements IMDController, AutoCloseable { private static final Logger LOG = LoggerFactory.getLogger(MDController.class); - private SwitchConnectionProvider switchConnectionProvider; + private Collection switchConnectionProviders; + + private ConcurrentMap>>> messageTranslators; + private Map, Collection>> popListeners; + private MessageSpy messageSpyCounter; + + final private int OF10 = OFConstants.OFP_VERSION_1_0; + final private int OF13 = OFConstants.OFP_VERSION_1_3; + + private ErrorHandlerQueueImpl errorHandler; + private ExecutorService rpcPool; - private ConcurrentMap>> messageTranslators; /** * @return translator mapping */ - public Map>> getMessageTranslators() { + public Map>>> getMessageTranslators() { return messageTranslators; } @@ -57,31 +122,96 @@ public class MDController implements IMDController { * provisioning of translator mapping */ public void init() { - LOG.debug("Initializing!"); + LOG.debug("init"); messageTranslators = new ConcurrentHashMap<>(); - addMessageTranslator(ErrorMessage.class, 4, new ErrorTranslator()); - addMessageTranslator(ErrorMessage.class, 1, new ErrorTranslator()); + popListeners = new ConcurrentHashMap<>(); + //TODO: move registration to factory + addMessageTranslator(ErrorMessage.class, OF10, new ErrorV10Translator()); + addMessageTranslator(ErrorMessage.class, OF13, new ErrorTranslator()); + addMessageTranslator(FlowRemovedMessage.class, OF10, new FlowRemovedTranslator()); + addMessageTranslator(FlowRemovedMessage.class, OF13, new FlowRemovedTranslator()); + addMessageTranslator(PacketInMessage.class,OF10, new PacketInV10Translator()); + addMessageTranslator(PacketInMessage.class,OF13, new PacketInTranslator()); + addMessageTranslator(PortStatusMessage.class,OF10, new PortStatusMessageToNodeConnectorUpdatedTranslator()); + addMessageTranslator(PortStatusMessage.class,OF13, new PortStatusMessageToNodeConnectorUpdatedTranslator()); + addMessageTranslator(MultipartReplyMessage.class,OF13,new MultiPartReplyPortToNodeConnectorUpdatedTranslator()); + addMessageTranslator(MultipartReplyMessage.class,OF10, new MultiPartMessageDescToNodeUpdatedTranslator()); + addMessageTranslator(MultipartReplyMessage.class,OF13, new MultiPartMessageDescToNodeUpdatedTranslator()); + addMessageTranslator(ExperimenterMessage.class, OF10, new ExperimenterTranslator()); + addMessageTranslator(MultipartReplyMessage.class,OF10, new MultipartReplyTranslator()); + addMessageTranslator(MultipartReplyMessage.class,OF13, new MultipartReplyTranslator()); + addMessageTranslator(MultipartReplyMessage.class,OF13,new MultipartReplyTableFeaturesToTableUpdatedTranslator()); + addMessageTranslator(GetFeaturesOutput.class,OF10, new FeaturesV10ToNodeConnectorUpdatedTranslator()); + + //TODO: move registration to factory + NotificationPopListener notificationPopListener = new NotificationPopListener(); + addMessagePopListener(NodeErrorNotification.class, notificationPopListener); + addMessagePopListener(BadActionErrorNotification.class, notificationPopListener); + addMessagePopListener(BadInstructionErrorNotification.class, notificationPopListener); + addMessagePopListener(BadMatchErrorNotification.class, notificationPopListener); + addMessagePopListener(BadRequestErrorNotification.class, notificationPopListener); + addMessagePopListener(ExperimenterErrorNotification.class, notificationPopListener); + addMessagePopListener(FlowModErrorNotification.class, notificationPopListener); + addMessagePopListener(GroupModErrorNotification.class, notificationPopListener); + addMessagePopListener(HelloFailedErrorNotification.class, notificationPopListener); + addMessagePopListener(MeterModErrorNotification.class, notificationPopListener); + addMessagePopListener(PortModErrorNotification.class, notificationPopListener); + addMessagePopListener(QueueOpErrorNotification.class, notificationPopListener); + addMessagePopListener(RoleRequestErrorNotification.class, notificationPopListener); + addMessagePopListener(SwitchConfigErrorNotification.class, notificationPopListener); + addMessagePopListener(TableFeaturesErrorNotification.class, notificationPopListener); + addMessagePopListener(TableModErrorNotification.class, notificationPopListener); + addMessagePopListener(NodeConnectorUpdated.class,notificationPopListener); + addMessagePopListener(PacketReceived.class,notificationPopListener); + addMessagePopListener(TransmitPacketInput.class, notificationPopListener); + addMessagePopListener(NodeUpdated.class, notificationPopListener); + + addMessagePopListener(SwitchFlowRemoved.class, notificationPopListener); + addMessagePopListener(TableUpdated.class, notificationPopListener); + + //Notification registration for flow statistics + addMessagePopListener(FlowsStatisticsUpdate.class, notificationPopListener); + addMessagePopListener(AggregateFlowStatisticsUpdate.class, notificationPopListener); + + //Notification registrations for group-statistics + addMessagePopListener(GroupStatisticsUpdated.class, notificationPopListener); + addMessagePopListener(GroupFeaturesUpdated.class, notificationPopListener); + addMessagePopListener(GroupDescStatsUpdated.class, notificationPopListener); + + //Notification registrations for meter-statistics + addMessagePopListener(MeterStatisticsUpdated.class, notificationPopListener); + addMessagePopListener(MeterConfigStatsUpdated.class, notificationPopListener); + addMessagePopListener(MeterFeaturesUpdated.class, notificationPopListener); + + //Notification registration for port-statistics + addMessagePopListener(NodeConnectorStatisticsUpdate.class, notificationPopListener); + + //Notification registration for flow-table statistics + addMessagePopListener(FlowTableStatisticsUpdate.class, notificationPopListener); + + //Notification registration for queue-statistics + addMessagePopListener(QueueStatisticsUpdate.class, notificationPopListener); + + //Notification for LLDPSpeaker + LLDPSpeakerPopListener lldpPopListener = new LLDPSpeakerPopListener(); + addMessagePopListener(NodeConnectorUpdated.class,lldpPopListener); // Push the updated Listeners to Session Manager which will be then picked up by ConnectionConductor eventually OFSessionUtil.getSessionManager().setTranslatorMapping(messageTranslators); + OFSessionUtil.getSessionManager().setPopListenerMapping(popListeners); + + // prepare worker pool for rpc + // TODO: get size from configSubsystem + OFSessionUtil.getSessionManager().setRpcPool(Executors.newFixedThreadPool(10)); + } /** - * @param switchConnectionProvider - * the switchConnectionProvider to set - */ - public void setSwitchConnectionProvider(SwitchConnectionProvider switchConnectionProvider) { - this.switchConnectionProvider = switchConnectionProvider; - } - - /** - * @param switchConnectionProviderToUnset - * the switchConnectionProvider to unset + * @param switchConnectionProviders + * the switchConnectionProviders to set */ - public void unsetSwitchConnectionProvider(SwitchConnectionProvider switchConnectionProviderToUnset) { - if (this.switchConnectionProvider == switchConnectionProviderToUnset) { - this.switchConnectionProvider = null; - } + public void setSwitchConnectionProviders(Collection switchConnectionProviders) { + this.switchConnectionProviders = switchConnectionProviders; } /** @@ -91,13 +221,25 @@ public class MDController implements IMDController { */ public void start() { LOG.debug("starting .."); - LOG.debug("switchConnectionProvider: " + switchConnectionProvider); + LOG.debug("switchConnectionProvider: " + switchConnectionProviders); // setup handler - SwitchConnectionHandler switchConnectionHandler = new SwitchConnectionHandlerImpl(); - switchConnectionProvider.setSwitchConnectionHandler(switchConnectionHandler); - // configure and startup library servers - switchConnectionProvider.configure(getConnectionConfiguration()); - Future> srvStarted = switchConnectionProvider.startup(); + SwitchConnectionHandlerImpl switchConnectionHandler = new SwitchConnectionHandlerImpl(); + switchConnectionHandler.setMessageSpy(messageSpyCounter); + + errorHandler = new ErrorHandlerQueueImpl(); + new Thread(errorHandler).start(); + + switchConnectionHandler.setErrorHandler(errorHandler); + switchConnectionHandler.init(); + + List> starterChain = new ArrayList<>(); + for (SwitchConnectionProvider switchConnectionPrv : switchConnectionProviders) { + switchConnectionPrv.setSwitchConnectionHandler(switchConnectionHandler); + ListenableFuture isOnlineFuture = switchConnectionPrv.startup(); + starterChain.add(isOnlineFuture); + } + + Future> srvStarted = Futures.allAsList(starterChain); } /** @@ -106,7 +248,8 @@ public class MDController implements IMDController { private static Collection getConnectionConfiguration() { // TODO:: get config from state manager ConnectionConfiguration configuration = ConnectionConfigurationFactory.getDefault(); - return Lists.newArrayList(configuration); + ConnectionConfiguration configurationLegacy = ConnectionConfigurationFactory.getLegacy(); + return Lists.newArrayList(configuration, configurationLegacy); } /** @@ -117,12 +260,17 @@ public class MDController implements IMDController { */ public void stop() { LOG.debug("stopping"); - Future> srvStopped = switchConnectionProvider.shutdown(); + List> stopChain = new ArrayList<>(); try { - srvStopped.get(5000, TimeUnit.MILLISECONDS); + for (SwitchConnectionProvider switchConnectionPrv : switchConnectionProviders) { + ListenableFuture shutdown = switchConnectionPrv.shutdown(); + stopChain.add(shutdown); + } + Futures.allAsList(stopChain).get(5000, TimeUnit.MILLISECONDS); } catch (InterruptedException | ExecutionException | TimeoutException e) { LOG.error(e.getMessage(), e); } + close(); } /** @@ -132,34 +280,77 @@ public class MDController implements IMDController { * */ public void destroy() { - // do nothing + close(); } @Override - public void addMessageTranslator(Class messageType, int version, IMDMessageTranslator translator) { + public void addMessageTranslator(Class messageType, int version, IMDMessageTranslator> translator) { TranslatorKey tKey = new TranslatorKey(version, messageType.getName()); - - Collection> existingValues = messageTranslators.get(tKey); + + Collection>> existingValues = messageTranslators.get(tKey); if (existingValues == null) { - existingValues = new ArrayList<>(); + existingValues = new LinkedHashSet<>(); messageTranslators.put(tKey, existingValues); } existingValues.add(translator); - LOG.debug("{} is now listened by {}", messageType, translator); + LOG.debug("{} is now translated by {}", messageType, translator); } @Override - public void removeMessageTranslator(Class messageType, int version, IMDMessageTranslator translator) { + public void removeMessageTranslator(Class messageType, int version, IMDMessageTranslator> translator) { TranslatorKey tKey = new TranslatorKey(version, messageType.getName()); - Collection> values = messageTranslators.get(tKey); + Collection>> values = messageTranslators.get(tKey); if (values != null) { values.remove(translator); if (values.isEmpty()) { messageTranslators.remove(tKey); } - LOG.debug("{} is now removed", translator); + LOG.debug("{} is now removed from translators", translator); } } + @Override + public void addMessagePopListener(Class messageType, PopListener popListener) { + Collection> existingValues = popListeners.get(messageType); + if (existingValues == null) { + existingValues = new LinkedHashSet<>(); + popListeners.put(messageType, existingValues); + } + existingValues.add(popListener); + LOG.debug("{} is now popListened by {}", messageType, popListener); + } + + @Override + public void removeMessagePopListener(Class messageType, PopListener popListener) { + Collection> values = popListeners.get(messageType); + if (values != null) { + values.remove(popListener); + if (values.isEmpty()) { + popListeners.remove(messageType); + } + LOG.debug("{} is now removed from popListeners", popListener); + } + } + /** + * @param messageSpyCounter the messageSpyCounter to set + */ + public void setMessageSpyCounter( + MessageSpy messageSpyCounter) { + this.messageSpyCounter = messageSpyCounter; + } + + @Override + public void close() { + LOG.debug("close"); + messageSpyCounter = null; + messageTranslators = null; + popListeners = null; + for (SwitchConnectionProvider switchConnectionPrv : switchConnectionProviders) { + switchConnectionPrv.setSwitchConnectionHandler(null); + } + switchConnectionProviders = null; + OFSessionUtil.releaseSessionManager(); + errorHandler.close(); + } }