import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import io.netty.util.HashedWheelTimer;
+import io.netty.util.Timer;
import java.lang.management.ManagementFactory;
import java.util.Collection;
import java.util.List;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;
import javax.management.InstanceAlreadyExistsException;
+import javax.management.InstanceNotFoundException;
import javax.management.MBeanRegistrationException;
import javax.management.MBeanServer;
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;
import org.opendaylight.controller.md.sal.binding.api.DataBroker;
import org.opendaylight.controller.md.sal.binding.api.NotificationPublishService;
-import org.opendaylight.controller.md.sal.binding.api.NotificationService;
import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipService;
import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry;
import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceProvider;
public class OpenFlowPluginProviderImpl implements OpenFlowPluginProvider, OpenFlowPluginConfigurationService, OpenFlowPluginExtensionRegistratorProvider {
private static final Logger LOG = LoggerFactory.getLogger(OpenFlowPluginProviderImpl.class);
- private static final MessageIntelligenceAgency messageIntelligenceAgency = new MessageIntelligenceAgencyImpl();
+
private static final int TICKS_PER_WHEEL = 500; // 0.5 sec.
private static final long TICK_DURATION = 10;
private static final String POOL_NAME = "ofppool";
+ private static final MessageIntelligenceAgency MESSAGE_INTELLIGENCE_AGENCY = new MessageIntelligenceAgencyImpl();
+ private static final String MESSAGE_INTELLIGENCE_AGENCY_MX_BEAN_NAME = String
+ .format("%s:type=%s",
+ MessageIntelligenceAgencyMXBean.class.getPackage().getName(),
+ MessageIntelligenceAgencyMXBean.class.getSimpleName());
+
private final HashedWheelTimer hashedWheelTimer = new HashedWheelTimer(TICK_DURATION, TimeUnit.MILLISECONDS, TICKS_PER_WHEEL);
- private final NotificationService notificationProviderService;
private final NotificationPublishService notificationPublishService;
private final ExtensionConverterManager extensionConverterManager;
private final DataBroker dataBroker;
private final DeviceInitializerProvider deviceInitializerProvider;
private final ConvertorManager convertorManager;
private final ContextChainHolder contextChainHolder;
+ private final RpcProviderRegistry rpcProviderRegistry;
+ private final ClusterSingletonServiceProvider singletonServicesProvider;
private int rpcRequestsQuota;
private long globalNotificationQuota;
private long barrierInterval;
private long echoReplyTimeout;
private DeviceManager deviceManager;
private RpcManager rpcManager;
- private RpcProviderRegistry rpcProviderRegistry;
private StatisticsManager statisticsManager;
private ConnectionManager connectionManager;
private boolean switchFeaturesMandatory;
private long maximumTimerDelay;
private boolean useSingleLayerSerialization;
private ThreadPoolExecutor threadPool;
- private ClusterSingletonServiceProvider singletonServicesProvider;
private int threadPoolMinThreads;
private int threadPoolMaxThreads;
private long threadPoolTimeout;
private boolean initialized = false;
public static MessageIntelligenceAgency getMessageIntelligenceAgency() {
- return messageIntelligenceAgency;
+ return MESSAGE_INTELLIGENCE_AGENCY;
}
- public OpenFlowPluginProviderImpl(final List<SwitchConnectionProvider> switchConnectionProviders,
- final DataBroker dataBroker,
- final RpcProviderRegistry rpcProviderRegistry,
- final NotificationService notificationProviderService,
- final NotificationPublishService notificationPublishService,
- final ClusterSingletonServiceProvider singletonServiceProvider,
- final EntityOwnershipService entityOwnershipService) {
+ OpenFlowPluginProviderImpl(final List<SwitchConnectionProvider> switchConnectionProviders,
+ final DataBroker dataBroker,
+ final RpcProviderRegistry rpcProviderRegistry,
+ final NotificationPublishService notificationPublishService,
+ final ClusterSingletonServiceProvider singletonServiceProvider,
+ final EntityOwnershipService entityOwnershipService) {
this.switchConnectionProviders = switchConnectionProviders;
this.dataBroker = dataBroker;
this.rpcProviderRegistry = rpcProviderRegistry;
- this.notificationProviderService = notificationProviderService;
this.notificationPublishService = notificationPublishService;
this.singletonServicesProvider = singletonServiceProvider;
convertorManager = ConvertorManagerFactory.createDefaultManager();
private void startSwitchConnections() {
Futures.addCallback(Futures.allAsList(switchConnectionProviders.stream().map(switchConnectionProvider -> {
- // Inject OpenflowPlugin custom serializers and deserializers into OpenflowJava
+ // Inject OpenFlowPlugin custom serializers and deserializers into OpenFlowJava
if (useSingleLayerSerialization) {
SerializerInjector.injectSerializers(switchConnectionProvider);
DeserializerInjector.injectDeserializers(switchConnectionProvider);
}
@Override
- public void onFailure(@Nonnull final Throwable t) {
- LOG.warn("Some switchConnectionProviders failed to start.", t);
+ public void onFailure(@Nonnull final Throwable throwable) {
+ LOG.warn("Some switchConnectionProviders failed to start.", throwable);
+ }
+ });
+ }
+
+ private void shutdownSwitchConnections() {
+ Futures.addCallback(Futures.allAsList(switchConnectionProviders.stream().map(switchConnectionProvider -> {
+ // Revert deserializers to their original state
+ if (useSingleLayerSerialization) {
+ DeserializerInjector.revertDeserializers(switchConnectionProvider);
+ }
+
+ // Shutdown switch connection provider
+ return switchConnectionProvider.shutdown();
+ }).collect(Collectors.toSet())), new FutureCallback<List<Boolean>>() {
+ @Override
+ public void onSuccess(final List<Boolean> result) {
+ LOG.info("All switchConnectionProviders were successfully shut down ({}).", result.size());
+ }
+
+ @Override
+ public void onFailure(@Nonnull final Throwable throwable) {
+ LOG.warn("Some switchConnectionProviders failed to shutdown.", throwable);
}
});
}
@Override
public void initialize() {
- Preconditions.checkNotNull(dataBroker, "missing data broker");
- Preconditions.checkNotNull(rpcProviderRegistry, "missing RPC provider registry");
- Preconditions.checkNotNull(notificationProviderService, "missing notification provider service");
- Preconditions.checkNotNull(singletonServicesProvider, "missing singleton services provider");
-
// TODO: copied from OpenFlowPluginProvider (Helium) misusesing the old way of distributing extension converters
// TODO: rewrite later!
OFSessionUtil.getSessionManager().setExtensionConverterProvider(extensionConverterManager);
connectionManager = new ConnectionManagerImpl(threadPool);
connectionManager.setEchoReplyTimeout(echoReplyTimeout);
- registerMXBean(messageIntelligenceAgency);
+ registerMXBean(MESSAGE_INTELLIGENCE_AGENCY, MESSAGE_INTELLIGENCE_AGENCY_MX_BEAN_NAME);
contextChainHolder.addSingletonServicesProvider(singletonServicesProvider);
if (Objects.nonNull(propertyType)) {
updateProperty(propertyType, value);
- } else if (!key.equals("service.pid") && !key.equals("felix.fileinstall.filename")) {
- LOG.warn("Unsupported configuration property '{}={}'", key, value);
}
});
}
@Override
public void close() throws Exception {
initialized = false;
- //TODO: consider wrapping each manager into try-catch
- deviceManager.close();
- rpcManager.close();
- statisticsManager.close();
+ gracefulShutdown(contextChainHolder);
+ gracefulShutdown(deviceManager);
+ gracefulShutdown(rpcManager);
+ gracefulShutdown(statisticsManager);
+ gracefulShutdown(threadPool);
+ gracefulShutdown(hashedWheelTimer);
+ shutdownSwitchConnections();
+ unregisterMXBean(MESSAGE_INTELLIGENCE_AGENCY_MX_BEAN_NAME);
+ }
- // Manually shutdown all remaining running threads in pool
- threadPool.shutdown();
+ private static void gracefulShutdown(final AutoCloseable closeable) {
+ if (Objects.isNull(closeable)) {
+ return;
+ }
+
+ try {
+ closeable.close();
+ } catch (Exception e) {
+ LOG.warn("Failed to shutdown {} gracefully.", closeable);
+ }
+ }
+
+ private static void gracefulShutdown(final Timer timer) {
+ if (Objects.isNull(timer)) {
+ return;
+ }
+
+ try {
+ timer.stop();
+ } catch (Exception e) {
+ LOG.warn("Failed to shutdown {} gracefully.", timer);
+ }
}
- private static void registerMXBean(final MessageIntelligenceAgency messageIntelligenceAgency) {
+ private static void gracefulShutdown(final ThreadPoolExecutor threadPoolExecutor) {
+ if (Objects.isNull(threadPoolExecutor)) {
+ return;
+ }
+
+ try {
+ threadPoolExecutor.shutdown();
+ } catch (Exception e) {
+ LOG.warn("Failed to shutdown {} gracefully.", threadPoolExecutor);
+ }
+ }
+
+ private static void registerMXBean(final Object bean, final String beanName) {
final MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
+
try {
- final String pathToMxBean = String.format("%s:type=%s",
- MessageIntelligenceAgencyMXBean.class.getPackage().getName(),
- MessageIntelligenceAgencyMXBean.class.getSimpleName());
- final ObjectName name = new ObjectName(pathToMxBean);
- mbs.registerMBean(messageIntelligenceAgency, name);
+ mbs.registerMBean(bean, new ObjectName(beanName));
} catch (MalformedObjectNameException
| NotCompliantMBeanException
| MBeanRegistrationException
LOG.warn("Error registering MBean {}", e);
}
}
+
+ private static void unregisterMXBean(final String beanName) {
+ final MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
+
+ try {
+ mbs.unregisterMBean(new ObjectName(beanName));
+ } catch (InstanceNotFoundException
+ | MBeanRegistrationException
+ | MalformedObjectNameException e) {
+ LOG.warn("Error unregistering MBean {}", e);
+ }
+ }
}