import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
-import javax.annotation.Nonnull;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import javax.inject.Inject;
return switchConnectionProvider.startup();
}).collect(Collectors.toSet())), new FutureCallback<List<Boolean>>() {
@Override
- public void onSuccess(@Nonnull final List<Boolean> result) {
+ public void onSuccess(final List<Boolean> result) {
LOG.info("All switchConnectionProviders are up and running ({}).", result.size());
openflowDiagStatusProvider.reportStatus(OPENFLOW_SERVICE_NAME, ServiceState.OPERATIONAL);
fullyStarted.set(null);
}
@Override
- public void onFailure(@Nonnull final Throwable throwable) {
+ public void onFailure(final Throwable throwable) {
LOG.warn("Some switchConnectionProviders failed to start.", throwable);
openflowDiagStatusProvider.reportStatus(OPENFLOW_SERVICE_NAME, throwable);
fullyStarted.setException(throwable);
Futures.addCallback(listListenableFuture, new FutureCallback<List<Boolean>>() {
@Override
- public void onSuccess(@Nonnull final List<Boolean> result) {
+ public void onSuccess(final List<Boolean> result) {
LOG.info("All switchConnectionProviders were successfully shut down ({}).", result.size());
}
@Override
- public void onFailure(@Nonnull final Throwable throwable) {
+ public void onFailure(final Throwable throwable) {
LOG.warn("Some switchConnectionProviders failed to shutdown.", throwable);
}
}, MoreExecutors.directExecutor());
// constructed threads when they are available.
// Threads that have not been used for x seconds are terminated and removed from the cache.
executorService = MoreExecutors.listeningDecorator(new ThreadPoolLoggingExecutor(
- config.getThreadPoolMinThreads(),
- config.getThreadPoolMaxThreads().getValue(),
- config.getThreadPoolTimeout(),
+ config.getThreadPoolMinThreads().toJava(),
+ config.getThreadPoolMaxThreads().getValue().toJava(),
+ config.getThreadPoolTimeout().toJava(),
TimeUnit.SECONDS, new SynchronousQueue<>(), POOL_NAME));
deviceManager = new DeviceManagerImpl(
notificationPublishService,
hashedWheelTimer,
convertorManager,
- deviceInitializerProvider);
+ deviceInitializerProvider,
+ executorService);
TranslatorLibraryUtil.injectBasicTranslatorLibrary(deviceManager, convertorManager);
((ExtensionConverterProviderKeeper) deviceManager).setExtensionConverterProvider(extensionConverterManager);
convertorManager,
executorService);
- roleManager = new RoleManagerImpl(hashedWheelTimer, config);
+ roleManager = new RoleManagerImpl(hashedWheelTimer, config, executorService);
contextChainHolder = new ContextChainHolderImpl(
executorService,
contextChainHolder.addManager(rpcManager);
contextChainHolder.addManager(roleManager);
- connectionManager = new ConnectionManagerImpl(config, executorService);
+ connectionManager = new ConnectionManagerImpl(config, executorService, dataBroker, notificationPublishService);
connectionManager.setDeviceConnectedHandler(contextChainHolder);
connectionManager.setDeviceDisconnectedHandler(contextChainHolder);
@Override
@PreDestroy
+ @SuppressWarnings("checkstyle:IllegalCatch")
public void close() {
try {
shutdownSwitchConnections().get(10, TimeUnit.SECONDS);
gracefulShutdown(hashedWheelTimer);
unregisterMXBean(MESSAGE_INTELLIGENCE_AGENCY_MX_BEAN_NAME);
openflowDiagStatusProvider.reportStatus(ServiceState.UNREGISTERED);
+ try {
+ if (connectionManager != null) {
+ connectionManager.close();
+ connectionManager = null;
+ }
+ } catch (Exception e) {
+ LOG.error("Failed to close ConnectionManager", e);
+ }
}
@SuppressWarnings("checkstyle:IllegalCatch")