X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=openflowplugin-impl%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fopenflowplugin%2Fimpl%2FOpenFlowPluginProviderImpl.java;h=bb522226ba7f4daafaa2aa1b21272a24f7cb6c99;hb=13e1d5e6c0237b9378d60526dd8c1d79db6d2b49;hp=b8edaff73d2b7b52cafff05774fe6074d812b504;hpb=2ff395d9032c376942570c581ee4ebbe01c41ab7;p=openflowplugin.git diff --git a/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/OpenFlowPluginProviderImpl.java b/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/OpenFlowPluginProviderImpl.java index b8edaff73d..bb522226ba 100644 --- a/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/OpenFlowPluginProviderImpl.java +++ b/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/OpenFlowPluginProviderImpl.java @@ -5,25 +5,31 @@ * terms of the Eclipse Public License v1.0 which accompanies this distribution, * and is available at http://www.eclipse.org/legal/epl-v10.html */ - package org.opendaylight.openflowplugin.impl; +import com.google.common.annotations.VisibleForTesting; import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.ListeningExecutorService; +import com.google.common.util.concurrent.MoreExecutors; +import com.google.common.util.concurrent.SettableFuture; import io.netty.util.HashedWheelTimer; import io.netty.util.Timer; import java.lang.management.ManagementFactory; import java.util.Collection; import java.util.List; -import java.util.Objects; import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; import java.util.concurrent.SynchronousQueue; -import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.stream.Collectors; -import javax.annotation.Nonnull; +import javax.annotation.PostConstruct; +import javax.annotation.PreDestroy; +import javax.inject.Inject; +import javax.inject.Singleton; import javax.management.InstanceAlreadyExistsException; import javax.management.InstanceNotFoundException; import javax.management.MBeanRegistrationException; @@ -31,17 +37,25 @@ import javax.management.MBeanServer; import javax.management.MalformedObjectNameException; import javax.management.NotCompliantMBeanException; import javax.management.ObjectName; -import org.opendaylight.controller.md.sal.binding.api.DataBroker; -import org.opendaylight.controller.md.sal.binding.api.NotificationPublishService; -import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipService; -import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry; +import org.apache.aries.blueprint.annotation.service.Reference; +import org.apache.aries.blueprint.annotation.service.Service; +import org.opendaylight.infrautils.diagstatus.ServiceState; +import org.opendaylight.infrautils.ready.SystemReadyListener; +import org.opendaylight.infrautils.ready.SystemReadyMonitor; +import org.opendaylight.mdsal.binding.api.DataBroker; +import org.opendaylight.mdsal.binding.api.NotificationPublishService; +import org.opendaylight.mdsal.binding.api.RpcProviderService; +import org.opendaylight.mdsal.eos.binding.api.EntityOwnershipService; import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceProvider; +import org.opendaylight.openflowjava.protocol.api.connection.OpenflowDiagStatusProvider; import org.opendaylight.openflowjava.protocol.spi.connection.SwitchConnectionProvider; +import org.opendaylight.openflowjava.protocol.spi.connection.SwitchConnectionProviderList; import org.opendaylight.openflowplugin.api.openflow.OpenFlowPluginProvider; import org.opendaylight.openflowplugin.api.openflow.configuration.ConfigurationService; import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionManager; import org.opendaylight.openflowplugin.api.openflow.device.DeviceManager; import org.opendaylight.openflowplugin.api.openflow.mastership.MastershipChangeServiceManager; +import org.opendaylight.openflowplugin.api.openflow.role.RoleManager; import org.opendaylight.openflowplugin.api.openflow.rpc.RpcManager; import org.opendaylight.openflowplugin.api.openflow.statistics.StatisticsManager; import org.opendaylight.openflowplugin.api.openflow.statistics.ofpspecific.MessageIntelligenceAgency; @@ -57,12 +71,13 @@ import org.opendaylight.openflowplugin.impl.device.initialization.DeviceInitiali import org.opendaylight.openflowplugin.impl.lifecycle.ContextChainHolderImpl; import org.opendaylight.openflowplugin.impl.protocol.deserialization.DeserializerInjector; import org.opendaylight.openflowplugin.impl.protocol.serialization.SerializerInjector; +import org.opendaylight.openflowplugin.impl.role.RoleManagerImpl; import org.opendaylight.openflowplugin.impl.rpc.RpcManagerImpl; import org.opendaylight.openflowplugin.impl.statistics.StatisticsManagerImpl; import org.opendaylight.openflowplugin.impl.statistics.ofpspecific.MessageIntelligenceAgencyImpl; import org.opendaylight.openflowplugin.impl.statistics.ofpspecific.MessageIntelligenceAgencyMXBean; +import org.opendaylight.openflowplugin.impl.util.ThreadPoolLoggingExecutor; import org.opendaylight.openflowplugin.impl.util.TranslatorLibraryUtil; -import org.opendaylight.openflowplugin.openflow.md.core.ThreadPoolLoggingExecutor; import org.opendaylight.openflowplugin.openflow.md.core.extension.ExtensionConverterManagerImpl; import org.opendaylight.openflowplugin.openflow.md.core.sal.convertor.ConvertorManager; import org.opendaylight.openflowplugin.openflow.md.core.sal.convertor.ConvertorManagerFactory; @@ -71,9 +86,12 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflow import org.slf4j.Logger; import org.slf4j.LoggerFactory; +@Singleton +@Service(classes = { OpenFlowPluginProvider.class, OpenFlowPluginExtensionRegistratorProvider.class }) public class OpenFlowPluginProviderImpl implements OpenFlowPluginProvider, - OpenFlowPluginExtensionRegistratorProvider { + OpenFlowPluginExtensionRegistratorProvider, + SystemReadyListener { private static final Logger LOG = LoggerFactory.getLogger(OpenFlowPluginProviderImpl.class); @@ -87,14 +105,15 @@ public class OpenFlowPluginProviderImpl implements MessageIntelligenceAgencyMXBean.class.getPackage().getName(), MessageIntelligenceAgencyMXBean.class.getSimpleName()); - private final HashedWheelTimer hashedWheelTimer = new HashedWheelTimer(TICK_DURATION, TimeUnit.MILLISECONDS, TICKS_PER_WHEEL); + private final HashedWheelTimer hashedWheelTimer = + new HashedWheelTimer(TICK_DURATION, TimeUnit.MILLISECONDS, TICKS_PER_WHEEL); private final NotificationPublishService notificationPublishService; private final ExtensionConverterManager extensionConverterManager; private final DataBroker dataBroker; private final Collection switchConnectionProviders; private final DeviceInitializerProvider deviceInitializerProvider; private final ConvertorManager convertorManager; - private final RpcProviderRegistry rpcProviderRegistry; + private final RpcProviderService rpcProviderRegistry; private final ClusterSingletonServiceProvider singletonServicesProvider; private final OpenflowProviderConfig config; private final EntityOwnershipService entityOwnershipService; @@ -102,24 +121,31 @@ public class OpenFlowPluginProviderImpl implements private DeviceManager deviceManager; private RpcManager rpcManager; private StatisticsManager statisticsManager; + private RoleManager roleManager; private ConnectionManager connectionManager; - private ThreadPoolExecutor threadPool; + private ListeningExecutorService executorService; private ContextChainHolderImpl contextChainHolder; + private final OpenflowDiagStatusProvider openflowDiagStatusProvider; + private final SettableFuture fullyStarted = SettableFuture.create(); + private static final String OPENFLOW_SERVICE_NAME = "OPENFLOW"; public static MessageIntelligenceAgency getMessageIntelligenceAgency() { return MESSAGE_INTELLIGENCE_AGENCY; } - OpenFlowPluginProviderImpl(final ConfigurationService configurationService, - final List switchConnectionProviders, - final DataBroker dataBroker, - final RpcProviderRegistry rpcProviderRegistry, - final NotificationPublishService notificationPublishService, - final ClusterSingletonServiceProvider singletonServiceProvider, - final EntityOwnershipService entityOwnershipService, - final MastershipChangeServiceManager mastershipChangeServiceManager) { + @Inject + public OpenFlowPluginProviderImpl(final ConfigurationService configurationService, + final SwitchConnectionProviderList switchConnectionProviders, + final PingPongDataBroker pingPongDataBroker, + final @Reference RpcProviderService rpcProviderRegistry, + final @Reference NotificationPublishService notificationPublishService, + final @Reference ClusterSingletonServiceProvider singletonServiceProvider, + final @Reference EntityOwnershipService entityOwnershipService, + final MastershipChangeServiceManager mastershipChangeServiceManager, + final @Reference OpenflowDiagStatusProvider openflowDiagStatusProvider, + final @Reference SystemReadyMonitor systemReadyMonitor) { this.switchConnectionProviders = switchConnectionProviders; - this.dataBroker = dataBroker; + this.dataBroker = pingPongDataBroker; this.rpcProviderRegistry = rpcProviderRegistry; this.notificationPublishService = notificationPublishService; this.singletonServicesProvider = singletonServiceProvider; @@ -129,14 +155,19 @@ public class OpenFlowPluginProviderImpl implements deviceInitializerProvider = DeviceInitializerProviderFactory.createDefaultProvider(); config = new OpenFlowProviderConfigImpl(configurationService); this.mastershipChangeServiceManager = mastershipChangeServiceManager; + this.openflowDiagStatusProvider = openflowDiagStatusProvider; + systemReadyMonitor.registerListener(this); + LOG.info("registered onSystemBootReady() listener for deferred startSwitchConnections()"); } - - private void startSwitchConnections() { + @Override + public void onSystemBootReady() { + LOG.info("onSystemBootReady() received, starting the switch connections"); Futures.addCallback(Futures.allAsList(switchConnectionProviders.stream().map(switchConnectionProvider -> { // Inject OpenFlowPlugin custom serializers and deserializers into OpenFlowJava if (config.isUseSingleLayerSerialization()) { - SerializerInjector.injectSerializers(switchConnectionProvider); + SerializerInjector.injectSerializers(switchConnectionProvider, + switchConnectionProvider.getConfiguration().isGroupAddModEnabled()); DeserializerInjector.injectDeserializers(switchConnectionProvider); } else { DeserializerInjector.revertDeserializers(switchConnectionProvider); @@ -149,25 +180,35 @@ public class OpenFlowPluginProviderImpl implements @Override public void onSuccess(final List result) { LOG.info("All switchConnectionProviders are up and running ({}).", result.size()); + openflowDiagStatusProvider.reportStatus(OPENFLOW_SERVICE_NAME, ServiceState.OPERATIONAL); + fullyStarted.set(null); } @Override - public void onFailure(@Nonnull final Throwable throwable) { + public void onFailure(final Throwable throwable) { LOG.warn("Some switchConnectionProviders failed to start.", throwable); + openflowDiagStatusProvider.reportStatus(OPENFLOW_SERVICE_NAME, throwable); + fullyStarted.setException(throwable); } - }); + }, MoreExecutors.directExecutor()); + } + + @VisibleForTesting + public Future getFullyStarted() { + return fullyStarted; } private ListenableFuture> shutdownSwitchConnections() { - final ListenableFuture> listListenableFuture = Futures.allAsList(switchConnectionProviders.stream().map(switchConnectionProvider -> { - // Revert deserializers to their original state - if (config.isUseSingleLayerSerialization()) { - DeserializerInjector.revertDeserializers(switchConnectionProvider); - } + final ListenableFuture> listListenableFuture = + Futures.allAsList(switchConnectionProviders.stream().map(switchConnectionProvider -> { + // Revert deserializers to their original state + if (config.isUseSingleLayerSerialization()) { + DeserializerInjector.revertDeserializers(switchConnectionProvider); + } - // Shutdown switch connection provider - return switchConnectionProvider.shutdown(); - }).collect(Collectors.toSet())); + // Shutdown switch connection provider + return switchConnectionProvider.shutdown(); + }).collect(Collectors.toSet())); Futures.addCallback(listListenableFuture, new FutureCallback>() { @Override @@ -176,15 +217,16 @@ public class OpenFlowPluginProviderImpl implements } @Override - public void onFailure(@Nonnull final Throwable throwable) { + public void onFailure(final Throwable throwable) { LOG.warn("Some switchConnectionProviders failed to shutdown.", throwable); } - }); + }, MoreExecutors.directExecutor()); return listListenableFuture; } @Override + @PostConstruct public void initialize() { registerMXBean(MESSAGE_INTELLIGENCE_AGENCY, MESSAGE_INTELLIGENCE_AGENCY_MX_BEAN_NAME); @@ -195,11 +237,11 @@ public class OpenFlowPluginProviderImpl implements // Creates a thread pool that creates new threads as needed, but will reuse previously // constructed threads when they are available. // Threads that have not been used for x seconds are terminated and removed from the cache. - threadPool = new ThreadPoolLoggingExecutor( - config.getThreadPoolMinThreads(), - config.getThreadPoolMaxThreads().getValue(), - config.getThreadPoolTimeout(), - TimeUnit.SECONDS, new SynchronousQueue<>(), POOL_NAME); + executorService = MoreExecutors.listeningDecorator(new ThreadPoolLoggingExecutor( + config.getThreadPoolMinThreads().toJava(), + config.getThreadPoolMaxThreads().getValue().toJava(), + config.getThreadPoolTimeout().toJava(), + TimeUnit.SECONDS, new SynchronousQueue<>(), POOL_NAME)); deviceManager = new DeviceManagerImpl( config, @@ -208,7 +250,8 @@ public class OpenFlowPluginProviderImpl implements notificationPublishService, hashedWheelTimer, convertorManager, - deviceInitializerProvider); + deviceInitializerProvider, + executorService); TranslatorLibraryUtil.injectBasicTranslatorLibrary(deviceManager, convertorManager); ((ExtensionConverterProviderKeeper) deviceManager).setExtensionConverterProvider(extensionConverterManager); @@ -223,29 +266,28 @@ public class OpenFlowPluginProviderImpl implements statisticsManager = new StatisticsManagerImpl( config, rpcProviderRegistry, - hashedWheelTimer, - convertorManager); + convertorManager, + executorService); + + roleManager = new RoleManagerImpl(hashedWheelTimer, config, executorService); contextChainHolder = new ContextChainHolderImpl( - hashedWheelTimer, - threadPool, + executorService, singletonServicesProvider, entityOwnershipService, - mastershipChangeServiceManager - ); - - statisticsManager.setReconciliationFrameworkRegistrar(mastershipChangeServiceManager); + mastershipChangeServiceManager); contextChainHolder.addManager(deviceManager); contextChainHolder.addManager(statisticsManager); contextChainHolder.addManager(rpcManager); + contextChainHolder.addManager(roleManager); - connectionManager = new ConnectionManagerImpl(config, threadPool); + connectionManager = new ConnectionManagerImpl(config, executorService); connectionManager.setDeviceConnectedHandler(contextChainHolder); connectionManager.setDeviceDisconnectedHandler(contextChainHolder); + deviceManager.setContextChainHolder(contextChainHolder); deviceManager.initialize(); - startSwitchConnections(); } @Override @@ -254,55 +296,53 @@ public class OpenFlowPluginProviderImpl implements } @Override + @PreDestroy public void close() { try { shutdownSwitchConnections().get(10, TimeUnit.SECONDS); } catch (InterruptedException | ExecutionException | TimeoutException e) { - LOG.warn("Failed to shut down switch connections in time {}s, error: {}", 10, e); + LOG.warn("Failed to shut down switch connections in time {}s", 10, e); } gracefulShutdown(contextChainHolder); gracefulShutdown(deviceManager); gracefulShutdown(rpcManager); gracefulShutdown(statisticsManager); - gracefulShutdown(threadPool); + gracefulShutdown(roleManager); + gracefulShutdown(executorService); gracefulShutdown(hashedWheelTimer); unregisterMXBean(MESSAGE_INTELLIGENCE_AGENCY_MX_BEAN_NAME); + openflowDiagStatusProvider.reportStatus(ServiceState.UNREGISTERED); } + @SuppressWarnings("checkstyle:IllegalCatch") private static void gracefulShutdown(final AutoCloseable closeable) { - if (Objects.isNull(closeable)) { - return; - } - - try { - closeable.close(); - } catch (Exception e) { - LOG.warn("Failed to shutdown {} gracefully.", closeable); + if (closeable != null) { + try { + closeable.close(); + } catch (Exception e) { + LOG.warn("Failed to shutdown {} gracefully.", closeable); + } } } private static void gracefulShutdown(final Timer timer) { - if (Objects.isNull(timer)) { - return; - } - - try { - timer.stop(); - } catch (Exception e) { - LOG.warn("Failed to shutdown {} gracefully.", timer); + if (timer != null) { + try { + timer.stop(); + } catch (IllegalStateException e) { + LOG.warn("Failed to shutdown {} gracefully.", timer); + } } } - private static void gracefulShutdown(final ThreadPoolExecutor threadPoolExecutor) { - if (Objects.isNull(threadPoolExecutor)) { - return; - } - - try { - threadPoolExecutor.shutdownNow(); - } catch (Exception e) { - LOG.warn("Failed to shutdown {} gracefully.", threadPoolExecutor); + private static void gracefulShutdown(final ExecutorService executorService) { + if (executorService != null) { + try { + executorService.shutdownNow(); + } catch (SecurityException e) { + LOG.warn("Failed to shutdown {} gracefully.", executorService); + } } } @@ -315,7 +355,7 @@ public class OpenFlowPluginProviderImpl implements | NotCompliantMBeanException | MBeanRegistrationException | InstanceAlreadyExistsException e) { - LOG.warn("Error registering MBean {}", e); + LOG.warn("Error registering MBean {}", beanName, e); } } @@ -327,7 +367,7 @@ public class OpenFlowPluginProviderImpl implements } catch (InstanceNotFoundException | MBeanRegistrationException | MalformedObjectNameException e) { - LOG.warn("Error unregistering MBean {}", e); + LOG.warn("Error unregistering MBean {}", beanName, e); } } }