X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=openflowplugin-impl%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fopenflowplugin%2Fimpl%2FOpenFlowPluginProviderImpl.java;h=bc59a99103fc24f0de4c540df1c0f42e91412272;hb=HEAD;hp=0fd92a7c0de2f511f5b4ff65400d1871c472b769;hpb=3e68fd46f080d620be54dbf1965d0afb9433b7a0;p=openflowplugin.git diff --git a/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/OpenFlowPluginProviderImpl.java b/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/OpenFlowPluginProviderImpl.java index 0fd92a7c0d..9e81231ce7 100644 --- a/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/OpenFlowPluginProviderImpl.java +++ b/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/OpenFlowPluginProviderImpl.java @@ -7,39 +7,26 @@ */ package org.opendaylight.openflowplugin.impl; -import com.google.common.annotations.VisibleForTesting; +import static java.util.Objects.requireNonNull; + import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; -import com.google.common.util.concurrent.ListeningExecutorService; import com.google.common.util.concurrent.MoreExecutors; -import com.google.common.util.concurrent.SettableFuture; import io.netty.util.HashedWheelTimer; import io.netty.util.Timer; -import java.lang.management.ManagementFactory; -import java.util.Collection; +import java.util.ArrayList; import java.util.List; +import java.util.Map; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; -import java.util.concurrent.Future; import java.util.concurrent.SynchronousQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.stream.Collectors; -import javax.annotation.Nonnull; -import javax.annotation.PostConstruct; import javax.annotation.PreDestroy; import javax.inject.Inject; import javax.inject.Singleton; -import javax.management.InstanceAlreadyExistsException; -import javax.management.InstanceNotFoundException; -import javax.management.MBeanRegistrationException; -import javax.management.MBeanServer; -import javax.management.MalformedObjectNameException; -import javax.management.NotCompliantMBeanException; -import javax.management.ObjectName; -import org.apache.aries.blueprint.annotation.service.Reference; -import org.apache.aries.blueprint.annotation.service.Service; import org.opendaylight.infrautils.diagstatus.ServiceState; import org.opendaylight.infrautils.ready.SystemReadyListener; import org.opendaylight.infrautils.ready.SystemReadyMonitor; @@ -47,11 +34,10 @@ import org.opendaylight.mdsal.binding.api.DataBroker; import org.opendaylight.mdsal.binding.api.NotificationPublishService; import org.opendaylight.mdsal.binding.api.RpcProviderService; import org.opendaylight.mdsal.eos.binding.api.EntityOwnershipService; -import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceProvider; -import org.opendaylight.openflowjava.protocol.api.connection.OpenflowDiagStatusProvider; +import org.opendaylight.mdsal.singleton.api.ClusterSingletonServiceProvider; import org.opendaylight.openflowjava.protocol.spi.connection.SwitchConnectionProvider; -import org.opendaylight.openflowjava.protocol.spi.connection.SwitchConnectionProviderList; -import org.opendaylight.openflowplugin.api.openflow.OpenFlowPluginProvider; +import org.opendaylight.openflowplugin.api.openflow.FlowGroupInfoHistories; +import org.opendaylight.openflowplugin.api.openflow.FlowGroupInfoHistory; import org.opendaylight.openflowplugin.api.openflow.configuration.ConfigurationService; import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionManager; import org.opendaylight.openflowplugin.api.openflow.device.DeviceManager; @@ -60,7 +46,6 @@ import org.opendaylight.openflowplugin.api.openflow.role.RoleManager; import org.opendaylight.openflowplugin.api.openflow.rpc.RpcManager; import org.opendaylight.openflowplugin.api.openflow.statistics.StatisticsManager; import org.opendaylight.openflowplugin.api.openflow.statistics.ofpspecific.MessageIntelligenceAgency; -import org.opendaylight.openflowplugin.extension.api.ExtensionConverterProviderKeeper; import org.opendaylight.openflowplugin.extension.api.ExtensionConverterRegistrator; import org.opendaylight.openflowplugin.extension.api.OpenFlowPluginExtensionRegistratorProvider; import org.opendaylight.openflowplugin.extension.api.core.extension.ExtensionConverterManager; @@ -75,161 +60,78 @@ import org.opendaylight.openflowplugin.impl.protocol.serialization.SerializerInj import org.opendaylight.openflowplugin.impl.role.RoleManagerImpl; import org.opendaylight.openflowplugin.impl.rpc.RpcManagerImpl; import org.opendaylight.openflowplugin.impl.statistics.StatisticsManagerImpl; -import org.opendaylight.openflowplugin.impl.statistics.ofpspecific.MessageIntelligenceAgencyImpl; -import org.opendaylight.openflowplugin.impl.statistics.ofpspecific.MessageIntelligenceAgencyMXBean; import org.opendaylight.openflowplugin.impl.util.ThreadPoolLoggingExecutor; import org.opendaylight.openflowplugin.impl.util.TranslatorLibraryUtil; import org.opendaylight.openflowplugin.openflow.md.core.extension.ExtensionConverterManagerImpl; import org.opendaylight.openflowplugin.openflow.md.core.sal.convertor.ConvertorManager; import org.opendaylight.openflowplugin.openflow.md.core.sal.convertor.ConvertorManagerFactory; import org.opendaylight.openflowplugin.openflow.md.core.session.OFSessionUtil; +import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflow.provider.config.rev160510.OpenflowProviderConfig; +import org.osgi.service.component.annotations.Activate; +import org.osgi.service.component.annotations.Component; +import org.osgi.service.component.annotations.Deactivate; +import org.osgi.service.component.annotations.Reference; +import org.osgi.service.component.annotations.ReferenceCardinality; +import org.osgi.service.component.annotations.ReferencePolicy; +import org.osgi.service.component.annotations.ReferencePolicyOption; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @Singleton -@Service(classes = { OpenFlowPluginProvider.class, OpenFlowPluginExtensionRegistratorProvider.class }) -public class OpenFlowPluginProviderImpl implements - OpenFlowPluginProvider, - OpenFlowPluginExtensionRegistratorProvider, - SystemReadyListener { - +@Component(immediate = true, service = { + OpenFlowPluginExtensionRegistratorProvider.class, + FlowGroupInfoHistories.class +}) +public final class OpenFlowPluginProviderImpl + implements OpenFlowPluginExtensionRegistratorProvider, FlowGroupInfoHistories, SystemReadyListener, + AutoCloseable { private static final Logger LOG = LoggerFactory.getLogger(OpenFlowPluginProviderImpl.class); private static final int TICKS_PER_WHEEL = 500; // 0.5 sec. private static final long TICK_DURATION = 10; private static final String POOL_NAME = "ofppool"; - private static final MessageIntelligenceAgency MESSAGE_INTELLIGENCE_AGENCY = new MessageIntelligenceAgencyImpl(); - private static final String MESSAGE_INTELLIGENCE_AGENCY_MX_BEAN_NAME = String - .format("%s:type=%s", - MessageIntelligenceAgencyMXBean.class.getPackage().getName(), - MessageIntelligenceAgencyMXBean.class.getSimpleName()); - + // TODO: Split this out into a separate component, which requires proper timer cancellation from all users. But is + // that worth the complications? private final HashedWheelTimer hashedWheelTimer = new HashedWheelTimer(TICK_DURATION, TimeUnit.MILLISECONDS, TICKS_PER_WHEEL); - private final NotificationPublishService notificationPublishService; private final ExtensionConverterManager extensionConverterManager; - private final DataBroker dataBroker; - private final Collection switchConnectionProviders; private final DeviceInitializerProvider deviceInitializerProvider; private final ConvertorManager convertorManager; - private final RpcProviderService rpcProviderRegistry; - private final ClusterSingletonServiceProvider singletonServicesProvider; private final OpenflowProviderConfig config; - private final EntityOwnershipService entityOwnershipService; - private final MastershipChangeServiceManager mastershipChangeServiceManager; - private DeviceManager deviceManager; - private RpcManager rpcManager; - private StatisticsManager statisticsManager; - private RoleManager roleManager; + private final DeviceManager deviceManager; + private final RpcManager rpcManager; + private final StatisticsManager statisticsManager; + private final RoleManager roleManager; + private final ExecutorService executorService; + private final ContextChainHolderImpl contextChainHolder; + private final DiagStatusProvider diagStatusProvider; + + private final List connectionProviders = new ArrayList<>(); + + private List startedProviders; private ConnectionManager connectionManager; - private ListeningExecutorService executorService; - private ContextChainHolderImpl contextChainHolder; - private final OpenflowDiagStatusProvider openflowDiagStatusProvider; - private final SettableFuture fullyStarted = SettableFuture.create(); - private static final String OPENFLOW_SERVICE_NAME = "OPENFLOW"; - - public static MessageIntelligenceAgency getMessageIntelligenceAgency() { - return MESSAGE_INTELLIGENCE_AGENCY; - } + private int startingProviders; @Inject - public OpenFlowPluginProviderImpl(final ConfigurationService configurationService, - final SwitchConnectionProviderList switchConnectionProviders, - final PingPongDataBroker pingPongDataBroker, - final @Reference RpcProviderService rpcProviderRegistry, - final @Reference NotificationPublishService notificationPublishService, - final @Reference ClusterSingletonServiceProvider singletonServiceProvider, - final @Reference EntityOwnershipService entityOwnershipService, - final MastershipChangeServiceManager mastershipChangeServiceManager, - final @Reference OpenflowDiagStatusProvider openflowDiagStatusProvider, - final @Reference SystemReadyMonitor systemReadyMonitor) { - this.switchConnectionProviders = switchConnectionProviders; - this.dataBroker = pingPongDataBroker; - this.rpcProviderRegistry = rpcProviderRegistry; - this.notificationPublishService = notificationPublishService; - this.singletonServicesProvider = singletonServiceProvider; - this.entityOwnershipService = entityOwnershipService; + @Activate + public OpenFlowPluginProviderImpl(@Reference final ConfigurationService configurationService, + @Reference final DataBroker dataBroker, @Reference final RpcProviderService rpcProviderRegistry, + @Reference final NotificationPublishService notificationPublishService, + @Reference final ClusterSingletonServiceProvider singletonServiceProvider, + @Reference final EntityOwnershipService entityOwnershipService, + @Reference final MastershipChangeServiceManager mastershipChangeServiceManager, + @Reference final MessageIntelligenceAgency messageIntelligenceAgency, + @Reference final DiagStatusProvider diagStatusProvider, + @Reference final SystemReadyMonitor systemReadyMonitor) { + config = new OpenFlowProviderConfigImpl(configurationService); + final var ppdb = new PingPongDataBroker(dataBroker); + this.diagStatusProvider = requireNonNull(diagStatusProvider); + convertorManager = ConvertorManagerFactory.createDefaultManager(); extensionConverterManager = new ExtensionConverterManagerImpl(); deviceInitializerProvider = DeviceInitializerProviderFactory.createDefaultProvider(); - config = new OpenFlowProviderConfigImpl(configurationService); - this.mastershipChangeServiceManager = mastershipChangeServiceManager; - this.openflowDiagStatusProvider = openflowDiagStatusProvider; - systemReadyMonitor.registerListener(this); - LOG.info("registered onSystemBootReady() listener for deferred startSwitchConnections()"); - } - - @Override - public void onSystemBootReady() { - LOG.info("onSystemBootReady() received, starting the switch connections"); - Futures.addCallback(Futures.allAsList(switchConnectionProviders.stream().map(switchConnectionProvider -> { - // Inject OpenFlowPlugin custom serializers and deserializers into OpenFlowJava - if (config.isUseSingleLayerSerialization()) { - SerializerInjector.injectSerializers(switchConnectionProvider, - switchConnectionProvider.getConfiguration().isGroupAddModEnabled()); - DeserializerInjector.injectDeserializers(switchConnectionProvider); - } else { - DeserializerInjector.revertDeserializers(switchConnectionProvider); - } - - // Set handler of incoming connections and start switch connection provider - switchConnectionProvider.setSwitchConnectionHandler(connectionManager); - return switchConnectionProvider.startup(); - }).collect(Collectors.toSet())), new FutureCallback>() { - @Override - public void onSuccess(@Nonnull final List result) { - LOG.info("All switchConnectionProviders are up and running ({}).", result.size()); - openflowDiagStatusProvider.reportStatus(OPENFLOW_SERVICE_NAME, ServiceState.OPERATIONAL); - fullyStarted.set(null); - } - - @Override - public void onFailure(@Nonnull final Throwable throwable) { - LOG.warn("Some switchConnectionProviders failed to start.", throwable); - openflowDiagStatusProvider.reportStatus(OPENFLOW_SERVICE_NAME, throwable); - fullyStarted.setException(throwable); - } - }, MoreExecutors.directExecutor()); - } - - @VisibleForTesting - public Future getFullyStarted() { - return fullyStarted; - } - - private ListenableFuture> shutdownSwitchConnections() { - final ListenableFuture> listListenableFuture = - Futures.allAsList(switchConnectionProviders.stream().map(switchConnectionProvider -> { - // Revert deserializers to their original state - if (config.isUseSingleLayerSerialization()) { - DeserializerInjector.revertDeserializers(switchConnectionProvider); - } - - // Shutdown switch connection provider - return switchConnectionProvider.shutdown(); - }).collect(Collectors.toSet())); - - Futures.addCallback(listListenableFuture, new FutureCallback>() { - @Override - public void onSuccess(@Nonnull final List result) { - LOG.info("All switchConnectionProviders were successfully shut down ({}).", result.size()); - } - - @Override - public void onFailure(@Nonnull final Throwable throwable) { - LOG.warn("Some switchConnectionProviders failed to shutdown.", throwable); - } - }, MoreExecutors.directExecutor()); - - return listListenableFuture; - } - - @Override - @PostConstruct - public void initialize() { - registerMXBean(MESSAGE_INTELLIGENCE_AGENCY, MESSAGE_INTELLIGENCE_AGENCY_MX_BEAN_NAME); // TODO: copied from OpenFlowPluginProvider (Helium) misusesing the old way of distributing extension converters // TODO: rewrite later! @@ -238,23 +140,25 @@ public class OpenFlowPluginProviderImpl implements // Creates a thread pool that creates new threads as needed, but will reuse previously // constructed threads when they are available. // Threads that have not been used for x seconds are terminated and removed from the cache. - executorService = MoreExecutors.listeningDecorator(new ThreadPoolLoggingExecutor( - config.getThreadPoolMinThreads(), - config.getThreadPoolMaxThreads().getValue(), - config.getThreadPoolTimeout(), - TimeUnit.SECONDS, new SynchronousQueue<>(), POOL_NAME)); + executorService = new ThreadPoolLoggingExecutor( + config.getThreadPoolMinThreads().toJava(), + config.getThreadPoolMaxThreads().getValue().toJava(), + config.getThreadPoolTimeout().toJava(), + TimeUnit.SECONDS, new SynchronousQueue<>(), POOL_NAME); - deviceManager = new DeviceManagerImpl( + final var devMgr = new DeviceManagerImpl( config, - dataBroker, - getMessageIntelligenceAgency(), + ppdb, + messageIntelligenceAgency, notificationPublishService, hashedWheelTimer, convertorManager, - deviceInitializerProvider); + deviceInitializerProvider, + executorService); + deviceManager = devMgr; TranslatorLibraryUtil.injectBasicTranslatorLibrary(deviceManager, convertorManager); - ((ExtensionConverterProviderKeeper) deviceManager).setExtensionConverterProvider(extensionConverterManager); + devMgr.setExtensionConverterProvider(extensionConverterManager); rpcManager = new RpcManagerImpl( config, @@ -269,25 +173,128 @@ public class OpenFlowPluginProviderImpl implements convertorManager, executorService); - roleManager = new RoleManagerImpl(hashedWheelTimer, config); + roleManager = new RoleManagerImpl(hashedWheelTimer, config, executorService); contextChainHolder = new ContextChainHolderImpl( executorService, - singletonServicesProvider, + singletonServiceProvider, entityOwnershipService, - mastershipChangeServiceManager); + mastershipChangeServiceManager, + config); contextChainHolder.addManager(deviceManager); contextChainHolder.addManager(statisticsManager); contextChainHolder.addManager(rpcManager); contextChainHolder.addManager(roleManager); - connectionManager = new ConnectionManagerImpl(config, executorService); + connectionManager = new ConnectionManagerImpl(config, executorService, ppdb, notificationPublishService); connectionManager.setDeviceConnectedHandler(contextChainHolder); connectionManager.setDeviceDisconnectedHandler(contextChainHolder); deviceManager.setContextChainHolder(contextChainHolder); deviceManager.initialize(); + systemReadyMonitor.registerListener(this); + LOG.info("OpenFlowPluginProvider started, waiting for onSystemBootReady()"); + } + + @Reference(cardinality = ReferenceCardinality.AT_LEAST_ONE, + policy = ReferencePolicy.DYNAMIC, policyOption = ReferencePolicyOption.GREEDY) + public synchronized void bindConnectionProvider(final SwitchConnectionProvider switchConnectionProvider) { + connectionProviders.add(switchConnectionProvider); + LOG.info("Added connection provider {}", switchConnectionProvider); + + if (startedProviders != null) { + LOG.info("Starting latecomer connection provider {}", switchConnectionProvider); + startingProviders += 1; + startProvider(switchConnectionProvider); + } + } + + public synchronized void unbindConnectionProvider(final SwitchConnectionProvider switchConnectionProvider) { + connectionProviders.remove(switchConnectionProvider); + if (startedProviders != null && startedProviders.remove(switchConnectionProvider)) { + switchConnectionProvider.shutdown(); + } + LOG.info("Removed connection provider {}", switchConnectionProvider); + } + + private ListenableFuture startProvider(final SwitchConnectionProvider provider) { + // Inject OpenFlowPlugin custom serializers and deserializers into OpenFlowJava + if (config.getUseSingleLayerSerialization()) { + SerializerInjector.injectSerializers(provider, provider.getConfiguration().isGroupAddModEnabled()); + DeserializerInjector.injectDeserializers(provider); + } else { + DeserializerInjector.revertDeserializers(provider); + } + + // Set handler of incoming connections and start switch connection provider + final var future = provider.startup(connectionManager); + startedProviders.add(provider); + Futures.addCallback(future, new FutureCallback<>() { + @Override + public void onSuccess(final Void result) { + LOG.info("Connection provider {} started", provider); + connectionStarted(); + } + + @Override + public void onFailure(final Throwable cause) { + LOG.warn("Connection provider {} failed to start", provider, cause); + connectionFailed(cause); + } + }, MoreExecutors.directExecutor()); + return future; + } + + @Override + public synchronized void onSystemBootReady() { + LOG.info("onSystemBootReady() received, starting the switch connections"); + + final var size = connectionProviders.size(); + startedProviders = new ArrayList<>(size); + startingProviders = size; + connectionProviders.forEach(this::startProvider); + } + + private synchronized void connectionFailed(final Throwable cause) { + // Decrement below zero, so we do not arrive to zero + startingProviders = -1; + diagStatusProvider.reportStatus(ServiceState.ERROR, cause); + } + + private synchronized void connectionStarted() { + if (--startingProviders == 0 && startedProviders.equals(connectionProviders)) { + LOG.info("All switchConnectionProviders are up and running ({}).", startedProviders.size()); + diagStatusProvider.reportStatus(ServiceState.OPERATIONAL); + } + } + + private ListenableFuture> shutdownSwitchConnections() { + final var future = Futures.allAsList(startedProviders.stream() + .map(switchConnectionProvider -> { + // Revert deserializers to their original state + if (config.getUseSingleLayerSerialization()) { + DeserializerInjector.revertDeserializers(switchConnectionProvider); + } + + // Shutdown switch connection provider + return switchConnectionProvider.shutdown(); + }).collect(Collectors.toList())); + startedProviders.clear(); + + Futures.addCallback(future, new FutureCallback<>() { + @Override + public void onSuccess(final List result) { + LOG.info("All switchConnectionProviders were successfully shut down ({}).", result.size()); + } + + @Override + public void onFailure(final Throwable throwable) { + LOG.warn("Some switchConnectionProviders failed to shutdown.", throwable); + } + }, MoreExecutors.directExecutor()); + + return future; } @Override @@ -295,9 +302,22 @@ public class OpenFlowPluginProviderImpl implements return extensionConverterManager; } + @Override + public Map getAllFlowGroupHistories() { + return deviceManager.getAllFlowGroupHistories(); + } + + @Override + public FlowGroupInfoHistory getFlowGroupHistory(final NodeId nodeId) { + return deviceManager.getFlowGroupHistory(nodeId); + } + @Override @PreDestroy - public void close() { + @Deactivate + @SuppressWarnings("checkstyle:IllegalCatch") + public synchronized void close() { + LOG.info("OpenFlowPluginProvider stopping"); try { shutdownSwitchConnections().get(10, TimeUnit.SECONDS); } catch (InterruptedException | ExecutionException | TimeoutException e) { @@ -305,14 +325,23 @@ public class OpenFlowPluginProviderImpl implements } gracefulShutdown(contextChainHolder); + gracefulShutdown(connectionManager); gracefulShutdown(deviceManager); gracefulShutdown(rpcManager); gracefulShutdown(statisticsManager); gracefulShutdown(roleManager); gracefulShutdown(executorService); gracefulShutdown(hashedWheelTimer); - unregisterMXBean(MESSAGE_INTELLIGENCE_AGENCY_MX_BEAN_NAME); - openflowDiagStatusProvider.reportStatus(ServiceState.UNREGISTERED); + diagStatusProvider.reportStatus(ServiceState.UNREGISTERED); + try { + if (connectionManager != null) { + connectionManager.close(); + connectionManager = null; + } + } catch (Exception e) { + LOG.error("Failed to close ConnectionManager", e); + } + LOG.info("OpenFlowPluginProvider stopped"); } @SuppressWarnings("checkstyle:IllegalCatch") @@ -345,29 +374,4 @@ public class OpenFlowPluginProviderImpl implements } } } - - private static void registerMXBean(final Object bean, final String beanName) { - final MBeanServer mbs = ManagementFactory.getPlatformMBeanServer(); - - try { - mbs.registerMBean(bean, new ObjectName(beanName)); - } catch (MalformedObjectNameException - | NotCompliantMBeanException - | MBeanRegistrationException - | InstanceAlreadyExistsException e) { - LOG.warn("Error registering MBean {}", e); - } - } - - private static void unregisterMXBean(final String beanName) { - final MBeanServer mbs = ManagementFactory.getPlatformMBeanServer(); - - try { - mbs.unregisterMBean(new ObjectName(beanName)); - } catch (InstanceNotFoundException - | MBeanRegistrationException - | MalformedObjectNameException e) { - LOG.warn("Error unregistering MBean {}", e); - } - } }