import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
+import java.util.Map;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import javax.annotation.Nonnull;
import javax.management.InstanceAlreadyExistsException;
import javax.management.MBeanRegistrationException;
import javax.management.MBeanServer;
import org.opendaylight.controller.md.sal.binding.api.NotificationService;
import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipService;
import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry;
+import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceProvider;
import org.opendaylight.openflowjava.protocol.spi.connection.SwitchConnectionProvider;
import org.opendaylight.openflowplugin.api.openflow.OpenFlowPluginProvider;
import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionManager;
import org.opendaylight.openflowplugin.api.openflow.device.DeviceManager;
+import org.opendaylight.openflowplugin.api.openflow.lifecycle.LifecycleConductor;
import org.opendaylight.openflowplugin.api.openflow.role.RoleManager;
import org.opendaylight.openflowplugin.api.openflow.rpc.RpcManager;
import org.opendaylight.openflowplugin.api.openflow.statistics.StatisticsManager;
import org.opendaylight.openflowplugin.impl.statistics.ofpspecific.MessageIntelligenceAgencyImpl;
import org.opendaylight.openflowplugin.impl.statistics.ofpspecific.MessageIntelligenceAgencyMXBean;
import org.opendaylight.openflowplugin.impl.util.TranslatorLibraryUtil;
+import org.opendaylight.openflowplugin.openflow.md.core.ThreadPoolLoggingExecutor;
import org.opendaylight.openflowplugin.openflow.md.core.extension.ExtensionConverterManagerImpl;
+import org.opendaylight.openflowplugin.openflow.md.core.sal.convertor.ConvertorManager;
+import org.opendaylight.openflowplugin.openflow.md.core.sal.convertor.ConvertorManagerFactory;
import org.opendaylight.openflowplugin.openflow.md.core.session.OFSessionUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-/**
- * Created by Martin Bobak <mbobak@cisco.com> on 27.3.2015.
- */
public class OpenFlowPluginProviderImpl implements OpenFlowPluginProvider, OpenFlowPluginExtensionRegistratorProvider {
private static final Logger LOG = LoggerFactory.getLogger(OpenFlowPluginProviderImpl.class);
private final int rpcRequestsQuota;
private final long globalNotificationQuota;
+ private final ConvertorManager convertorManager;
private long barrierInterval;
private int barrierCountLimit;
private long echoReplyTimeout;
private boolean switchFeaturesMandatory = false;
private boolean isStatisticsPollingOff = false;
private boolean isStatisticsRpcEnabled;
+ private boolean isNotificationFlowRemovedOff = false;
+
+ private final LifecycleConductor conductor;
+ private final ThreadPoolExecutor threadPool;
+ private ClusterSingletonServiceProvider singletonServicesProvider;
- public OpenFlowPluginProviderImpl(final long rpcRequestsQuota, final Long globalNotificationQuota) {
+ public OpenFlowPluginProviderImpl(final long rpcRequestsQuota,
+ final long globalNotificationQuota,
+ final int threadPoolMinThreads,
+ final int threadPoolMaxThreads,
+ final long threadPoolTimeout) {
Preconditions.checkArgument(rpcRequestsQuota > 0 && rpcRequestsQuota <= Integer.MAX_VALUE, "rpcRequestQuota has to be in range <1,%s>", Integer.MAX_VALUE);
this.rpcRequestsQuota = (int) rpcRequestsQuota;
this.globalNotificationQuota = Preconditions.checkNotNull(globalNotificationQuota);
+
+ // Creates a thread pool that creates new threads as needed, but will reuse previously
+ // constructed threads when they are available.
+ // Threads that have not been used for x seconds are terminated and removed from the cache.
+ threadPool = new ThreadPoolLoggingExecutor(
+ Preconditions.checkNotNull(threadPoolMinThreads),
+ Preconditions.checkNotNull(threadPoolMaxThreads),
+ Preconditions.checkNotNull(threadPoolTimeout), TimeUnit.SECONDS,
+ new SynchronousQueue<>(), "ofppool");
+
+ convertorManager = ConvertorManagerFactory.createDefaultManager();
+ conductor = new LifecycleConductorImpl(messageIntelligenceAgency, convertorManager);
}
@Override
}
@Override
- public void onFailure(final Throwable t) {
+ public void onFailure(@Nonnull final Throwable t) {
LOG.warn("Some switchConnectionProviders failed to start.", t);
}
});
}
@Override
- public void setBarrierCountLimit(int barrierCountLimit) {
+ public void setBarrierCountLimit(final int barrierCountLimit) {
this.barrierCountLimit = barrierCountLimit;
}
@Override
- public void setBarrierInterval(long barrierTimeoutLimit) {
+ public void setBarrierInterval(final long barrierTimeoutLimit) {
this.barrierInterval = barrierTimeoutLimit;
}
@Override
- public void setEchoReplyTimeout(long echoReplyTimeout) {
+ public void setEchoReplyTimeout(final long echoReplyTimeout) {
this.echoReplyTimeout = echoReplyTimeout;
}
+ @Override
+ public void setNotificationFlowRemovedOff(boolean isNotificationFlowRemovedOff) {
+ this.isNotificationFlowRemovedOff = isNotificationFlowRemovedOff;
+ }
+
+ public void setClusteringSingletonServicesProvider(ClusterSingletonServiceProvider singletonServicesProvider) {
+ this.singletonServicesProvider = singletonServicesProvider;
+ }
+
@Override
public void setSwitchFeaturesMandatory(final boolean switchFeaturesMandatory) {
@Override
public void initialize() {
-
Preconditions.checkNotNull(dataBroker, "missing data broker");
Preconditions.checkNotNull(rpcProviderRegistry, "missing RPC provider registry");
Preconditions.checkNotNull(notificationProviderService, "missing notification provider service");
+ Preconditions.checkNotNull(singletonServicesProvider, "missing singleton services provider");
extensionConverterManager = new ExtensionConverterManagerImpl();
// TODO: copied from OpenFlowPluginProvider (Helium) misusesing the old way of distributing extension converters
// TODO: rewrite later!
OFSessionUtil.getSessionManager().setExtensionConverterProvider(extensionConverterManager);
- connectionManager = new ConnectionManagerImpl(echoReplyTimeout);
+ connectionManager = new ConnectionManagerImpl(echoReplyTimeout, threadPool);
registerMXBean(messageIntelligenceAgency);
- deviceManager = new DeviceManagerImpl(dataBroker, messageIntelligenceAgency, globalNotificationQuota,
- switchFeaturesMandatory, barrierInterval, barrierCountLimit);
+ deviceManager = new DeviceManagerImpl(dataBroker,
+ globalNotificationQuota,
+ switchFeaturesMandatory,
+ barrierInterval,
+ barrierCountLimit,
+ conductor,
+ isNotificationFlowRemovedOff,
+ convertorManager,
+ singletonServicesProvider);
+ ((ExtensionConverterProviderKeeper) conductor).setExtensionConverterProvider(extensionConverterManager);
((ExtensionConverterProviderKeeper) deviceManager).setExtensionConverterProvider(extensionConverterManager);
- roleManager = new RoleManagerImpl(entityOwnershipService, dataBroker, switchFeaturesMandatory);
- statisticsManager = new StatisticsManagerImpl(rpcProviderRegistry, isStatisticsPollingOff);
- rpcManager = new RpcManagerImpl(rpcProviderRegistry, rpcRequestsQuota);
+ conductor.setSafelyManager(deviceManager);
+ conductor.setNotificationPublishService(notificationPublishService);
+
+ statisticsManager = new StatisticsManagerImpl(rpcProviderRegistry, isStatisticsPollingOff, conductor, convertorManager);
+ roleManager = new RoleManagerImpl(dataBroker, conductor);
+ conductor.setSafelyManager(statisticsManager);
+
+ rpcManager = new RpcManagerImpl(rpcProviderRegistry, rpcRequestsQuota, conductor, extensionConverterManager, convertorManager, notificationPublishService);
+ conductor.setSafelyManager(rpcManager);
+
+ /* Initialization Phase ordering - OFP Device Context suite */
// CM -> DM -> SM -> RPC -> Role -> DM
connectionManager.setDeviceConnectedHandler(deviceManager);
deviceManager.setDeviceInitializationPhaseHandler(statisticsManager);
rpcManager.setDeviceInitializationPhaseHandler(roleManager);
roleManager.setDeviceInitializationPhaseHandler(deviceManager);
- rpcManager.setStatisticsRpcEnabled(isStatisticsRpcEnabled);
- rpcManager.setNotificationPublishService(notificationPublishService);
+ /* Termination Phase ordering - OFP Device Context suite */
+ deviceManager.setDeviceTerminationPhaseHandler(rpcManager);
+ rpcManager.setDeviceTerminationPhaseHandler(statisticsManager);
+ statisticsManager.setDeviceTerminationPhaseHandler(roleManager);
+ roleManager.setDeviceTerminationPhaseHandler(deviceManager);
- deviceManager.setNotificationService(this.notificationProviderService);
- deviceManager.setNotificationPublishService(this.notificationPublishService);
+ rpcManager.setStatisticsRpcEnabled(isStatisticsRpcEnabled);
- TranslatorLibraryUtil.setBasicTranslatorLibrary(deviceManager);
+ TranslatorLibraryUtil.injectBasicTranslatorLibrary(deviceManager, convertorManager);
deviceManager.initialize();
startSwitchConnections();
}
+ @Override
+ public void update(Map<String,Object> props) {
+ LOG.debug("Update managed properties = {}", props.toString());
+ if(deviceManager != null && props.containsKey("notification-flow-removed-off")) {
+ deviceManager.setIsNotificationFlowRemovedOff(Boolean.valueOf(props.get("notification-flow-removed-off").toString()));
+ }
+ }
+
private static void registerMXBean(final MessageIntelligenceAgency messageIntelligenceAgency) {
final MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
try {
// TODO: needs to close org.opendaylight.openflowplugin.impl.role.OpenflowOwnershipListener after RoleContexts are down
// TODO: must not be executed prior to all living RoleContexts have been closed (via closing living DeviceContexts)
roleManager.close();
+
+ // Manually shutdown all remaining running threads in pool
+ threadPool.shutdown();
}
}