X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=openflowplugin-impl%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fopenflowplugin%2Fimpl%2FOpenFlowPluginProviderImpl.java;h=ba8f806c083633de38305fc5cef8122a4c72df37;hb=333c60b53ceb66c32e68b5e7a9b96d3fd49c3002;hp=4908331a52c274ff284839b3dd4ae0edc1019a13;hpb=0d5980ab7f1b45b5226e8d235fd4672e64dfbbe4;p=openflowplugin.git diff --git a/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/OpenFlowPluginProviderImpl.java b/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/OpenFlowPluginProviderImpl.java index 4908331a52..ba8f806c08 100644 --- a/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/OpenFlowPluginProviderImpl.java +++ b/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/OpenFlowPluginProviderImpl.java @@ -17,6 +17,11 @@ import java.lang.management.ManagementFactory; import java.util.ArrayList; import java.util.Collection; import java.util.List; +import java.util.Map; +import java.util.concurrent.SynchronousQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import javax.annotation.Nonnull; import javax.management.InstanceAlreadyExistsException; import javax.management.MBeanRegistrationException; import javax.management.MBeanServer; @@ -28,10 +33,13 @@ import org.opendaylight.controller.md.sal.binding.api.NotificationPublishService import org.opendaylight.controller.md.sal.binding.api.NotificationService; import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipService; import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry; +import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceProvider; import org.opendaylight.openflowjava.protocol.spi.connection.SwitchConnectionProvider; import org.opendaylight.openflowplugin.api.openflow.OpenFlowPluginProvider; import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionManager; import org.opendaylight.openflowplugin.api.openflow.device.DeviceManager; +import org.opendaylight.openflowplugin.api.openflow.lifecycle.LifecycleConductor; +import org.opendaylight.openflowplugin.api.openflow.lifecycle.RoleChangeListener; import org.opendaylight.openflowplugin.api.openflow.role.RoleManager; import org.opendaylight.openflowplugin.api.openflow.rpc.RpcManager; import org.opendaylight.openflowplugin.api.openflow.statistics.StatisticsManager; @@ -48,14 +56,14 @@ import org.opendaylight.openflowplugin.impl.statistics.StatisticsManagerImpl; import org.opendaylight.openflowplugin.impl.statistics.ofpspecific.MessageIntelligenceAgencyImpl; import org.opendaylight.openflowplugin.impl.statistics.ofpspecific.MessageIntelligenceAgencyMXBean; import org.opendaylight.openflowplugin.impl.util.TranslatorLibraryUtil; +import org.opendaylight.openflowplugin.openflow.md.core.ThreadPoolLoggingExecutor; import org.opendaylight.openflowplugin.openflow.md.core.extension.ExtensionConverterManagerImpl; +import org.opendaylight.openflowplugin.openflow.md.core.sal.convertor.ConvertorManager; +import org.opendaylight.openflowplugin.openflow.md.core.sal.convertor.ConvertorManagerFactory; import org.opendaylight.openflowplugin.openflow.md.core.session.OFSessionUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -/** - * Created by Martin Bobak <mbobak@cisco.com> on 27.3.2015. - */ public class OpenFlowPluginProviderImpl implements OpenFlowPluginProvider, OpenFlowPluginExtensionRegistratorProvider { private static final Logger LOG = LoggerFactory.getLogger(OpenFlowPluginProviderImpl.class); @@ -63,6 +71,10 @@ public class OpenFlowPluginProviderImpl implements OpenFlowPluginProvider, OpenF private final int rpcRequestsQuota; private final long globalNotificationQuota; + private final ConvertorManager convertorManager; + private long barrierInterval; + private int barrierCountLimit; + private long echoReplyTimeout; private DeviceManager deviceManager; private RoleManager roleManager; private RpcManager rpcManager; @@ -80,11 +92,33 @@ public class OpenFlowPluginProviderImpl implements OpenFlowPluginProvider, OpenF private boolean switchFeaturesMandatory = false; private boolean isStatisticsPollingOff = false; private boolean isStatisticsRpcEnabled; - - public OpenFlowPluginProviderImpl(final long rpcRequestsQuota, final Long globalNotificationQuota) { + private boolean isNotificationFlowRemovedOff = false; + private Map managedProperties; + + private final LifecycleConductor conductor; + private final ThreadPoolExecutor threadPool; + private ClusterSingletonServiceProvider singletonServicesProvider; + + public OpenFlowPluginProviderImpl(final long rpcRequestsQuota, + final long globalNotificationQuota, + final int threadPoolMinThreads, + final int threadPoolMaxThreads, + final long threadPoolTimeout) { Preconditions.checkArgument(rpcRequestsQuota > 0 && rpcRequestsQuota <= Integer.MAX_VALUE, "rpcRequestQuota has to be in range <1,%s>", Integer.MAX_VALUE); this.rpcRequestsQuota = (int) rpcRequestsQuota; this.globalNotificationQuota = Preconditions.checkNotNull(globalNotificationQuota); + + // Creates a thread pool that creates new threads as needed, but will reuse previously + // constructed threads when they are available. + // Threads that have not been used for x seconds are terminated and removed from the cache. + threadPool = new ThreadPoolLoggingExecutor( + Preconditions.checkNotNull(threadPoolMinThreads), + Preconditions.checkNotNull(threadPoolMaxThreads), + Preconditions.checkNotNull(threadPoolTimeout), TimeUnit.SECONDS, + new SynchronousQueue<>(), "ofppool"); + + convertorManager = ConvertorManagerFactory.createDefaultManager(); + conductor = new LifecycleConductorImpl(messageIntelligenceAgency, convertorManager); } @Override @@ -114,7 +148,7 @@ public class OpenFlowPluginProviderImpl implements OpenFlowPluginProvider, OpenF } @Override - public void onFailure(final Throwable t) { + public void onFailure(@Nonnull final Throwable t) { LOG.warn("Some switchConnectionProviders failed to start.", t); } }); @@ -130,6 +164,31 @@ public class OpenFlowPluginProviderImpl implements OpenFlowPluginProvider, OpenF this.entityOwnershipService = entityOwnershipService; } + @Override + public void setBarrierCountLimit(final int barrierCountLimit) { + this.barrierCountLimit = barrierCountLimit; + } + + @Override + public void setBarrierInterval(final long barrierTimeoutLimit) { + this.barrierInterval = barrierTimeoutLimit; + } + + @Override + public void setEchoReplyTimeout(final long echoReplyTimeout) { + this.echoReplyTimeout = echoReplyTimeout; + } + + @Override + public void setNotificationFlowRemovedOff(boolean isNotificationFlowRemovedOff) { + this.isNotificationFlowRemovedOff = isNotificationFlowRemovedOff; + } + + public void setClusteringSingletonServicesProvider(ClusterSingletonServiceProvider singletonServicesProvider) { + this.singletonServicesProvider = singletonServicesProvider; + } + + @Override public void setSwitchFeaturesMandatory(final boolean switchFeaturesMandatory) { this.switchFeaturesMandatory = switchFeaturesMandatory; @@ -156,44 +215,76 @@ public class OpenFlowPluginProviderImpl implements OpenFlowPluginProvider, OpenF @Override public void initialize() { - Preconditions.checkNotNull(dataBroker, "missing data broker"); Preconditions.checkNotNull(rpcProviderRegistry, "missing RPC provider registry"); Preconditions.checkNotNull(notificationProviderService, "missing notification provider service"); + Preconditions.checkNotNull(singletonServicesProvider, "missing singleton services provider"); extensionConverterManager = new ExtensionConverterManagerImpl(); // TODO: copied from OpenFlowPluginProvider (Helium) misusesing the old way of distributing extension converters // TODO: rewrite later! OFSessionUtil.getSessionManager().setExtensionConverterProvider(extensionConverterManager); - connectionManager = new ConnectionManagerImpl(); + connectionManager = new ConnectionManagerImpl(echoReplyTimeout, threadPool); registerMXBean(messageIntelligenceAgency); - deviceManager = new DeviceManagerImpl(dataBroker, messageIntelligenceAgency, switchFeaturesMandatory, globalNotificationQuota); + deviceManager = new DeviceManagerImpl(dataBroker, + globalNotificationQuota, + switchFeaturesMandatory, + barrierInterval, + barrierCountLimit, + conductor, + isNotificationFlowRemovedOff, + convertorManager, + singletonServicesProvider); + ((ExtensionConverterProviderKeeper) conductor).setExtensionConverterProvider(extensionConverterManager); ((ExtensionConverterProviderKeeper) deviceManager).setExtensionConverterProvider(extensionConverterManager); - roleManager = new RoleManagerImpl(rpcProviderRegistry, entityOwnershipService, switchFeaturesMandatory); - statisticsManager = new StatisticsManagerImpl(rpcProviderRegistry, isStatisticsPollingOff); - rpcManager = new RpcManagerImpl(rpcProviderRegistry, rpcRequestsQuota); - // CM -> DM -> Role -> SM -> RPC -> DM + conductor.setSafelyManager(deviceManager); + conductor.setNotificationPublishService(notificationPublishService); + + roleManager = new RoleManagerImpl(entityOwnershipService, dataBroker, conductor); + statisticsManager = new StatisticsManagerImpl(rpcProviderRegistry, isStatisticsPollingOff, conductor, convertorManager); + conductor.setSafelyManager(statisticsManager); + + rpcManager = new RpcManagerImpl(rpcProviderRegistry, rpcRequestsQuota, conductor, extensionConverterManager, convertorManager); + conductor.setSafelyManager(rpcManager); + + roleManager.addRoleChangeListener((RoleChangeListener) conductor); + + /* Initialization Phase ordering - OFP Device Context suite */ + // CM -> DM -> SM -> RPC -> Role -> DM connectionManager.setDeviceConnectedHandler(deviceManager); - deviceManager.setDeviceInitializationPhaseHandler(roleManager); - roleManager.setDeviceInitializationPhaseHandler(statisticsManager); + deviceManager.setDeviceInitializationPhaseHandler(statisticsManager); statisticsManager.setDeviceInitializationPhaseHandler(rpcManager); - rpcManager.setDeviceInitializationPhaseHandler(deviceManager); - rpcManager.setStatisticsRpcEnabled(isStatisticsRpcEnabled); - rpcManager.setNotificationPublishService(notificationPublishService); + rpcManager.setDeviceInitializationPhaseHandler(roleManager); + roleManager.setDeviceInitializationPhaseHandler(deviceManager); + + /* Termination Phase ordering - OFP Device Context suite */ + deviceManager.setDeviceTerminationPhaseHandler(rpcManager); + rpcManager.setDeviceTerminationPhaseHandler(statisticsManager); + statisticsManager.setDeviceTerminationPhaseHandler(roleManager); + roleManager.setDeviceTerminationPhaseHandler(deviceManager); - deviceManager.setNotificationService(this.notificationProviderService); - deviceManager.setNotificationPublishService(this.notificationPublishService); + rpcManager.setStatisticsRpcEnabled(isStatisticsRpcEnabled); - TranslatorLibraryUtil.setBasicTranslatorLibrary(deviceManager); + TranslatorLibraryUtil.injectBasicTranslatorLibrary(deviceManager, convertorManager); deviceManager.initialize(); startSwitchConnections(); } + @Override + public void update(Map props) { + LOG.debug("Update managed properties = {}", props.toString()); + this.managedProperties = props; + + if(deviceManager != null && props.containsKey("notification-flow-removed-off")) { + deviceManager.setIsNotificationFlowRemovedOff(Boolean.valueOf(props.get("notification-flow-removed-off").toString())); + } + } + private static void registerMXBean(final MessageIntelligenceAgency messageIntelligenceAgency) { final MBeanServer mbs = ManagementFactory.getPlatformMBeanServer(); try { @@ -232,9 +323,16 @@ public class OpenFlowPluginProviderImpl implements OpenFlowPluginProvider, OpenF @Override public void close() throws Exception { + //TODO: consider wrapping each manager into try-catch deviceManager.close(); rpcManager.close(); statisticsManager.close(); + + // TODO: needs to close org.opendaylight.openflowplugin.impl.role.OpenflowOwnershipListener after RoleContexts are down + // TODO: must not be executed prior to all living RoleContexts have been closed (via closing living DeviceContexts) roleManager.close(); + + // Manually shutdown all remaining running threads in pool + threadPool.shutdown(); } }