X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=openflowplugin-impl%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fopenflowplugin%2Fimpl%2FOpenFlowPluginProviderImpl.java;h=034245b55e336149c30b7499b886ce355c9bc34c;hb=HEAD;hp=c919099948de5c035f3686c5da1c5a53d6a13e63;hpb=814a0363071527d4c832abb798561f9656ec0d60;p=openflowplugin.git diff --git a/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/OpenFlowPluginProviderImpl.java b/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/OpenFlowPluginProviderImpl.java index c919099948..9e81231ce7 100644 --- a/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/OpenFlowPluginProviderImpl.java +++ b/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/OpenFlowPluginProviderImpl.java @@ -5,49 +5,51 @@ * terms of the Eclipse Public License v1.0 which accompanies this distribution, * and is available at http://www.eclipse.org/legal/epl-v10.html */ - package org.opendaylight.openflowplugin.impl; -import com.google.common.base.Preconditions; +import static java.util.Objects.requireNonNull; + import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.MoreExecutors; import io.netty.util.HashedWheelTimer; -import java.lang.management.ManagementFactory; -import java.util.Collection; +import io.netty.util.Timer; +import java.util.ArrayList; import java.util.List; import java.util.Map; -import java.util.Objects; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; import java.util.concurrent.SynchronousQueue; -import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; -import java.util.function.Consumer; +import java.util.concurrent.TimeoutException; import java.util.stream.Collectors; -import javax.annotation.Nonnull; -import javax.management.InstanceAlreadyExistsException; -import javax.management.MBeanRegistrationException; -import javax.management.MBeanServer; -import javax.management.MalformedObjectNameException; -import javax.management.NotCompliantMBeanException; -import javax.management.ObjectName; -import org.opendaylight.controller.md.sal.binding.api.DataBroker; -import org.opendaylight.controller.md.sal.binding.api.NotificationPublishService; -import org.opendaylight.controller.md.sal.binding.api.NotificationService; -import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipService; -import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry; -import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceProvider; +import javax.annotation.PreDestroy; +import javax.inject.Inject; +import javax.inject.Singleton; +import org.opendaylight.infrautils.diagstatus.ServiceState; +import org.opendaylight.infrautils.ready.SystemReadyListener; +import org.opendaylight.infrautils.ready.SystemReadyMonitor; +import org.opendaylight.mdsal.binding.api.DataBroker; +import org.opendaylight.mdsal.binding.api.NotificationPublishService; +import org.opendaylight.mdsal.binding.api.RpcProviderService; +import org.opendaylight.mdsal.eos.binding.api.EntityOwnershipService; +import org.opendaylight.mdsal.singleton.api.ClusterSingletonServiceProvider; import org.opendaylight.openflowjava.protocol.spi.connection.SwitchConnectionProvider; -import org.opendaylight.openflowplugin.api.openflow.OpenFlowPluginConfigurationService; -import org.opendaylight.openflowplugin.api.openflow.OpenFlowPluginProvider; +import org.opendaylight.openflowplugin.api.openflow.FlowGroupInfoHistories; +import org.opendaylight.openflowplugin.api.openflow.FlowGroupInfoHistory; +import org.opendaylight.openflowplugin.api.openflow.configuration.ConfigurationService; import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionManager; import org.opendaylight.openflowplugin.api.openflow.device.DeviceManager; -import org.opendaylight.openflowplugin.api.openflow.lifecycle.ContextChainHolder; +import org.opendaylight.openflowplugin.api.openflow.mastership.MastershipChangeServiceManager; +import org.opendaylight.openflowplugin.api.openflow.role.RoleManager; import org.opendaylight.openflowplugin.api.openflow.rpc.RpcManager; import org.opendaylight.openflowplugin.api.openflow.statistics.StatisticsManager; import org.opendaylight.openflowplugin.api.openflow.statistics.ofpspecific.MessageIntelligenceAgency; -import org.opendaylight.openflowplugin.extension.api.ExtensionConverterProviderKeeper; import org.opendaylight.openflowplugin.extension.api.ExtensionConverterRegistrator; import org.opendaylight.openflowplugin.extension.api.OpenFlowPluginExtensionRegistratorProvider; import org.opendaylight.openflowplugin.extension.api.core.extension.ExtensionConverterManager; +import org.opendaylight.openflowplugin.impl.configuration.OpenFlowProviderConfigImpl; import org.opendaylight.openflowplugin.impl.connection.ConnectionManagerImpl; import org.opendaylight.openflowplugin.impl.device.DeviceManagerImpl; import org.opendaylight.openflowplugin.impl.device.initialization.DeviceInitializerProvider; @@ -55,118 +57,81 @@ import org.opendaylight.openflowplugin.impl.device.initialization.DeviceInitiali import org.opendaylight.openflowplugin.impl.lifecycle.ContextChainHolderImpl; import org.opendaylight.openflowplugin.impl.protocol.deserialization.DeserializerInjector; import org.opendaylight.openflowplugin.impl.protocol.serialization.SerializerInjector; +import org.opendaylight.openflowplugin.impl.role.RoleManagerImpl; import org.opendaylight.openflowplugin.impl.rpc.RpcManagerImpl; import org.opendaylight.openflowplugin.impl.statistics.StatisticsManagerImpl; -import org.opendaylight.openflowplugin.impl.statistics.ofpspecific.MessageIntelligenceAgencyImpl; -import org.opendaylight.openflowplugin.impl.statistics.ofpspecific.MessageIntelligenceAgencyMXBean; +import org.opendaylight.openflowplugin.impl.util.ThreadPoolLoggingExecutor; import org.opendaylight.openflowplugin.impl.util.TranslatorLibraryUtil; -import org.opendaylight.openflowplugin.openflow.md.core.ThreadPoolLoggingExecutor; import org.opendaylight.openflowplugin.openflow.md.core.extension.ExtensionConverterManagerImpl; import org.opendaylight.openflowplugin.openflow.md.core.sal.convertor.ConvertorManager; import org.opendaylight.openflowplugin.openflow.md.core.sal.convertor.ConvertorManagerFactory; import org.opendaylight.openflowplugin.openflow.md.core.session.OFSessionUtil; +import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflow.provider.config.rev160510.OpenflowProviderConfig; +import org.osgi.service.component.annotations.Activate; +import org.osgi.service.component.annotations.Component; +import org.osgi.service.component.annotations.Deactivate; +import org.osgi.service.component.annotations.Reference; +import org.osgi.service.component.annotations.ReferenceCardinality; +import org.osgi.service.component.annotations.ReferencePolicy; +import org.osgi.service.component.annotations.ReferencePolicyOption; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class OpenFlowPluginProviderImpl implements OpenFlowPluginProvider, OpenFlowPluginConfigurationService, OpenFlowPluginExtensionRegistratorProvider { - +@Singleton +@Component(immediate = true, service = { + OpenFlowPluginExtensionRegistratorProvider.class, + FlowGroupInfoHistories.class +}) +public final class OpenFlowPluginProviderImpl + implements OpenFlowPluginExtensionRegistratorProvider, FlowGroupInfoHistories, SystemReadyListener, + AutoCloseable { private static final Logger LOG = LoggerFactory.getLogger(OpenFlowPluginProviderImpl.class); - private static final MessageIntelligenceAgency messageIntelligenceAgency = new MessageIntelligenceAgencyImpl(); + private static final int TICKS_PER_WHEEL = 500; // 0.5 sec. private static final long TICK_DURATION = 10; private static final String POOL_NAME = "ofppool"; - private final HashedWheelTimer hashedWheelTimer = new HashedWheelTimer(TICK_DURATION, TimeUnit.MILLISECONDS, TICKS_PER_WHEEL); - private final NotificationService notificationProviderService; - private final NotificationPublishService notificationPublishService; + // TODO: Split this out into a separate component, which requires proper timer cancellation from all users. But is + // that worth the complications? + private final HashedWheelTimer hashedWheelTimer = + new HashedWheelTimer(TICK_DURATION, TimeUnit.MILLISECONDS, TICKS_PER_WHEEL); private final ExtensionConverterManager extensionConverterManager; - private final DataBroker dataBroker; - private final Collection switchConnectionProviders; private final DeviceInitializerProvider deviceInitializerProvider; private final ConvertorManager convertorManager; - private final ContextChainHolder contextChainHolder; - private int rpcRequestsQuota; - private long globalNotificationQuota; - private long barrierInterval; - private int barrierCountLimit; - private long echoReplyTimeout; - private DeviceManager deviceManager; - private RpcManager rpcManager; - private RpcProviderRegistry rpcProviderRegistry; - private StatisticsManager statisticsManager; + private final OpenflowProviderConfig config; + private final DeviceManager deviceManager; + private final RpcManager rpcManager; + private final StatisticsManager statisticsManager; + private final RoleManager roleManager; + private final ExecutorService executorService; + private final ContextChainHolderImpl contextChainHolder; + private final DiagStatusProvider diagStatusProvider; + + private final List connectionProviders = new ArrayList<>(); + + private List startedProviders; private ConnectionManager connectionManager; - private boolean switchFeaturesMandatory; - private boolean isStatisticsPollingOn; - private boolean isStatisticsRpcEnabled; - private boolean isFlowRemovedNotificationOn; - private boolean skipTableFeatures; - private long basicTimerDelay; - private long maximumTimerDelay; - private boolean useSingleLayerSerialization; - private ThreadPoolExecutor threadPool; - private ClusterSingletonServiceProvider singletonServicesProvider; - private int threadPoolMinThreads; - private int threadPoolMaxThreads; - private long threadPoolTimeout; - private boolean initialized = false; - - public static MessageIntelligenceAgency getMessageIntelligenceAgency() { - return messageIntelligenceAgency; - } + private int startingProviders; + + @Inject + @Activate + public OpenFlowPluginProviderImpl(@Reference final ConfigurationService configurationService, + @Reference final DataBroker dataBroker, @Reference final RpcProviderService rpcProviderRegistry, + @Reference final NotificationPublishService notificationPublishService, + @Reference final ClusterSingletonServiceProvider singletonServiceProvider, + @Reference final EntityOwnershipService entityOwnershipService, + @Reference final MastershipChangeServiceManager mastershipChangeServiceManager, + @Reference final MessageIntelligenceAgency messageIntelligenceAgency, + @Reference final DiagStatusProvider diagStatusProvider, + @Reference final SystemReadyMonitor systemReadyMonitor) { + config = new OpenFlowProviderConfigImpl(configurationService); + final var ppdb = new PingPongDataBroker(dataBroker); + this.diagStatusProvider = requireNonNull(diagStatusProvider); - public OpenFlowPluginProviderImpl(final List switchConnectionProviders, - final DataBroker dataBroker, - final RpcProviderRegistry rpcProviderRegistry, - final NotificationService notificationProviderService, - final NotificationPublishService notificationPublishService, - final ClusterSingletonServiceProvider singletonServiceProvider, - final EntityOwnershipService entityOwnershipService) { - this.switchConnectionProviders = switchConnectionProviders; - this.dataBroker = dataBroker; - this.rpcProviderRegistry = rpcProviderRegistry; - this.notificationProviderService = notificationProviderService; - this.notificationPublishService = notificationPublishService; - this.singletonServicesProvider = singletonServiceProvider; convertorManager = ConvertorManagerFactory.createDefaultManager(); - contextChainHolder = new ContextChainHolderImpl(hashedWheelTimer); - contextChainHolder.changeEntityOwnershipService(entityOwnershipService); extensionConverterManager = new ExtensionConverterManagerImpl(); deviceInitializerProvider = DeviceInitializerProviderFactory.createDefaultProvider(); - } - - - private void startSwitchConnections() { - Futures.addCallback(Futures.allAsList(switchConnectionProviders.stream().map(switchConnectionProvider -> { - // Inject OpenflowPlugin custom serializers and deserializers into OpenflowJava - if (useSingleLayerSerialization) { - SerializerInjector.injectSerializers(switchConnectionProvider); - DeserializerInjector.injectDeserializers(switchConnectionProvider); - } else { - DeserializerInjector.revertDeserializers(switchConnectionProvider); - } - - // Set handler of incoming connections and start switch connection provider - switchConnectionProvider.setSwitchConnectionHandler(connectionManager); - return switchConnectionProvider.startup(); - }).collect(Collectors.toSet())), new FutureCallback>() { - @Override - public void onSuccess(final List result) { - LOG.info("All switchConnectionProviders are up and running ({}).", result.size()); - } - - @Override - public void onFailure(@Nonnull final Throwable t) { - LOG.warn("Some switchConnectionProviders failed to start.", t); - } - }); - } - - @Override - public void initialize() { - Preconditions.checkNotNull(dataBroker, "missing data broker"); - Preconditions.checkNotNull(rpcProviderRegistry, "missing RPC provider registry"); - Preconditions.checkNotNull(notificationProviderService, "missing notification provider service"); - Preconditions.checkNotNull(singletonServicesProvider, "missing singleton services provider"); // TODO: copied from OpenFlowPluginProvider (Helium) misusesing the old way of distributing extension converters // TODO: rewrite later! @@ -175,327 +140,238 @@ public class OpenFlowPluginProviderImpl implements OpenFlowPluginProvider, OpenF // Creates a thread pool that creates new threads as needed, but will reuse previously // constructed threads when they are available. // Threads that have not been used for x seconds are terminated and removed from the cache. - threadPool = new ThreadPoolLoggingExecutor( - Preconditions.checkNotNull(threadPoolMinThreads), - Preconditions.checkNotNull(threadPoolMaxThreads), - Preconditions.checkNotNull(threadPoolTimeout), + executorService = new ThreadPoolLoggingExecutor( + config.getThreadPoolMinThreads().toJava(), + config.getThreadPoolMaxThreads().getValue().toJava(), + config.getThreadPoolTimeout().toJava(), TimeUnit.SECONDS, new SynchronousQueue<>(), POOL_NAME); - connectionManager = new ConnectionManagerImpl(threadPool); - connectionManager.setEchoReplyTimeout(echoReplyTimeout); - - registerMXBean(messageIntelligenceAgency); - - contextChainHolder.addSingletonServicesProvider(singletonServicesProvider); - - deviceManager = new DeviceManagerImpl( - dataBroker, - getMessageIntelligenceAgency(), + final var devMgr = new DeviceManagerImpl( + config, + ppdb, + messageIntelligenceAgency, notificationPublishService, hashedWheelTimer, convertorManager, deviceInitializerProvider, - useSingleLayerSerialization); + executorService); + deviceManager = devMgr; - deviceManager.setGlobalNotificationQuota(globalNotificationQuota); - deviceManager.setSwitchFeaturesMandatory(switchFeaturesMandatory); - deviceManager.setBarrierInterval(barrierInterval); - deviceManager.setBarrierCountLimit(barrierCountLimit); - deviceManager.setFlowRemovedNotificationOn(isFlowRemovedNotificationOn); - deviceManager.setSkipTableFeatures(skipTableFeatures); - - ((ExtensionConverterProviderKeeper) deviceManager).setExtensionConverterProvider(extensionConverterManager); - - rpcManager = new RpcManagerImpl(rpcProviderRegistry, extensionConverterManager, convertorManager, notificationPublishService); - rpcManager.setRpcRequestQuota(rpcRequestsQuota); - - statisticsManager = new StatisticsManagerImpl(rpcProviderRegistry, hashedWheelTimer, convertorManager); - statisticsManager.setBasicTimerDelay(basicTimerDelay); - statisticsManager.setMaximumTimerDelay(maximumTimerDelay); - statisticsManager.setIsStatisticsPollingOn(isStatisticsPollingOn); + TranslatorLibraryUtil.injectBasicTranslatorLibrary(deviceManager, convertorManager); + devMgr.setExtensionConverterProvider(extensionConverterManager); - // Device connection handler moved from device manager to context holder - connectionManager.setDeviceConnectedHandler(contextChainHolder); + rpcManager = new RpcManagerImpl( + config, + rpcProviderRegistry, + extensionConverterManager, + convertorManager, + notificationPublishService); - /* Termination Phase ordering - OFP Device Context suite */ - connectionManager.setDeviceDisconnectedHandler(contextChainHolder); + statisticsManager = new StatisticsManagerImpl( + config, + rpcProviderRegistry, + convertorManager, + executorService); - rpcManager.setStatisticsRpcEnabled(isStatisticsRpcEnabled); + roleManager = new RoleManagerImpl(hashedWheelTimer, config, executorService); - TranslatorLibraryUtil.injectBasicTranslatorLibrary(deviceManager, convertorManager); - deviceManager.initialize(); + contextChainHolder = new ContextChainHolderImpl( + executorService, + singletonServiceProvider, + entityOwnershipService, + mastershipChangeServiceManager, + config); contextChainHolder.addManager(deviceManager); contextChainHolder.addManager(statisticsManager); contextChainHolder.addManager(rpcManager); + contextChainHolder.addManager(roleManager); - startSwitchConnections(); - initialized = true; + connectionManager = new ConnectionManagerImpl(config, executorService, ppdb, notificationPublishService); + connectionManager.setDeviceConnectedHandler(contextChainHolder); + connectionManager.setDeviceDisconnectedHandler(contextChainHolder); + + deviceManager.setContextChainHolder(contextChainHolder); + deviceManager.initialize(); + systemReadyMonitor.registerListener(this); + LOG.info("OpenFlowPluginProvider started, waiting for onSystemBootReady()"); } + @Reference(cardinality = ReferenceCardinality.AT_LEAST_ONE, + policy = ReferencePolicy.DYNAMIC, policyOption = ReferencePolicyOption.GREEDY) + public synchronized void bindConnectionProvider(final SwitchConnectionProvider switchConnectionProvider) { + connectionProviders.add(switchConnectionProvider); + LOG.info("Added connection provider {}", switchConnectionProvider); - @Override - public void update(@Nonnull final Map properties) { - properties.forEach((key, value) -> { - final PropertyType propertyType = PropertyType.forValue(key); - - if (Objects.nonNull(propertyType)) { - updateProperty(propertyType, value); - } else if (!key.equals("service.pid") && !key.equals("felix.fileinstall.filename")) { - LOG.warn("Unsupported configuration property '{}={}'", key, value); - } - }); + if (startedProviders != null) { + LOG.info("Starting latecomer connection provider {}", switchConnectionProvider); + startingProviders += 1; + startProvider(switchConnectionProvider); + } } - private void doPropertyUpdate(final PropertyType propertyType, - final boolean modifiable, - final Object origValue, - final Object newValue, - final Consumer successCallback) { - if (initialized) { - if (Objects.equals(origValue, newValue)) { - LOG.debug("{} config parameter is already set to {})", propertyType, origValue); - return; - } else if (!modifiable) { - LOG.warn("{} update ({} -> {}) is not allowed after controller start", propertyType, origValue, newValue); - return; - } + public synchronized void unbindConnectionProvider(final SwitchConnectionProvider switchConnectionProvider) { + connectionProviders.remove(switchConnectionProvider); + if (startedProviders != null && startedProviders.remove(switchConnectionProvider)) { + switchConnectionProvider.shutdown(); + } + LOG.info("Removed connection provider {}", switchConnectionProvider); + } + + private ListenableFuture startProvider(final SwitchConnectionProvider provider) { + // Inject OpenFlowPlugin custom serializers and deserializers into OpenFlowJava + if (config.getUseSingleLayerSerialization()) { + SerializerInjector.injectSerializers(provider, provider.getConfiguration().isGroupAddModEnabled()); + DeserializerInjector.injectDeserializers(provider); + } else { + DeserializerInjector.revertDeserializers(provider); } - successCallback.accept(newValue); - LOG.info("{} config parameter is updated ({} -> {})", propertyType, origValue, newValue); + // Set handler of incoming connections and start switch connection provider + final var future = provider.startup(connectionManager); + startedProviders.add(provider); + Futures.addCallback(future, new FutureCallback<>() { + @Override + public void onSuccess(final Void result) { + LOG.info("Connection provider {} started", provider); + connectionStarted(); + } + + @Override + public void onFailure(final Throwable cause) { + LOG.warn("Connection provider {} failed to start", provider, cause); + connectionFailed(cause); + } + }, MoreExecutors.directExecutor()); + return future; } @Override - public void updateProperty(@Nonnull final PropertyType key, @Nonnull final Object value) { - try { - final String sValue = value.toString(); - final Consumer successCallback; - final boolean modifiable; - final Object oldValue; - final Object newValue; - - switch (key) { - case RPC_REQUESTS_QUOTA: - successCallback = (result) -> { - rpcRequestsQuota = (int) result; - - if (initialized) { - rpcManager.setRpcRequestQuota(rpcRequestsQuota); - } - }; - - oldValue = rpcRequestsQuota; - newValue = Integer.valueOf(sValue); - modifiable = true; - break; - case SWITCH_FEATURES_MANDATORY: - successCallback = (result) -> { - switchFeaturesMandatory = (boolean) result; - - if (initialized) { - deviceManager.setSwitchFeaturesMandatory(switchFeaturesMandatory); - } - }; - - oldValue = switchFeaturesMandatory; - newValue = Boolean.valueOf(sValue); - modifiable = true; - break; - case GLOBAL_NOTIFICATION_QUOTA: - successCallback = (result) -> { - globalNotificationQuota = (long) result; - - if (initialized) { - deviceManager.setGlobalNotificationQuota(globalNotificationQuota); - } - }; - - oldValue = globalNotificationQuota; - newValue = Long.valueOf(sValue); - modifiable = true; - break; - case IS_STATISTICS_POLLING_ON: - successCallback = (result) -> { - isStatisticsPollingOn = (boolean) result; - - if (initialized) { - statisticsManager.setIsStatisticsPollingOn(isStatisticsPollingOn); - } - }; - - oldValue = isStatisticsPollingOn; - newValue = Boolean.valueOf(sValue); - modifiable = true; - break; - case IS_STATISTICS_RPC_ENABLED: - successCallback = (result) -> { - isStatisticsRpcEnabled = (boolean) result; - - if (initialized) { - rpcManager.setStatisticsRpcEnabled(isStatisticsRpcEnabled); - } - }; - - oldValue = isStatisticsRpcEnabled; - newValue = Boolean.valueOf(sValue); - modifiable = true; - break; - case BARRIER_INTERVAL_TIMEOUT_LIMIT: - successCallback = (result) -> { - barrierInterval = (long) result; - - if (initialized) { - deviceManager.setBarrierInterval(barrierInterval); - } - }; - - oldValue = barrierInterval; - newValue = Long.valueOf(sValue); - modifiable = true; - break; - case BARRIER_COUNT_LIMIT: - successCallback = (result) -> { - barrierCountLimit = (int) result; - - if (initialized) { - deviceManager.setBarrierCountLimit(barrierCountLimit); - } - }; - - oldValue = barrierCountLimit; - newValue = Integer.valueOf(sValue); - modifiable = true; - break; - case ECHO_REPLY_TIMEOUT: - successCallback = (result) -> { - echoReplyTimeout = (long) result; - - if (initialized) { - connectionManager.setEchoReplyTimeout(echoReplyTimeout); - } - }; - - oldValue = echoReplyTimeout; - newValue = Long.valueOf(sValue); - modifiable = true; - break; - case THREAD_POOL_MIN_THREADS: - successCallback = (result) -> threadPoolMinThreads = (int) result; - oldValue = threadPoolMinThreads; - newValue = Integer.valueOf(sValue); - modifiable = false; - break; - case THREAD_POOL_MAX_THREADS: - successCallback = (result) -> threadPoolMaxThreads = (int) result; - oldValue = threadPoolMaxThreads; - newValue = Integer.valueOf(sValue); - modifiable = false; - break; - case THREAD_POOL_TIMEOUT: - successCallback = (result) -> threadPoolTimeout = (long) result; - oldValue = threadPoolTimeout; - newValue = Long.valueOf(sValue); - modifiable = false; - break; - case ENABLE_FLOW_REMOVED_NOTIFICATION: - successCallback = (result) -> { - isFlowRemovedNotificationOn = (boolean) result; - - if (initialized) { - deviceManager.setFlowRemovedNotificationOn(isFlowRemovedNotificationOn); - } - }; - - oldValue = isFlowRemovedNotificationOn; - newValue = Boolean.valueOf(sValue); - modifiable = true; - break; - case SKIP_TABLE_FEATURES: - successCallback = (result) -> { - skipTableFeatures = (boolean) result; - - if (initialized) { - deviceManager.setSkipTableFeatures(skipTableFeatures); - } - }; - - oldValue = skipTableFeatures; - newValue = Boolean.valueOf(sValue); - modifiable = true; - break; - case BASIC_TIMER_DELAY: - successCallback = (result) -> { - basicTimerDelay = (long) result; - - if (initialized) { - statisticsManager.setBasicTimerDelay(basicTimerDelay); - } - }; - - oldValue = basicTimerDelay; - newValue = Long.valueOf(sValue); - modifiable = true; - break; - case MAXIMUM_TIMER_DELAY: - successCallback = (result) -> { - maximumTimerDelay = (long) result; - - if (initialized) { - statisticsManager.setMaximumTimerDelay(maximumTimerDelay); - } - }; - - oldValue = maximumTimerDelay; - newValue = Long.valueOf(sValue); - modifiable = true; - break; - case USE_SINGLE_LAYER_SERIALIZATION: - successCallback = (result) -> useSingleLayerSerialization = (boolean) result; - oldValue = useSingleLayerSerialization; - newValue = Boolean.valueOf(sValue); - modifiable = false; - break; - default: - return; - } + public synchronized void onSystemBootReady() { + LOG.info("onSystemBootReady() received, starting the switch connections"); + + final var size = connectionProviders.size(); + startedProviders = new ArrayList<>(size); + startingProviders = size; + connectionProviders.forEach(this::startProvider); + } + + private synchronized void connectionFailed(final Throwable cause) { + // Decrement below zero, so we do not arrive to zero + startingProviders = -1; + diagStatusProvider.reportStatus(ServiceState.ERROR, cause); + } - doPropertyUpdate(key, modifiable, oldValue, newValue, successCallback); - } catch (final Exception ex) { - LOG.warn("Failed to read configuration property '{}={}', error: {}", key, value, ex); + private synchronized void connectionStarted() { + if (--startingProviders == 0 && startedProviders.equals(connectionProviders)) { + LOG.info("All switchConnectionProviders are up and running ({}).", startedProviders.size()); + diagStatusProvider.reportStatus(ServiceState.OPERATIONAL); } } + private ListenableFuture> shutdownSwitchConnections() { + final var future = Futures.allAsList(startedProviders.stream() + .map(switchConnectionProvider -> { + // Revert deserializers to their original state + if (config.getUseSingleLayerSerialization()) { + DeserializerInjector.revertDeserializers(switchConnectionProvider); + } + + // Shutdown switch connection provider + return switchConnectionProvider.shutdown(); + }).collect(Collectors.toList())); + startedProviders.clear(); + + Futures.addCallback(future, new FutureCallback<>() { + @Override + public void onSuccess(final List result) { + LOG.info("All switchConnectionProviders were successfully shut down ({}).", result.size()); + } + + @Override + public void onFailure(final Throwable throwable) { + LOG.warn("Some switchConnectionProviders failed to shutdown.", throwable); + } + }, MoreExecutors.directExecutor()); + + return future; + } + @Override public ExtensionConverterRegistrator getExtensionConverterRegistrator() { return extensionConverterManager; } @Override - public void close() throws Exception { - initialized = false; - //TODO: consider wrapping each manager into try-catch - deviceManager.close(); - rpcManager.close(); - statisticsManager.close(); - - // Manually shutdown all remaining running threads in pool - threadPool.shutdown(); + public Map getAllFlowGroupHistories() { + return deviceManager.getAllFlowGroupHistories(); } - private static void registerMXBean(final MessageIntelligenceAgency messageIntelligenceAgency) { - final MBeanServer mbs = ManagementFactory.getPlatformMBeanServer(); + @Override + public FlowGroupInfoHistory getFlowGroupHistory(final NodeId nodeId) { + return deviceManager.getFlowGroupHistory(nodeId); + } + + @Override + @PreDestroy + @Deactivate + @SuppressWarnings("checkstyle:IllegalCatch") + public synchronized void close() { + LOG.info("OpenFlowPluginProvider stopping"); + try { + shutdownSwitchConnections().get(10, TimeUnit.SECONDS); + } catch (InterruptedException | ExecutionException | TimeoutException e) { + LOG.warn("Failed to shut down switch connections in time {}s", 10, e); + } + + gracefulShutdown(contextChainHolder); + gracefulShutdown(connectionManager); + gracefulShutdown(deviceManager); + gracefulShutdown(rpcManager); + gracefulShutdown(statisticsManager); + gracefulShutdown(roleManager); + gracefulShutdown(executorService); + gracefulShutdown(hashedWheelTimer); + diagStatusProvider.reportStatus(ServiceState.UNREGISTERED); try { - final String pathToMxBean = String.format("%s:type=%s", - MessageIntelligenceAgencyMXBean.class.getPackage().getName(), - MessageIntelligenceAgencyMXBean.class.getSimpleName()); - final ObjectName name = new ObjectName(pathToMxBean); - mbs.registerMBean(messageIntelligenceAgency, name); - } catch (MalformedObjectNameException - | NotCompliantMBeanException - | MBeanRegistrationException - | InstanceAlreadyExistsException e) { - LOG.warn("Error registering MBean {}", e); + if (connectionManager != null) { + connectionManager.close(); + connectionManager = null; + } + } catch (Exception e) { + LOG.error("Failed to close ConnectionManager", e); + } + LOG.info("OpenFlowPluginProvider stopped"); + } + + @SuppressWarnings("checkstyle:IllegalCatch") + private static void gracefulShutdown(final AutoCloseable closeable) { + if (closeable != null) { + try { + closeable.close(); + } catch (Exception e) { + LOG.warn("Failed to shutdown {} gracefully.", closeable); + } + } + } + + private static void gracefulShutdown(final Timer timer) { + if (timer != null) { + try { + timer.stop(); + } catch (IllegalStateException e) { + LOG.warn("Failed to shutdown {} gracefully.", timer); + } + } + } + + private static void gracefulShutdown(final ExecutorService executorService) { + if (executorService != null) { + try { + executorService.shutdownNow(); + } catch (SecurityException e) { + LOG.warn("Failed to shutdown {} gracefully.", executorService); + } } } }