* terms of the Eclipse Public License v1.0 which accompanies this distribution,
* and is available at http://www.eclipse.org/legal/epl-v10.html
*/
-
package org.opendaylight.openflowplugin.impl;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
+import com.google.common.util.concurrent.SettableFuture;
import io.netty.util.HashedWheelTimer;
import io.netty.util.Timer;
import java.lang.management.ManagementFactory;
import java.util.Collection;
import java.util.List;
-import java.util.Objects;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
-import javax.annotation.Nonnull;
+import javax.annotation.PostConstruct;
+import javax.annotation.PreDestroy;
+import javax.inject.Inject;
+import javax.inject.Singleton;
import javax.management.InstanceAlreadyExistsException;
import javax.management.InstanceNotFoundException;
import javax.management.MBeanRegistrationException;
import javax.management.MalformedObjectNameException;
import javax.management.NotCompliantMBeanException;
import javax.management.ObjectName;
-import org.opendaylight.controller.md.sal.binding.api.DataBroker;
-import org.opendaylight.controller.md.sal.binding.api.NotificationPublishService;
-import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry;
+import org.apache.aries.blueprint.annotation.service.Reference;
+import org.apache.aries.blueprint.annotation.service.Service;
import org.opendaylight.infrautils.diagstatus.ServiceState;
import org.opendaylight.infrautils.ready.SystemReadyListener;
import org.opendaylight.infrautils.ready.SystemReadyMonitor;
+import org.opendaylight.mdsal.binding.api.DataBroker;
+import org.opendaylight.mdsal.binding.api.NotificationPublishService;
+import org.opendaylight.mdsal.binding.api.RpcProviderService;
import org.opendaylight.mdsal.eos.binding.api.EntityOwnershipService;
import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceProvider;
+import org.opendaylight.openflowjava.protocol.api.connection.OpenflowDiagStatusProvider;
import org.opendaylight.openflowjava.protocol.spi.connection.SwitchConnectionProvider;
-import org.opendaylight.openflowplugin.api.diagstatus.OpenflowPluginDiagStatusProvider;
+import org.opendaylight.openflowjava.protocol.spi.connection.SwitchConnectionProviderList;
import org.opendaylight.openflowplugin.api.openflow.OpenFlowPluginProvider;
import org.opendaylight.openflowplugin.api.openflow.configuration.ConfigurationService;
import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+@Singleton
+@Service(classes = { OpenFlowPluginProvider.class, OpenFlowPluginExtensionRegistratorProvider.class })
public class OpenFlowPluginProviderImpl implements
OpenFlowPluginProvider,
OpenFlowPluginExtensionRegistratorProvider,
private final Collection<SwitchConnectionProvider> switchConnectionProviders;
private final DeviceInitializerProvider deviceInitializerProvider;
private final ConvertorManager convertorManager;
- private final RpcProviderRegistry rpcProviderRegistry;
+ private final RpcProviderService rpcProviderRegistry;
private final ClusterSingletonServiceProvider singletonServicesProvider;
private final OpenflowProviderConfig config;
private final EntityOwnershipService entityOwnershipService;
private ConnectionManager connectionManager;
private ListeningExecutorService executorService;
private ContextChainHolderImpl contextChainHolder;
- private OpenflowPluginDiagStatusProvider openflowPluginStatusMonitor;
+ private final OpenflowDiagStatusProvider openflowDiagStatusProvider;
+ private final SystemReadyMonitor systemReadyMonitor;
+ private final SettableFuture<Void> fullyStarted = SettableFuture.create();
+ private static final String OPENFLOW_SERVICE_NAME = "OPENFLOW";
public static MessageIntelligenceAgency getMessageIntelligenceAgency() {
return MESSAGE_INTELLIGENCE_AGENCY;
}
- OpenFlowPluginProviderImpl(final ConfigurationService configurationService,
- final List<SwitchConnectionProvider> switchConnectionProviders,
- final DataBroker dataBroker,
- final RpcProviderRegistry rpcProviderRegistry,
- final NotificationPublishService notificationPublishService,
- final ClusterSingletonServiceProvider singletonServiceProvider,
- final EntityOwnershipService entityOwnershipService,
+ @Inject
+ public OpenFlowPluginProviderImpl(final ConfigurationService configurationService,
+ final SwitchConnectionProviderList switchConnectionProviders,
+ final PingPongDataBroker pingPongDataBroker,
+ final @Reference RpcProviderService rpcProviderRegistry,
+ final @Reference NotificationPublishService notificationPublishService,
+ final @Reference ClusterSingletonServiceProvider singletonServiceProvider,
+ final @Reference EntityOwnershipService entityOwnershipService,
final MastershipChangeServiceManager mastershipChangeServiceManager,
- final OpenflowPluginDiagStatusProvider openflowPluginStatusMonitor,
- final SystemReadyMonitor systemReadyMonitor) {
+ final @Reference OpenflowDiagStatusProvider openflowDiagStatusProvider,
+ final @Reference SystemReadyMonitor systemReadyMonitor) {
this.switchConnectionProviders = switchConnectionProviders;
- this.dataBroker = dataBroker;
+ this.dataBroker = pingPongDataBroker;
this.rpcProviderRegistry = rpcProviderRegistry;
this.notificationPublishService = notificationPublishService;
this.singletonServicesProvider = singletonServiceProvider;
deviceInitializerProvider = DeviceInitializerProviderFactory.createDefaultProvider();
config = new OpenFlowProviderConfigImpl(configurationService);
this.mastershipChangeServiceManager = mastershipChangeServiceManager;
- this.openflowPluginStatusMonitor = openflowPluginStatusMonitor;
- systemReadyMonitor.registerListener(this);
- LOG.debug("registered onSystemBootReady() listener for deferred startSwitchConnections()");
+ this.openflowDiagStatusProvider = openflowDiagStatusProvider;
+ this.systemReadyMonitor = systemReadyMonitor;
}
@Override
public void onSystemBootReady() {
- LOG.debug("onSystemBootReady() received, starting the switch connections");
- startSwitchConnections();
- }
-
- private void startSwitchConnections() {
+ LOG.info("onSystemBootReady() received, starting the switch connections");
Futures.addCallback(Futures.allAsList(switchConnectionProviders.stream().map(switchConnectionProvider -> {
// Inject OpenFlowPlugin custom serializers and deserializers into OpenFlowJava
if (config.isUseSingleLayerSerialization()) {
- SerializerInjector.injectSerializers(switchConnectionProvider);
+ SerializerInjector.injectSerializers(switchConnectionProvider,
+ switchConnectionProvider.getConfiguration().isGroupAddModEnabled());
DeserializerInjector.injectDeserializers(switchConnectionProvider);
} else {
DeserializerInjector.revertDeserializers(switchConnectionProvider);
@Override
public void onSuccess(final List<Boolean> result) {
LOG.info("All switchConnectionProviders are up and running ({}).", result.size());
- openflowPluginStatusMonitor.reportStatus(ServiceState.OPERATIONAL, "switch connections started");
+ openflowDiagStatusProvider.reportStatus(OPENFLOW_SERVICE_NAME, ServiceState.OPERATIONAL);
+ fullyStarted.set(null);
}
@Override
- public void onFailure(@Nonnull final Throwable throwable) {
+ public void onFailure(final Throwable throwable) {
LOG.warn("Some switchConnectionProviders failed to start.", throwable);
- openflowPluginStatusMonitor.reportStatus(ServiceState.ERROR, "some switch connections failed to start");
+ openflowDiagStatusProvider.reportStatus(OPENFLOW_SERVICE_NAME, throwable);
+ fullyStarted.setException(throwable);
}
}, MoreExecutors.directExecutor());
}
+ @VisibleForTesting
+ public Future<Void> getFullyStarted() {
+ return fullyStarted;
+ }
+
private ListenableFuture<List<Boolean>> shutdownSwitchConnections() {
final ListenableFuture<List<Boolean>> listListenableFuture =
Futures.allAsList(switchConnectionProviders.stream().map(switchConnectionProvider -> {
}
@Override
- public void onFailure(@Nonnull final Throwable throwable) {
+ public void onFailure(final Throwable throwable) {
LOG.warn("Some switchConnectionProviders failed to shutdown.", throwable);
}
}, MoreExecutors.directExecutor());
}
@Override
+ @PostConstruct
public void initialize() {
registerMXBean(MESSAGE_INTELLIGENCE_AGENCY, MESSAGE_INTELLIGENCE_AGENCY_MX_BEAN_NAME);
// constructed threads when they are available.
// Threads that have not been used for x seconds are terminated and removed from the cache.
executorService = MoreExecutors.listeningDecorator(new ThreadPoolLoggingExecutor(
- config.getThreadPoolMinThreads(),
- config.getThreadPoolMaxThreads().getValue(),
- config.getThreadPoolTimeout(),
+ config.getThreadPoolMinThreads().toJava(),
+ config.getThreadPoolMaxThreads().getValue().toJava(),
+ config.getThreadPoolTimeout().toJava(),
TimeUnit.SECONDS, new SynchronousQueue<>(), POOL_NAME));
deviceManager = new DeviceManagerImpl(
notificationPublishService,
hashedWheelTimer,
convertorManager,
- deviceInitializerProvider);
+ deviceInitializerProvider,
+ executorService);
TranslatorLibraryUtil.injectBasicTranslatorLibrary(deviceManager, convertorManager);
((ExtensionConverterProviderKeeper) deviceManager).setExtensionConverterProvider(extensionConverterManager);
convertorManager,
executorService);
- roleManager = new RoleManagerImpl(hashedWheelTimer);
+ roleManager = new RoleManagerImpl(hashedWheelTimer, config, executorService);
contextChainHolder = new ContextChainHolderImpl(
executorService,
singletonServicesProvider,
entityOwnershipService,
- mastershipChangeServiceManager);
+ mastershipChangeServiceManager,
+ config);
contextChainHolder.addManager(deviceManager);
contextChainHolder.addManager(statisticsManager);
contextChainHolder.addManager(rpcManager);
contextChainHolder.addManager(roleManager);
- connectionManager = new ConnectionManagerImpl(config, executorService);
+ connectionManager = new ConnectionManagerImpl(config, executorService, dataBroker, notificationPublishService);
connectionManager.setDeviceConnectedHandler(contextChainHolder);
connectionManager.setDeviceDisconnectedHandler(contextChainHolder);
+ deviceManager.setContextChainHolder(contextChainHolder);
deviceManager.initialize();
+ systemReadyMonitor.registerListener(this);
+ LOG.info("registered onSystemBootReady() listener for deferred startSwitchConnections()");
}
@Override
}
@Override
+ @PreDestroy
+ @SuppressWarnings("checkstyle:IllegalCatch")
public void close() {
try {
shutdownSwitchConnections().get(10, TimeUnit.SECONDS);
} catch (InterruptedException | ExecutionException | TimeoutException e) {
- LOG.warn("Failed to shut down switch connections in time {}s, error: {}", 10, e);
+ LOG.warn("Failed to shut down switch connections in time {}s", 10, e);
}
gracefulShutdown(contextChainHolder);
+ gracefulShutdown(connectionManager);
gracefulShutdown(deviceManager);
gracefulShutdown(rpcManager);
gracefulShutdown(statisticsManager);
gracefulShutdown(executorService);
gracefulShutdown(hashedWheelTimer);
unregisterMXBean(MESSAGE_INTELLIGENCE_AGENCY_MX_BEAN_NAME);
- openflowPluginStatusMonitor.reportStatus(ServiceState.UNREGISTERED, "service shutting down");
+ openflowDiagStatusProvider.reportStatus(ServiceState.UNREGISTERED);
+ try {
+ if (connectionManager != null) {
+ connectionManager.close();
+ connectionManager = null;
+ }
+ } catch (Exception e) {
+ LOG.error("Failed to close ConnectionManager", e);
+ }
}
@SuppressWarnings("checkstyle:IllegalCatch")
private static void gracefulShutdown(final AutoCloseable closeable) {
- if (Objects.isNull(closeable)) {
- return;
- }
-
- try {
- closeable.close();
- } catch (Exception e) {
- LOG.warn("Failed to shutdown {} gracefully.", closeable);
+ if (closeable != null) {
+ try {
+ closeable.close();
+ } catch (Exception e) {
+ LOG.warn("Failed to shutdown {} gracefully.", closeable);
+ }
}
}
private static void gracefulShutdown(final Timer timer) {
- if (Objects.isNull(timer)) {
- return;
- }
-
- try {
- timer.stop();
- } catch (IllegalStateException e) {
- LOG.warn("Failed to shutdown {} gracefully.", timer);
+ if (timer != null) {
+ try {
+ timer.stop();
+ } catch (IllegalStateException e) {
+ LOG.warn("Failed to shutdown {} gracefully.", timer);
+ }
}
}
private static void gracefulShutdown(final ExecutorService executorService) {
- if (Objects.isNull(executorService)) {
- return;
- }
-
- try {
- executorService.shutdownNow();
- } catch (SecurityException e) {
- LOG.warn("Failed to shutdown {} gracefully.", executorService);
+ if (executorService != null) {
+ try {
+ executorService.shutdownNow();
+ } catch (SecurityException e) {
+ LOG.warn("Failed to shutdown {} gracefully.", executorService);
+ }
}
}
| NotCompliantMBeanException
| MBeanRegistrationException
| InstanceAlreadyExistsException e) {
- LOG.warn("Error registering MBean {}", e);
+ LOG.warn("Error registering MBean {}", beanName, e);
}
}
} catch (InstanceNotFoundException
| MBeanRegistrationException
| MalformedObjectNameException e) {
- LOG.warn("Error unregistering MBean {}", e);
+ LOG.warn("Error unregistering MBean {}", beanName, e);
}
}
}