X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=openflowplugin-impl%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fopenflowplugin%2Fimpl%2FOpenFlowPluginProviderImpl.java;h=41bee01d5975d497f3a25b445eba4b9b47c9645f;hb=74500b868f8cdce042c53d7bdd994a9c7ff7314c;hp=646a5b78d8524929aaceed64132f24904b96de61;hpb=bbf47715c938b4c40e7a6389872b07fd29b6638a;p=openflowplugin.git diff --git a/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/OpenFlowPluginProviderImpl.java b/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/OpenFlowPluginProviderImpl.java index 646a5b78d8..41bee01d59 100644 --- a/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/OpenFlowPluginProviderImpl.java +++ b/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/OpenFlowPluginProviderImpl.java @@ -13,10 +13,16 @@ import com.google.common.base.Preconditions; import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; +import io.netty.util.HashedWheelTimer; import java.lang.management.ManagementFactory; import java.util.ArrayList; import java.util.Collection; import java.util.List; +import java.util.Map; +import java.util.concurrent.SynchronousQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import javax.annotation.Nonnull; import javax.management.InstanceAlreadyExistsException; import javax.management.MBeanRegistrationException; import javax.management.MBeanServer; @@ -26,8 +32,8 @@ import javax.management.ObjectName; import org.opendaylight.controller.md.sal.binding.api.DataBroker; import org.opendaylight.controller.md.sal.binding.api.NotificationPublishService; import org.opendaylight.controller.md.sal.binding.api.NotificationService; -import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipService; import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry; +import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceProvider; import org.opendaylight.openflowjava.protocol.spi.connection.SwitchConnectionProvider; import org.opendaylight.openflowplugin.api.openflow.OpenFlowPluginProvider; import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionManager; @@ -36,8 +42,10 @@ import org.opendaylight.openflowplugin.api.openflow.role.RoleManager; import org.opendaylight.openflowplugin.api.openflow.rpc.RpcManager; import org.opendaylight.openflowplugin.api.openflow.statistics.StatisticsManager; import org.opendaylight.openflowplugin.api.openflow.statistics.ofpspecific.MessageIntelligenceAgency; +import org.opendaylight.openflowplugin.extension.api.ExtensionConverterProviderKeeper; import org.opendaylight.openflowplugin.extension.api.ExtensionConverterRegistrator; import org.opendaylight.openflowplugin.extension.api.OpenFlowPluginExtensionRegistratorProvider; +import org.opendaylight.openflowplugin.extension.api.core.extension.ExtensionConverterManager; import org.opendaylight.openflowplugin.impl.connection.ConnectionManagerImpl; import org.opendaylight.openflowplugin.impl.device.DeviceManagerImpl; import org.opendaylight.openflowplugin.impl.role.RoleManagerImpl; @@ -46,22 +54,29 @@ import org.opendaylight.openflowplugin.impl.statistics.StatisticsManagerImpl; import org.opendaylight.openflowplugin.impl.statistics.ofpspecific.MessageIntelligenceAgencyImpl; import org.opendaylight.openflowplugin.impl.statistics.ofpspecific.MessageIntelligenceAgencyMXBean; import org.opendaylight.openflowplugin.impl.util.TranslatorLibraryUtil; -import org.opendaylight.openflowplugin.openflow.md.core.extension.ExtensionConverterManager; +import org.opendaylight.openflowplugin.openflow.md.core.ThreadPoolLoggingExecutor; import org.opendaylight.openflowplugin.openflow.md.core.extension.ExtensionConverterManagerImpl; +import org.opendaylight.openflowplugin.openflow.md.core.sal.convertor.ConvertorManager; +import org.opendaylight.openflowplugin.openflow.md.core.sal.convertor.ConvertorManagerFactory; import org.opendaylight.openflowplugin.openflow.md.core.session.OFSessionUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -/** - * Created by Martin Bobak <mbobak@cisco.com> on 27.3.2015. - */ public class OpenFlowPluginProviderImpl implements OpenFlowPluginProvider, OpenFlowPluginExtensionRegistratorProvider { private static final Logger LOG = LoggerFactory.getLogger(OpenFlowPluginProviderImpl.class); private static final MessageIntelligenceAgency messageIntelligenceAgency = new MessageIntelligenceAgencyImpl(); + private static final int TICKS_PER_WHEEL = 500; + private static final long TICK_DURATION = 10; // 0.5 sec. + + private final HashedWheelTimer hashedWheelTimer = new HashedWheelTimer(TICK_DURATION, TimeUnit.MILLISECONDS, TICKS_PER_WHEEL); private final int rpcRequestsQuota; private final long globalNotificationQuota; + private final ConvertorManager convertorManager; + private long barrierInterval; + private int barrierCountLimit; + private long echoReplyTimeout; private DeviceManager deviceManager; private RoleManager roleManager; private RpcManager rpcManager; @@ -70,19 +85,36 @@ public class OpenFlowPluginProviderImpl implements OpenFlowPluginProvider, OpenF private ConnectionManager connectionManager; private NotificationService notificationProviderService; private NotificationPublishService notificationPublishService; - private EntityOwnershipService entityOwnershipService; - private ExtensionConverterManager extensionConverterManager; - private DataBroker dataBroker; private Collection switchConnectionProviders; private boolean switchFeaturesMandatory = false; private boolean isStatisticsPollingOff = false; - - public OpenFlowPluginProviderImpl(final long rpcRequestsQuota, final Long globalNotificationQuota) { + private boolean isStatisticsRpcEnabled; + private boolean isNotificationFlowRemovedOff = false; + private boolean skipTableFeatures = true; + + private final ThreadPoolExecutor threadPool; + private ClusterSingletonServiceProvider singletonServicesProvider; + + public OpenFlowPluginProviderImpl(final long rpcRequestsQuota, + final long globalNotificationQuota, + final int threadPoolMinThreads, + final int threadPoolMaxThreads, + final long threadPoolTimeout) { Preconditions.checkArgument(rpcRequestsQuota > 0 && rpcRequestsQuota <= Integer.MAX_VALUE, "rpcRequestQuota has to be in range <1,%s>", Integer.MAX_VALUE); this.rpcRequestsQuota = (int) rpcRequestsQuota; this.globalNotificationQuota = Preconditions.checkNotNull(globalNotificationQuota); + + // Creates a thread pool that creates new threads as needed, but will reuse previously + // constructed threads when they are available. + // Threads that have not been used for x seconds are terminated and removed from the cache. + threadPool = new ThreadPoolLoggingExecutor( + Preconditions.checkNotNull(threadPoolMinThreads), + Preconditions.checkNotNull(threadPoolMaxThreads), + Preconditions.checkNotNull(threadPoolTimeout), TimeUnit.SECONDS, + new SynchronousQueue<>(), "ofppool"); + convertorManager = ConvertorManagerFactory.createDefaultManager(); } @Override @@ -112,21 +144,48 @@ public class OpenFlowPluginProviderImpl implements OpenFlowPluginProvider, OpenF } @Override - public void onFailure(final Throwable t) { + public void onFailure(@Nonnull final Throwable t) { LOG.warn("Some switchConnectionProviders failed to start.", t); } }); } + @Override public boolean isSwitchFeaturesMandatory() { return switchFeaturesMandatory; } @Override - public void setEntityOwnershipService(EntityOwnershipService entityOwnershipService) { - this.entityOwnershipService = entityOwnershipService; + public void setBarrierCountLimit(final int barrierCountLimit) { + this.barrierCountLimit = barrierCountLimit; + } + + @Override + public void setBarrierInterval(final long barrierTimeoutLimit) { + this.barrierInterval = barrierTimeoutLimit; + } + + @Override + public void setEchoReplyTimeout(final long echoReplyTimeout) { + this.echoReplyTimeout = echoReplyTimeout; + } + + @Override + public void setNotificationFlowRemovedOff(boolean isNotificationFlowRemovedOff) { + this.isNotificationFlowRemovedOff = isNotificationFlowRemovedOff; + } + + @Override + public void setClusteringSingletonServicesProvider(ClusterSingletonServiceProvider singletonServicesProvider) { + this.singletonServicesProvider = singletonServicesProvider; } + @Override + public void setSkipTableFeatures(final boolean skipTableFeatures){ + this.skipTableFeatures = skipTableFeatures; + } + + @Override public void setSwitchFeaturesMandatory(final boolean switchFeaturesMandatory) { this.switchFeaturesMandatory = switchFeaturesMandatory; } @@ -152,48 +211,79 @@ public class OpenFlowPluginProviderImpl implements OpenFlowPluginProvider, OpenF @Override public void initialize() { - Preconditions.checkNotNull(dataBroker, "missing data broker"); Preconditions.checkNotNull(rpcProviderRegistry, "missing RPC provider registry"); Preconditions.checkNotNull(notificationProviderService, "missing notification provider service"); + Preconditions.checkNotNull(singletonServicesProvider, "missing singleton services provider"); extensionConverterManager = new ExtensionConverterManagerImpl(); // TODO: copied from OpenFlowPluginProvider (Helium) misusesing the old way of distributing extension converters // TODO: rewrite later! OFSessionUtil.getSessionManager().setExtensionConverterProvider(extensionConverterManager); - connectionManager = new ConnectionManagerImpl(); + connectionManager = new ConnectionManagerImpl(echoReplyTimeout, threadPool); registerMXBean(messageIntelligenceAgency); - deviceManager = new DeviceManagerImpl(dataBroker, messageIntelligenceAgency, switchFeaturesMandatory, globalNotificationQuota); - roleManager = new RoleManagerImpl(rpcProviderRegistry, entityOwnershipService); - statisticsManager = new StatisticsManagerImpl(rpcProviderRegistry, isStatisticsPollingOff); - rpcManager = new RpcManagerImpl(rpcProviderRegistry, rpcRequestsQuota); - - // CM -> DM -> Role -> SM -> RPC -> DM + deviceManager = new DeviceManagerImpl(dataBroker, + globalNotificationQuota, + switchFeaturesMandatory, + barrierInterval, + barrierCountLimit, + getMessageIntelligenceAgency(), + isNotificationFlowRemovedOff, + singletonServicesProvider, + notificationPublishService, + hashedWheelTimer, + convertorManager, + skipTableFeatures); + + ((ExtensionConverterProviderKeeper) deviceManager).setExtensionConverterProvider(extensionConverterManager); + + rpcManager = new RpcManagerImpl(rpcProviderRegistry, rpcRequestsQuota, extensionConverterManager, convertorManager, notificationPublishService); + roleManager = new RoleManagerImpl(dataBroker, hashedWheelTimer); + statisticsManager = new StatisticsManagerImpl(rpcProviderRegistry, isStatisticsPollingOff, hashedWheelTimer, convertorManager); + + /* Initialization Phase ordering - OFP Device Context suite */ + // CM -> DM -> SM -> RPC -> Role -> DM connectionManager.setDeviceConnectedHandler(deviceManager); - deviceManager.setDeviceInitializationPhaseHandler(roleManager); - roleManager.setDeviceInitializationPhaseHandler(statisticsManager); + deviceManager.setDeviceInitializationPhaseHandler(statisticsManager); statisticsManager.setDeviceInitializationPhaseHandler(rpcManager); - rpcManager.setDeviceInitializationPhaseHandler(deviceManager); + rpcManager.setDeviceInitializationPhaseHandler(roleManager); + roleManager.setDeviceInitializationPhaseHandler(deviceManager); - deviceManager.setNotificationService(this.notificationProviderService); - deviceManager.setNotificationPublishService(this.notificationPublishService); + /* Termination Phase ordering - OFP Device Context suite */ + deviceManager.setDeviceTerminationPhaseHandler(rpcManager); + rpcManager.setDeviceTerminationPhaseHandler(statisticsManager); + statisticsManager.setDeviceTerminationPhaseHandler(roleManager); + roleManager.setDeviceTerminationPhaseHandler(deviceManager); - TranslatorLibraryUtil.setBasicTranslatorLibrary(deviceManager); + rpcManager.setStatisticsRpcEnabled(isStatisticsRpcEnabled); + + TranslatorLibraryUtil.injectBasicTranslatorLibrary(deviceManager, convertorManager); deviceManager.initialize(); startSwitchConnections(); } + @Override + public void update(Map props) { + LOG.debug("Update managed properties = {}", props.toString()); + if(deviceManager != null && props.containsKey("notification-flow-removed-off")) { + deviceManager.setIsNotificationFlowRemovedOff(Boolean.valueOf(props.get("notification-flow-removed-off").toString())); + } + if(deviceManager != null && props.containsKey("skip-table-features")) { + deviceManager.setSkipTableFeatures(Boolean.valueOf(props.get("skip-table-features").toString())); + } + } + private static void registerMXBean(final MessageIntelligenceAgency messageIntelligenceAgency) { - MBeanServer mbs = ManagementFactory.getPlatformMBeanServer(); + final MBeanServer mbs = ManagementFactory.getPlatformMBeanServer(); try { - String pathToMxBean = String.format("%s:type=%s", + final String pathToMxBean = String.format("%s:type=%s", MessageIntelligenceAgencyMXBean.class.getPackage().getName(), MessageIntelligenceAgencyMXBean.class.getSimpleName()); - ObjectName name = new ObjectName(pathToMxBean); + final ObjectName name = new ObjectName(pathToMxBean); mbs.registerMBean(messageIntelligenceAgency, name); } catch (MalformedObjectNameException | NotCompliantMBeanException @@ -218,11 +308,23 @@ public class OpenFlowPluginProviderImpl implements OpenFlowPluginProvider, OpenF return extensionConverterManager; } + @Override + public void setIsStatisticsRpcEnabled(final boolean isStatisticsRpcEnabled) { + this.isStatisticsRpcEnabled = isStatisticsRpcEnabled; + } + @Override public void close() throws Exception { - //TODO: close all contexts, switchConnections (, managers) + //TODO: consider wrapping each manager into try-catch + deviceManager.close(); rpcManager.close(); statisticsManager.close(); + + // TODO: needs to close org.opendaylight.openflowplugin.impl.role.OpenflowOwnershipListener after RoleContexts are down + // TODO: must not be executed prior to all living RoleContexts have been closed (via closing living DeviceContexts) roleManager.close(); + + // Manually shutdown all remaining running threads in pool + threadPool.shutdown(); } }