* terms of the Eclipse Public License v1.0 which accompanies this distribution,
* and is available at http://www.eclipse.org/legal/epl-v10.html
*/
-
package org.opendaylight.openflowplugin.impl;
-import com.google.common.base.Preconditions;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+import com.google.common.util.concurrent.SettableFuture;
import io.netty.util.HashedWheelTimer;
+import io.netty.util.Timer;
import java.lang.management.ManagementFactory;
import java.util.Collection;
import java.util.List;
-import java.util.Map;
-import java.util.Objects;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
import java.util.concurrent.SynchronousQueue;
-import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
-import java.util.function.Consumer;
+import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;
+import javax.annotation.PostConstruct;
+import javax.annotation.PreDestroy;
+import javax.inject.Inject;
+import javax.inject.Singleton;
import javax.management.InstanceAlreadyExistsException;
+import javax.management.InstanceNotFoundException;
import javax.management.MBeanRegistrationException;
import javax.management.MBeanServer;
import javax.management.MalformedObjectNameException;
import javax.management.NotCompliantMBeanException;
import javax.management.ObjectName;
-import org.opendaylight.controller.md.sal.binding.api.DataBroker;
-import org.opendaylight.controller.md.sal.binding.api.NotificationPublishService;
-import org.opendaylight.controller.md.sal.binding.api.NotificationService;
-import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipService;
-import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry;
+import org.apache.aries.blueprint.annotation.service.Reference;
+import org.apache.aries.blueprint.annotation.service.Service;
+import org.opendaylight.infrautils.diagstatus.ServiceState;
+import org.opendaylight.infrautils.ready.SystemReadyListener;
+import org.opendaylight.infrautils.ready.SystemReadyMonitor;
+import org.opendaylight.mdsal.binding.api.DataBroker;
+import org.opendaylight.mdsal.binding.api.NotificationPublishService;
+import org.opendaylight.mdsal.binding.api.RpcProviderService;
+import org.opendaylight.mdsal.eos.binding.api.EntityOwnershipService;
import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceProvider;
+import org.opendaylight.openflowjava.protocol.api.connection.OpenflowDiagStatusProvider;
import org.opendaylight.openflowjava.protocol.spi.connection.SwitchConnectionProvider;
-import org.opendaylight.openflowplugin.api.openflow.OpenFlowPluginConfigurationService;
+import org.opendaylight.openflowjava.protocol.spi.connection.SwitchConnectionProviderList;
import org.opendaylight.openflowplugin.api.openflow.OpenFlowPluginProvider;
+import org.opendaylight.openflowplugin.api.openflow.configuration.ConfigurationService;
import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionManager;
import org.opendaylight.openflowplugin.api.openflow.device.DeviceManager;
-import org.opendaylight.openflowplugin.api.openflow.lifecycle.ContextChainHolder;
+import org.opendaylight.openflowplugin.api.openflow.mastership.MastershipChangeServiceManager;
+import org.opendaylight.openflowplugin.api.openflow.role.RoleManager;
import org.opendaylight.openflowplugin.api.openflow.rpc.RpcManager;
import org.opendaylight.openflowplugin.api.openflow.statistics.StatisticsManager;
import org.opendaylight.openflowplugin.api.openflow.statistics.ofpspecific.MessageIntelligenceAgency;
import org.opendaylight.openflowplugin.extension.api.ExtensionConverterRegistrator;
import org.opendaylight.openflowplugin.extension.api.OpenFlowPluginExtensionRegistratorProvider;
import org.opendaylight.openflowplugin.extension.api.core.extension.ExtensionConverterManager;
+import org.opendaylight.openflowplugin.impl.configuration.OpenFlowProviderConfigImpl;
import org.opendaylight.openflowplugin.impl.connection.ConnectionManagerImpl;
import org.opendaylight.openflowplugin.impl.device.DeviceManagerImpl;
import org.opendaylight.openflowplugin.impl.device.initialization.DeviceInitializerProvider;
import org.opendaylight.openflowplugin.impl.lifecycle.ContextChainHolderImpl;
import org.opendaylight.openflowplugin.impl.protocol.deserialization.DeserializerInjector;
import org.opendaylight.openflowplugin.impl.protocol.serialization.SerializerInjector;
+import org.opendaylight.openflowplugin.impl.role.RoleManagerImpl;
import org.opendaylight.openflowplugin.impl.rpc.RpcManagerImpl;
import org.opendaylight.openflowplugin.impl.statistics.StatisticsManagerImpl;
import org.opendaylight.openflowplugin.impl.statistics.ofpspecific.MessageIntelligenceAgencyImpl;
import org.opendaylight.openflowplugin.impl.statistics.ofpspecific.MessageIntelligenceAgencyMXBean;
+import org.opendaylight.openflowplugin.impl.util.ThreadPoolLoggingExecutor;
import org.opendaylight.openflowplugin.impl.util.TranslatorLibraryUtil;
-import org.opendaylight.openflowplugin.openflow.md.core.ThreadPoolLoggingExecutor;
import org.opendaylight.openflowplugin.openflow.md.core.extension.ExtensionConverterManagerImpl;
import org.opendaylight.openflowplugin.openflow.md.core.sal.convertor.ConvertorManager;
import org.opendaylight.openflowplugin.openflow.md.core.sal.convertor.ConvertorManagerFactory;
import org.opendaylight.openflowplugin.openflow.md.core.session.OFSessionUtil;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflow.provider.config.rev160510.OpenflowProviderConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class OpenFlowPluginProviderImpl implements OpenFlowPluginProvider, OpenFlowPluginConfigurationService, OpenFlowPluginExtensionRegistratorProvider {
+@Singleton
+@Service(classes = { OpenFlowPluginProvider.class, OpenFlowPluginExtensionRegistratorProvider.class })
+public class OpenFlowPluginProviderImpl implements
+ OpenFlowPluginProvider,
+ OpenFlowPluginExtensionRegistratorProvider,
+ SystemReadyListener {
private static final Logger LOG = LoggerFactory.getLogger(OpenFlowPluginProviderImpl.class);
- private static final MessageIntelligenceAgency messageIntelligenceAgency = new MessageIntelligenceAgencyImpl();
+
private static final int TICKS_PER_WHEEL = 500; // 0.5 sec.
private static final long TICK_DURATION = 10;
private static final String POOL_NAME = "ofppool";
- private final HashedWheelTimer hashedWheelTimer = new HashedWheelTimer(TICK_DURATION, TimeUnit.MILLISECONDS, TICKS_PER_WHEEL);
- private final NotificationService notificationProviderService;
+ private static final MessageIntelligenceAgency MESSAGE_INTELLIGENCE_AGENCY = new MessageIntelligenceAgencyImpl();
+ private static final String MESSAGE_INTELLIGENCE_AGENCY_MX_BEAN_NAME = String
+ .format("%s:type=%s",
+ MessageIntelligenceAgencyMXBean.class.getPackage().getName(),
+ MessageIntelligenceAgencyMXBean.class.getSimpleName());
+
+ private final HashedWheelTimer hashedWheelTimer =
+ new HashedWheelTimer(TICK_DURATION, TimeUnit.MILLISECONDS, TICKS_PER_WHEEL);
private final NotificationPublishService notificationPublishService;
private final ExtensionConverterManager extensionConverterManager;
private final DataBroker dataBroker;
private final Collection<SwitchConnectionProvider> switchConnectionProviders;
private final DeviceInitializerProvider deviceInitializerProvider;
private final ConvertorManager convertorManager;
- private final ContextChainHolder contextChainHolder;
- private int rpcRequestsQuota;
- private long globalNotificationQuota;
- private long barrierInterval;
- private int barrierCountLimit;
- private long echoReplyTimeout;
+ private final RpcProviderService rpcProviderRegistry;
+ private final ClusterSingletonServiceProvider singletonServicesProvider;
+ private final OpenflowProviderConfig config;
+ private final EntityOwnershipService entityOwnershipService;
+ private final MastershipChangeServiceManager mastershipChangeServiceManager;
private DeviceManager deviceManager;
private RpcManager rpcManager;
- private RpcProviderRegistry rpcProviderRegistry;
private StatisticsManager statisticsManager;
+ private RoleManager roleManager;
private ConnectionManager connectionManager;
- private boolean switchFeaturesMandatory;
- private boolean isStatisticsPollingOn;
- private boolean isStatisticsRpcEnabled;
- private boolean isFlowRemovedNotificationOn;
- private boolean skipTableFeatures;
- private long basicTimerDelay;
- private long maximumTimerDelay;
- private boolean useSingleLayerSerialization;
- private ThreadPoolExecutor threadPool;
- private ClusterSingletonServiceProvider singletonServicesProvider;
- private int threadPoolMinThreads;
- private int threadPoolMaxThreads;
- private long threadPoolTimeout;
- private boolean initialized = false;
+ private ListeningExecutorService executorService;
+ private ContextChainHolderImpl contextChainHolder;
+ private final OpenflowDiagStatusProvider openflowDiagStatusProvider;
+ private final SettableFuture<Void> fullyStarted = SettableFuture.create();
+ private static final String OPENFLOW_SERVICE_NAME = "OPENFLOW";
public static MessageIntelligenceAgency getMessageIntelligenceAgency() {
- return messageIntelligenceAgency;
+ return MESSAGE_INTELLIGENCE_AGENCY;
}
- public OpenFlowPluginProviderImpl(final List<SwitchConnectionProvider> switchConnectionProviders,
- final DataBroker dataBroker,
- final RpcProviderRegistry rpcProviderRegistry,
- final NotificationService notificationProviderService,
- final NotificationPublishService notificationPublishService,
- final ClusterSingletonServiceProvider singletonServiceProvider,
- final EntityOwnershipService entityOwnershipService) {
+ @Inject
+ public OpenFlowPluginProviderImpl(final ConfigurationService configurationService,
+ final SwitchConnectionProviderList switchConnectionProviders,
+ final PingPongDataBroker pingPongDataBroker,
+ final @Reference RpcProviderService rpcProviderRegistry,
+ final @Reference NotificationPublishService notificationPublishService,
+ final @Reference ClusterSingletonServiceProvider singletonServiceProvider,
+ final @Reference EntityOwnershipService entityOwnershipService,
+ final MastershipChangeServiceManager mastershipChangeServiceManager,
+ final @Reference OpenflowDiagStatusProvider openflowDiagStatusProvider,
+ final @Reference SystemReadyMonitor systemReadyMonitor) {
this.switchConnectionProviders = switchConnectionProviders;
- this.dataBroker = dataBroker;
+ this.dataBroker = pingPongDataBroker;
this.rpcProviderRegistry = rpcProviderRegistry;
- this.notificationProviderService = notificationProviderService;
this.notificationPublishService = notificationPublishService;
this.singletonServicesProvider = singletonServiceProvider;
+ this.entityOwnershipService = entityOwnershipService;
convertorManager = ConvertorManagerFactory.createDefaultManager();
- contextChainHolder = new ContextChainHolderImpl(hashedWheelTimer);
- contextChainHolder.changeEntityOwnershipService(entityOwnershipService);
extensionConverterManager = new ExtensionConverterManagerImpl();
deviceInitializerProvider = DeviceInitializerProviderFactory.createDefaultProvider();
+ config = new OpenFlowProviderConfigImpl(configurationService);
+ this.mastershipChangeServiceManager = mastershipChangeServiceManager;
+ this.openflowDiagStatusProvider = openflowDiagStatusProvider;
+ systemReadyMonitor.registerListener(this);
+ LOG.info("registered onSystemBootReady() listener for deferred startSwitchConnections()");
}
-
- private void startSwitchConnections() {
+ @Override
+ public void onSystemBootReady() {
+ LOG.info("onSystemBootReady() received, starting the switch connections");
Futures.addCallback(Futures.allAsList(switchConnectionProviders.stream().map(switchConnectionProvider -> {
- // Inject OpenflowPlugin custom serializers and deserializers into OpenflowJava
- if (useSingleLayerSerialization) {
- SerializerInjector.injectSerializers(switchConnectionProvider);
+ // Inject OpenFlowPlugin custom serializers and deserializers into OpenFlowJava
+ if (config.isUseSingleLayerSerialization()) {
+ SerializerInjector.injectSerializers(switchConnectionProvider,
+ switchConnectionProvider.getConfiguration().isGroupAddModEnabled());
DeserializerInjector.injectDeserializers(switchConnectionProvider);
} else {
DeserializerInjector.revertDeserializers(switchConnectionProvider);
return switchConnectionProvider.startup();
}).collect(Collectors.toSet())), new FutureCallback<List<Boolean>>() {
@Override
- public void onSuccess(final List<Boolean> result) {
+ public void onSuccess(@Nonnull final List<Boolean> result) {
LOG.info("All switchConnectionProviders are up and running ({}).", result.size());
+ openflowDiagStatusProvider.reportStatus(OPENFLOW_SERVICE_NAME, ServiceState.OPERATIONAL);
+ fullyStarted.set(null);
+ }
+
+ @Override
+ public void onFailure(@Nonnull final Throwable throwable) {
+ LOG.warn("Some switchConnectionProviders failed to start.", throwable);
+ openflowDiagStatusProvider.reportStatus(OPENFLOW_SERVICE_NAME, throwable);
+ fullyStarted.setException(throwable);
+ }
+ }, MoreExecutors.directExecutor());
+ }
+
+ @VisibleForTesting
+ public Future<Void> getFullyStarted() {
+ return fullyStarted;
+ }
+
+ private ListenableFuture<List<Boolean>> shutdownSwitchConnections() {
+ final ListenableFuture<List<Boolean>> listListenableFuture =
+ Futures.allAsList(switchConnectionProviders.stream().map(switchConnectionProvider -> {
+ // Revert deserializers to their original state
+ if (config.isUseSingleLayerSerialization()) {
+ DeserializerInjector.revertDeserializers(switchConnectionProvider);
+ }
+
+ // Shutdown switch connection provider
+ return switchConnectionProvider.shutdown();
+ }).collect(Collectors.toSet()));
+
+ Futures.addCallback(listListenableFuture, new FutureCallback<List<Boolean>>() {
+ @Override
+ public void onSuccess(@Nonnull final List<Boolean> result) {
+ LOG.info("All switchConnectionProviders were successfully shut down ({}).", result.size());
}
@Override
- public void onFailure(@Nonnull final Throwable t) {
- LOG.warn("Some switchConnectionProviders failed to start.", t);
+ public void onFailure(@Nonnull final Throwable throwable) {
+ LOG.warn("Some switchConnectionProviders failed to shutdown.", throwable);
}
- });
+ }, MoreExecutors.directExecutor());
+
+ return listListenableFuture;
}
@Override
+ @PostConstruct
public void initialize() {
- Preconditions.checkNotNull(dataBroker, "missing data broker");
- Preconditions.checkNotNull(rpcProviderRegistry, "missing RPC provider registry");
- Preconditions.checkNotNull(notificationProviderService, "missing notification provider service");
- Preconditions.checkNotNull(singletonServicesProvider, "missing singleton services provider");
+ registerMXBean(MESSAGE_INTELLIGENCE_AGENCY, MESSAGE_INTELLIGENCE_AGENCY_MX_BEAN_NAME);
// TODO: copied from OpenFlowPluginProvider (Helium) misusesing the old way of distributing extension converters
// TODO: rewrite later!
// Creates a thread pool that creates new threads as needed, but will reuse previously
// constructed threads when they are available.
// Threads that have not been used for x seconds are terminated and removed from the cache.
- threadPool = new ThreadPoolLoggingExecutor(
- Preconditions.checkNotNull(threadPoolMinThreads),
- Preconditions.checkNotNull(threadPoolMaxThreads),
- Preconditions.checkNotNull(threadPoolTimeout),
- TimeUnit.SECONDS, new SynchronousQueue<>(), POOL_NAME);
-
- connectionManager = new ConnectionManagerImpl(threadPool);
- connectionManager.setEchoReplyTimeout(echoReplyTimeout);
-
- registerMXBean(messageIntelligenceAgency);
-
- contextChainHolder.addSingletonServicesProvider(singletonServicesProvider);
+ executorService = MoreExecutors.listeningDecorator(new ThreadPoolLoggingExecutor(
+ config.getThreadPoolMinThreads(),
+ config.getThreadPoolMaxThreads().getValue(),
+ config.getThreadPoolTimeout(),
+ TimeUnit.SECONDS, new SynchronousQueue<>(), POOL_NAME));
deviceManager = new DeviceManagerImpl(
+ config,
dataBroker,
getMessageIntelligenceAgency(),
notificationPublishService,
hashedWheelTimer,
convertorManager,
- deviceInitializerProvider,
- useSingleLayerSerialization);
-
- deviceManager.setGlobalNotificationQuota(globalNotificationQuota);
- deviceManager.setSwitchFeaturesMandatory(switchFeaturesMandatory);
- deviceManager.setBarrierInterval(barrierInterval);
- deviceManager.setBarrierCountLimit(barrierCountLimit);
- deviceManager.setFlowRemovedNotificationOn(isFlowRemovedNotificationOn);
- deviceManager.setSkipTableFeatures(skipTableFeatures);
+ deviceInitializerProvider);
+ TranslatorLibraryUtil.injectBasicTranslatorLibrary(deviceManager, convertorManager);
((ExtensionConverterProviderKeeper) deviceManager).setExtensionConverterProvider(extensionConverterManager);
- rpcManager = new RpcManagerImpl(rpcProviderRegistry, extensionConverterManager, convertorManager, notificationPublishService);
- rpcManager.setRpcRequestQuota(rpcRequestsQuota);
-
- statisticsManager = new StatisticsManagerImpl(rpcProviderRegistry, hashedWheelTimer, convertorManager);
- statisticsManager.setBasicTimerDelay(basicTimerDelay);
- statisticsManager.setMaximumTimerDelay(maximumTimerDelay);
- statisticsManager.setIsStatisticsPollingOn(isStatisticsPollingOn);
-
- // Device connection handler moved from device manager to context holder
- connectionManager.setDeviceConnectedHandler(contextChainHolder);
+ rpcManager = new RpcManagerImpl(
+ config,
+ rpcProviderRegistry,
+ extensionConverterManager,
+ convertorManager,
+ notificationPublishService);
- /* Termination Phase ordering - OFP Device Context suite */
- connectionManager.setDeviceDisconnectedHandler(contextChainHolder);
+ statisticsManager = new StatisticsManagerImpl(
+ config,
+ rpcProviderRegistry,
+ convertorManager,
+ executorService);
- rpcManager.setStatisticsRpcEnabled(isStatisticsRpcEnabled);
+ roleManager = new RoleManagerImpl(hashedWheelTimer, config);
- TranslatorLibraryUtil.injectBasicTranslatorLibrary(deviceManager, convertorManager);
- deviceManager.initialize();
+ contextChainHolder = new ContextChainHolderImpl(
+ executorService,
+ singletonServicesProvider,
+ entityOwnershipService,
+ mastershipChangeServiceManager);
contextChainHolder.addManager(deviceManager);
contextChainHolder.addManager(statisticsManager);
contextChainHolder.addManager(rpcManager);
+ contextChainHolder.addManager(roleManager);
- startSwitchConnections();
- initialized = true;
- }
+ connectionManager = new ConnectionManagerImpl(config, executorService);
+ connectionManager.setDeviceConnectedHandler(contextChainHolder);
+ connectionManager.setDeviceDisconnectedHandler(contextChainHolder);
+ deviceManager.setContextChainHolder(contextChainHolder);
+ deviceManager.initialize();
+ }
@Override
- public void update(@Nonnull final Map<String, Object> properties) {
- properties.forEach((key, value) -> {
- final PropertyType propertyType = PropertyType.forValue(key);
-
- if (Objects.nonNull(propertyType)) {
- updateProperty(propertyType, value);
- } else if (!key.equals("service.pid") && !key.equals("felix.fileinstall.filename")) {
- LOG.warn("Unsupported configuration property '{}={}'", key, value);
- }
- });
+ public ExtensionConverterRegistrator getExtensionConverterRegistrator() {
+ return extensionConverterManager;
}
- private void doPropertyUpdate(final PropertyType propertyType,
- final boolean modifiable,
- final Object origValue,
- final Object newValue,
- final Consumer<Object> successCallback) {
- if (initialized) {
- if (Objects.equals(origValue, newValue)) {
- LOG.debug("{} config parameter is already set to {})", propertyType, origValue);
- return;
- } else if (!modifiable) {
- LOG.warn("{} update ({} -> {}) is not allowed after controller start", propertyType, origValue, newValue);
- return;
- }
+ @Override
+ @PreDestroy
+ public void close() {
+ try {
+ shutdownSwitchConnections().get(10, TimeUnit.SECONDS);
+ } catch (InterruptedException | ExecutionException | TimeoutException e) {
+ LOG.warn("Failed to shut down switch connections in time {}s", 10, e);
}
- successCallback.accept(newValue);
- LOG.info("{} config parameter is updated ({} -> {})", propertyType, origValue, newValue);
+ gracefulShutdown(contextChainHolder);
+ gracefulShutdown(deviceManager);
+ gracefulShutdown(rpcManager);
+ gracefulShutdown(statisticsManager);
+ gracefulShutdown(roleManager);
+ gracefulShutdown(executorService);
+ gracefulShutdown(hashedWheelTimer);
+ unregisterMXBean(MESSAGE_INTELLIGENCE_AGENCY_MX_BEAN_NAME);
+ openflowDiagStatusProvider.reportStatus(ServiceState.UNREGISTERED);
}
- @Override
- public void updateProperty(@Nonnull final PropertyType key, @Nonnull final Object value) {
- try {
- final String sValue = value.toString();
- final Consumer<Object> successCallback;
- final boolean modifiable;
- final Object oldValue;
- final Object newValue;
-
- switch (key) {
- case RPC_REQUESTS_QUOTA:
- successCallback = (result) -> {
- rpcRequestsQuota = (int) result;
-
- if (initialized) {
- rpcManager.setRpcRequestQuota(rpcRequestsQuota);
- }
- };
-
- oldValue = rpcRequestsQuota;
- newValue = Integer.valueOf(sValue);
- modifiable = true;
- break;
- case SWITCH_FEATURES_MANDATORY:
- successCallback = (result) -> {
- switchFeaturesMandatory = (boolean) result;
-
- if (initialized) {
- deviceManager.setSwitchFeaturesMandatory(switchFeaturesMandatory);
- }
- };
-
- oldValue = switchFeaturesMandatory;
- newValue = Boolean.valueOf(sValue);
- modifiable = true;
- break;
- case GLOBAL_NOTIFICATION_QUOTA:
- successCallback = (result) -> {
- globalNotificationQuota = (long) result;
-
- if (initialized) {
- deviceManager.setGlobalNotificationQuota(globalNotificationQuota);
- }
- };
-
- oldValue = globalNotificationQuota;
- newValue = Long.valueOf(sValue);
- modifiable = true;
- break;
- case IS_STATISTICS_POLLING_ON:
- successCallback = (result) -> {
- isStatisticsPollingOn = (boolean) result;
-
- if (initialized) {
- statisticsManager.setIsStatisticsPollingOn(isStatisticsPollingOn);
- }
- };
-
- oldValue = isStatisticsPollingOn;
- newValue = Boolean.valueOf(sValue);
- modifiable = true;
- break;
- case IS_STATISTICS_RPC_ENABLED:
- successCallback = (result) -> {
- isStatisticsRpcEnabled = (boolean) result;
-
- if (initialized) {
- rpcManager.setStatisticsRpcEnabled(isStatisticsRpcEnabled);
- }
- };
-
- oldValue = isStatisticsRpcEnabled;
- newValue = Boolean.valueOf(sValue);
- modifiable = true;
- break;
- case BARRIER_INTERVAL_TIMEOUT_LIMIT:
- successCallback = (result) -> {
- barrierInterval = (long) result;
-
- if (initialized) {
- deviceManager.setBarrierInterval(barrierInterval);
- }
- };
-
- oldValue = barrierInterval;
- newValue = Long.valueOf(sValue);
- modifiable = true;
- break;
- case BARRIER_COUNT_LIMIT:
- successCallback = (result) -> {
- barrierCountLimit = (int) result;
-
- if (initialized) {
- deviceManager.setBarrierCountLimit(barrierCountLimit);
- }
- };
-
- oldValue = barrierCountLimit;
- newValue = Integer.valueOf(sValue);
- modifiable = true;
- break;
- case ECHO_REPLY_TIMEOUT:
- successCallback = (result) -> {
- echoReplyTimeout = (long) result;
-
- if (initialized) {
- connectionManager.setEchoReplyTimeout(echoReplyTimeout);
- }
- };
-
- oldValue = echoReplyTimeout;
- newValue = Long.valueOf(sValue);
- modifiable = true;
- break;
- case THREAD_POOL_MIN_THREADS:
- successCallback = (result) -> threadPoolMinThreads = (int) result;
- oldValue = threadPoolMinThreads;
- newValue = Integer.valueOf(sValue);
- modifiable = false;
- break;
- case THREAD_POOL_MAX_THREADS:
- successCallback = (result) -> threadPoolMaxThreads = (int) result;
- oldValue = threadPoolMaxThreads;
- newValue = Integer.valueOf(sValue);
- modifiable = false;
- break;
- case THREAD_POOL_TIMEOUT:
- successCallback = (result) -> threadPoolTimeout = (long) result;
- oldValue = threadPoolTimeout;
- newValue = Long.valueOf(sValue);
- modifiable = false;
- break;
- case ENABLE_FLOW_REMOVED_NOTIFICATION:
- successCallback = (result) -> {
- isFlowRemovedNotificationOn = (boolean) result;
-
- if (initialized) {
- deviceManager.setFlowRemovedNotificationOn(isFlowRemovedNotificationOn);
- }
- };
-
- oldValue = isFlowRemovedNotificationOn;
- newValue = Boolean.valueOf(sValue);
- modifiable = true;
- break;
- case SKIP_TABLE_FEATURES:
- successCallback = (result) -> {
- skipTableFeatures = (boolean) result;
-
- if (initialized) {
- deviceManager.setSkipTableFeatures(skipTableFeatures);
- }
- };
-
- oldValue = skipTableFeatures;
- newValue = Boolean.valueOf(sValue);
- modifiable = true;
- break;
- case BASIC_TIMER_DELAY:
- successCallback = (result) -> {
- basicTimerDelay = (long) result;
-
- if (initialized) {
- statisticsManager.setBasicTimerDelay(basicTimerDelay);
- }
- };
-
- oldValue = basicTimerDelay;
- newValue = Long.valueOf(sValue);
- modifiable = true;
- break;
- case MAXIMUM_TIMER_DELAY:
- successCallback = (result) -> {
- maximumTimerDelay = (long) result;
-
- if (initialized) {
- statisticsManager.setMaximumTimerDelay(maximumTimerDelay);
- }
- };
-
- oldValue = maximumTimerDelay;
- newValue = Long.valueOf(sValue);
- modifiable = true;
- break;
- case USE_SINGLE_LAYER_SERIALIZATION:
- successCallback = (result) -> useSingleLayerSerialization = (boolean) result;
- oldValue = useSingleLayerSerialization;
- newValue = Boolean.valueOf(sValue);
- modifiable = false;
- break;
- default:
- return;
+ @SuppressWarnings("checkstyle:IllegalCatch")
+ private static void gracefulShutdown(final AutoCloseable closeable) {
+ if (closeable != null) {
+ try {
+ closeable.close();
+ } catch (Exception e) {
+ LOG.warn("Failed to shutdown {} gracefully.", closeable);
}
-
- doPropertyUpdate(key, modifiable, oldValue, newValue, successCallback);
- } catch (final Exception ex) {
- LOG.warn("Failed to read configuration property '{}={}', error: {}", key, value, ex);
}
}
- @Override
- public ExtensionConverterRegistrator getExtensionConverterRegistrator() {
- return extensionConverterManager;
+ private static void gracefulShutdown(final Timer timer) {
+ if (timer != null) {
+ try {
+ timer.stop();
+ } catch (IllegalStateException e) {
+ LOG.warn("Failed to shutdown {} gracefully.", timer);
+ }
+ }
}
- @Override
- public void close() throws Exception {
- initialized = false;
- //TODO: consider wrapping each manager into try-catch
- deviceManager.close();
- rpcManager.close();
- statisticsManager.close();
-
- // Manually shutdown all remaining running threads in pool
- threadPool.shutdown();
+ private static void gracefulShutdown(final ExecutorService executorService) {
+ if (executorService != null) {
+ try {
+ executorService.shutdownNow();
+ } catch (SecurityException e) {
+ LOG.warn("Failed to shutdown {} gracefully.", executorService);
+ }
+ }
}
- private static void registerMXBean(final MessageIntelligenceAgency messageIntelligenceAgency) {
+ private static void registerMXBean(final Object bean, final String beanName) {
final MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
+
try {
- final String pathToMxBean = String.format("%s:type=%s",
- MessageIntelligenceAgencyMXBean.class.getPackage().getName(),
- MessageIntelligenceAgencyMXBean.class.getSimpleName());
- final ObjectName name = new ObjectName(pathToMxBean);
- mbs.registerMBean(messageIntelligenceAgency, name);
+ mbs.registerMBean(bean, new ObjectName(beanName));
} catch (MalformedObjectNameException
| NotCompliantMBeanException
| MBeanRegistrationException
| InstanceAlreadyExistsException e) {
- LOG.warn("Error registering MBean {}", e);
+ LOG.warn("Error registering MBean {}", beanName, e);
+ }
+ }
+
+ private static void unregisterMXBean(final String beanName) {
+ final MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
+
+ try {
+ mbs.unregisterMBean(new ObjectName(beanName));
+ } catch (InstanceNotFoundException
+ | MBeanRegistrationException
+ | MalformedObjectNameException e) {
+ LOG.warn("Error unregistering MBean {}", beanName, e);
}
}
}