2 * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.openflowplugin.impl;
11 import com.google.common.base.Preconditions;
12 import com.google.common.util.concurrent.FutureCallback;
13 import com.google.common.util.concurrent.Futures;
14 import io.netty.util.HashedWheelTimer;
15 import io.netty.util.Timer;
16 import java.lang.management.ManagementFactory;
17 import java.util.Collection;
18 import java.util.List;
20 import java.util.Objects;
21 import java.util.concurrent.SynchronousQueue;
22 import java.util.concurrent.ThreadPoolExecutor;
23 import java.util.concurrent.TimeUnit;
24 import java.util.function.Consumer;
25 import java.util.stream.Collectors;
26 import javax.annotation.Nonnull;
27 import javax.management.InstanceAlreadyExistsException;
28 import javax.management.InstanceNotFoundException;
29 import javax.management.MBeanRegistrationException;
30 import javax.management.MBeanServer;
31 import javax.management.MalformedObjectNameException;
32 import javax.management.NotCompliantMBeanException;
33 import javax.management.ObjectName;
34 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
35 import org.opendaylight.controller.md.sal.binding.api.NotificationPublishService;
36 import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipService;
37 import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry;
38 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceProvider;
39 import org.opendaylight.openflowjava.protocol.spi.connection.SwitchConnectionProvider;
40 import org.opendaylight.openflowplugin.api.openflow.OpenFlowPluginConfigurationService;
41 import org.opendaylight.openflowplugin.api.openflow.OpenFlowPluginProvider;
42 import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionManager;
43 import org.opendaylight.openflowplugin.api.openflow.device.DeviceManager;
44 import org.opendaylight.openflowplugin.api.openflow.lifecycle.ContextChainHolder;
45 import org.opendaylight.openflowplugin.api.openflow.rpc.RpcManager;
46 import org.opendaylight.openflowplugin.api.openflow.statistics.StatisticsManager;
47 import org.opendaylight.openflowplugin.api.openflow.statistics.ofpspecific.MessageIntelligenceAgency;
48 import org.opendaylight.openflowplugin.extension.api.ExtensionConverterProviderKeeper;
49 import org.opendaylight.openflowplugin.extension.api.ExtensionConverterRegistrator;
50 import org.opendaylight.openflowplugin.extension.api.OpenFlowPluginExtensionRegistratorProvider;
51 import org.opendaylight.openflowplugin.extension.api.core.extension.ExtensionConverterManager;
52 import org.opendaylight.openflowplugin.impl.connection.ConnectionManagerImpl;
53 import org.opendaylight.openflowplugin.impl.device.DeviceManagerImpl;
54 import org.opendaylight.openflowplugin.impl.device.initialization.DeviceInitializerProvider;
55 import org.opendaylight.openflowplugin.impl.device.initialization.DeviceInitializerProviderFactory;
56 import org.opendaylight.openflowplugin.impl.lifecycle.ContextChainHolderImpl;
57 import org.opendaylight.openflowplugin.impl.protocol.deserialization.DeserializerInjector;
58 import org.opendaylight.openflowplugin.impl.protocol.serialization.SerializerInjector;
59 import org.opendaylight.openflowplugin.impl.rpc.RpcManagerImpl;
60 import org.opendaylight.openflowplugin.impl.statistics.StatisticsManagerImpl;
61 import org.opendaylight.openflowplugin.impl.statistics.ofpspecific.MessageIntelligenceAgencyImpl;
62 import org.opendaylight.openflowplugin.impl.statistics.ofpspecific.MessageIntelligenceAgencyMXBean;
63 import org.opendaylight.openflowplugin.impl.util.TranslatorLibraryUtil;
64 import org.opendaylight.openflowplugin.openflow.md.core.ThreadPoolLoggingExecutor;
65 import org.opendaylight.openflowplugin.openflow.md.core.extension.ExtensionConverterManagerImpl;
66 import org.opendaylight.openflowplugin.openflow.md.core.sal.convertor.ConvertorManager;
67 import org.opendaylight.openflowplugin.openflow.md.core.sal.convertor.ConvertorManagerFactory;
68 import org.opendaylight.openflowplugin.openflow.md.core.session.OFSessionUtil;
69 import org.slf4j.Logger;
70 import org.slf4j.LoggerFactory;
72 public class OpenFlowPluginProviderImpl implements OpenFlowPluginProvider, OpenFlowPluginConfigurationService, OpenFlowPluginExtensionRegistratorProvider {
74 private static final Logger LOG = LoggerFactory.getLogger(OpenFlowPluginProviderImpl.class);
76 private static final int TICKS_PER_WHEEL = 500; // 0.5 sec.
77 private static final long TICK_DURATION = 10;
78 private static final String POOL_NAME = "ofppool";
80 private static final MessageIntelligenceAgency MESSAGE_INTELLIGENCE_AGENCY = new MessageIntelligenceAgencyImpl();
81 private static final String MESSAGE_INTELLIGENCE_AGENCY_MX_BEAN_NAME = String
83 MessageIntelligenceAgencyMXBean.class.getPackage().getName(),
84 MessageIntelligenceAgencyMXBean.class.getSimpleName());
86 private final HashedWheelTimer hashedWheelTimer = new HashedWheelTimer(TICK_DURATION, TimeUnit.MILLISECONDS, TICKS_PER_WHEEL);
87 private final NotificationPublishService notificationPublishService;
88 private final ExtensionConverterManager extensionConverterManager;
89 private final DataBroker dataBroker;
90 private final Collection<SwitchConnectionProvider> switchConnectionProviders;
91 private final DeviceInitializerProvider deviceInitializerProvider;
92 private final ConvertorManager convertorManager;
93 private final ContextChainHolder contextChainHolder;
94 private final RpcProviderRegistry rpcProviderRegistry;
95 private final ClusterSingletonServiceProvider singletonServicesProvider;
96 private int rpcRequestsQuota;
97 private long globalNotificationQuota;
98 private long barrierInterval;
99 private int barrierCountLimit;
100 private long echoReplyTimeout;
101 private DeviceManager deviceManager;
102 private RpcManager rpcManager;
103 private StatisticsManager statisticsManager;
104 private ConnectionManager connectionManager;
105 private boolean switchFeaturesMandatory;
106 private boolean isStatisticsPollingOn;
107 private boolean isStatisticsRpcEnabled;
108 private boolean isFlowRemovedNotificationOn;
109 private boolean skipTableFeatures;
110 private long basicTimerDelay;
111 private long maximumTimerDelay;
112 private boolean useSingleLayerSerialization;
113 private ThreadPoolExecutor threadPool;
114 private int threadPoolMinThreads;
115 private int threadPoolMaxThreads;
116 private long threadPoolTimeout;
117 private boolean initialized = false;
119 public static MessageIntelligenceAgency getMessageIntelligenceAgency() {
120 return MESSAGE_INTELLIGENCE_AGENCY;
123 OpenFlowPluginProviderImpl(final List<SwitchConnectionProvider> switchConnectionProviders,
124 final DataBroker dataBroker,
125 final RpcProviderRegistry rpcProviderRegistry,
126 final NotificationPublishService notificationPublishService,
127 final ClusterSingletonServiceProvider singletonServiceProvider,
128 final EntityOwnershipService entityOwnershipService) {
129 this.switchConnectionProviders = switchConnectionProviders;
130 this.dataBroker = dataBroker;
131 this.rpcProviderRegistry = rpcProviderRegistry;
132 this.notificationPublishService = notificationPublishService;
133 this.singletonServicesProvider = singletonServiceProvider;
134 convertorManager = ConvertorManagerFactory.createDefaultManager();
135 contextChainHolder = new ContextChainHolderImpl(hashedWheelTimer);
136 contextChainHolder.changeEntityOwnershipService(entityOwnershipService);
137 extensionConverterManager = new ExtensionConverterManagerImpl();
138 deviceInitializerProvider = DeviceInitializerProviderFactory.createDefaultProvider();
142 private void startSwitchConnections() {
143 Futures.addCallback(Futures.allAsList(switchConnectionProviders.stream().map(switchConnectionProvider -> {
144 // Inject OpenFlowPlugin custom serializers and deserializers into OpenFlowJava
145 if (useSingleLayerSerialization) {
146 SerializerInjector.injectSerializers(switchConnectionProvider);
147 DeserializerInjector.injectDeserializers(switchConnectionProvider);
149 DeserializerInjector.revertDeserializers(switchConnectionProvider);
152 // Set handler of incoming connections and start switch connection provider
153 switchConnectionProvider.setSwitchConnectionHandler(connectionManager);
154 return switchConnectionProvider.startup();
155 }).collect(Collectors.toSet())), new FutureCallback<List<Boolean>>() {
157 public void onSuccess(final List<Boolean> result) {
158 LOG.info("All switchConnectionProviders are up and running ({}).", result.size());
162 public void onFailure(@Nonnull final Throwable throwable) {
163 LOG.warn("Some switchConnectionProviders failed to start.", throwable);
168 private void shutdownSwitchConnections() {
169 Futures.addCallback(Futures.allAsList(switchConnectionProviders.stream().map(switchConnectionProvider -> {
170 // Revert deserializers to their original state
171 if (useSingleLayerSerialization) {
172 DeserializerInjector.revertDeserializers(switchConnectionProvider);
175 // Shutdown switch connection provider
176 return switchConnectionProvider.shutdown();
177 }).collect(Collectors.toSet())), new FutureCallback<List<Boolean>>() {
179 public void onSuccess(final List<Boolean> result) {
180 LOG.info("All switchConnectionProviders were successfully shut down ({}).", result.size());
184 public void onFailure(@Nonnull final Throwable throwable) {
185 LOG.warn("Some switchConnectionProviders failed to shutdown.", throwable);
191 public void initialize() {
192 // TODO: copied from OpenFlowPluginProvider (Helium) misusesing the old way of distributing extension converters
193 // TODO: rewrite later!
194 OFSessionUtil.getSessionManager().setExtensionConverterProvider(extensionConverterManager);
196 // Creates a thread pool that creates new threads as needed, but will reuse previously
197 // constructed threads when they are available.
198 // Threads that have not been used for x seconds are terminated and removed from the cache.
199 threadPool = new ThreadPoolLoggingExecutor(
200 Preconditions.checkNotNull(threadPoolMinThreads),
201 Preconditions.checkNotNull(threadPoolMaxThreads),
202 Preconditions.checkNotNull(threadPoolTimeout),
203 TimeUnit.SECONDS, new SynchronousQueue<>(), POOL_NAME);
205 connectionManager = new ConnectionManagerImpl(threadPool);
206 connectionManager.setEchoReplyTimeout(echoReplyTimeout);
208 registerMXBean(MESSAGE_INTELLIGENCE_AGENCY, MESSAGE_INTELLIGENCE_AGENCY_MX_BEAN_NAME);
210 contextChainHolder.addSingletonServicesProvider(singletonServicesProvider);
212 deviceManager = new DeviceManagerImpl(
214 getMessageIntelligenceAgency(),
215 notificationPublishService,
218 deviceInitializerProvider,
219 useSingleLayerSerialization);
221 deviceManager.setGlobalNotificationQuota(globalNotificationQuota);
222 deviceManager.setSwitchFeaturesMandatory(switchFeaturesMandatory);
223 deviceManager.setBarrierInterval(barrierInterval);
224 deviceManager.setBarrierCountLimit(barrierCountLimit);
225 deviceManager.setFlowRemovedNotificationOn(isFlowRemovedNotificationOn);
226 deviceManager.setSkipTableFeatures(skipTableFeatures);
228 ((ExtensionConverterProviderKeeper) deviceManager).setExtensionConverterProvider(extensionConverterManager);
230 rpcManager = new RpcManagerImpl(rpcProviderRegistry, extensionConverterManager, convertorManager, notificationPublishService);
231 rpcManager.setRpcRequestQuota(rpcRequestsQuota);
233 statisticsManager = new StatisticsManagerImpl(rpcProviderRegistry, hashedWheelTimer, convertorManager);
234 statisticsManager.setBasicTimerDelay(basicTimerDelay);
235 statisticsManager.setMaximumTimerDelay(maximumTimerDelay);
236 statisticsManager.setIsStatisticsPollingOn(isStatisticsPollingOn);
238 // Device connection handler moved from device manager to context holder
239 connectionManager.setDeviceConnectedHandler(contextChainHolder);
241 /* Termination Phase ordering - OFP Device Context suite */
242 connectionManager.setDeviceDisconnectedHandler(contextChainHolder);
244 rpcManager.setStatisticsRpcEnabled(isStatisticsRpcEnabled);
246 TranslatorLibraryUtil.injectBasicTranslatorLibrary(deviceManager, convertorManager);
247 deviceManager.initialize();
249 contextChainHolder.addManager(deviceManager);
250 contextChainHolder.addManager(statisticsManager);
251 contextChainHolder.addManager(rpcManager);
253 startSwitchConnections();
259 public void update(@Nonnull final Map<String, Object> properties) {
260 properties.forEach((key, value) -> {
261 final PropertyType propertyType = PropertyType.forValue(key);
263 if (Objects.nonNull(propertyType)) {
264 updateProperty(propertyType, value);
269 private void doPropertyUpdate(final PropertyType propertyType,
270 final boolean modifiable,
271 final Object origValue,
272 final Object newValue,
273 final Consumer<Object> successCallback) {
275 if (Objects.equals(origValue, newValue)) {
276 LOG.debug("{} config parameter is already set to {})", propertyType, origValue);
278 } else if (!modifiable) {
279 LOG.warn("{} update ({} -> {}) is not allowed after controller start", propertyType, origValue, newValue);
284 successCallback.accept(newValue);
285 LOG.info("{} config parameter is updated ({} -> {})", propertyType, origValue, newValue);
289 public void updateProperty(@Nonnull final PropertyType key, @Nonnull final Object value) {
291 final String sValue = value.toString();
292 final Consumer<Object> successCallback;
293 final boolean modifiable;
294 final Object oldValue;
295 final Object newValue;
298 case RPC_REQUESTS_QUOTA:
299 successCallback = (result) -> {
300 rpcRequestsQuota = (int) result;
303 rpcManager.setRpcRequestQuota(rpcRequestsQuota);
307 oldValue = rpcRequestsQuota;
308 newValue = Integer.valueOf(sValue);
311 case SWITCH_FEATURES_MANDATORY:
312 successCallback = (result) -> {
313 switchFeaturesMandatory = (boolean) result;
316 deviceManager.setSwitchFeaturesMandatory(switchFeaturesMandatory);
320 oldValue = switchFeaturesMandatory;
321 newValue = Boolean.valueOf(sValue);
324 case GLOBAL_NOTIFICATION_QUOTA:
325 successCallback = (result) -> {
326 globalNotificationQuota = (long) result;
329 deviceManager.setGlobalNotificationQuota(globalNotificationQuota);
333 oldValue = globalNotificationQuota;
334 newValue = Long.valueOf(sValue);
337 case IS_STATISTICS_POLLING_ON:
338 successCallback = (result) -> {
339 isStatisticsPollingOn = (boolean) result;
342 statisticsManager.setIsStatisticsPollingOn(isStatisticsPollingOn);
346 oldValue = isStatisticsPollingOn;
347 newValue = Boolean.valueOf(sValue);
350 case IS_STATISTICS_RPC_ENABLED:
351 successCallback = (result) -> {
352 isStatisticsRpcEnabled = (boolean) result;
355 rpcManager.setStatisticsRpcEnabled(isStatisticsRpcEnabled);
359 oldValue = isStatisticsRpcEnabled;
360 newValue = Boolean.valueOf(sValue);
363 case BARRIER_INTERVAL_TIMEOUT_LIMIT:
364 successCallback = (result) -> {
365 barrierInterval = (long) result;
368 deviceManager.setBarrierInterval(barrierInterval);
372 oldValue = barrierInterval;
373 newValue = Long.valueOf(sValue);
376 case BARRIER_COUNT_LIMIT:
377 successCallback = (result) -> {
378 barrierCountLimit = (int) result;
381 deviceManager.setBarrierCountLimit(barrierCountLimit);
385 oldValue = barrierCountLimit;
386 newValue = Integer.valueOf(sValue);
389 case ECHO_REPLY_TIMEOUT:
390 successCallback = (result) -> {
391 echoReplyTimeout = (long) result;
394 connectionManager.setEchoReplyTimeout(echoReplyTimeout);
398 oldValue = echoReplyTimeout;
399 newValue = Long.valueOf(sValue);
402 case THREAD_POOL_MIN_THREADS:
403 successCallback = (result) -> threadPoolMinThreads = (int) result;
404 oldValue = threadPoolMinThreads;
405 newValue = Integer.valueOf(sValue);
408 case THREAD_POOL_MAX_THREADS:
409 successCallback = (result) -> threadPoolMaxThreads = (int) result;
410 oldValue = threadPoolMaxThreads;
411 newValue = Integer.valueOf(sValue);
414 case THREAD_POOL_TIMEOUT:
415 successCallback = (result) -> threadPoolTimeout = (long) result;
416 oldValue = threadPoolTimeout;
417 newValue = Long.valueOf(sValue);
420 case ENABLE_FLOW_REMOVED_NOTIFICATION:
421 successCallback = (result) -> {
422 isFlowRemovedNotificationOn = (boolean) result;
425 deviceManager.setFlowRemovedNotificationOn(isFlowRemovedNotificationOn);
429 oldValue = isFlowRemovedNotificationOn;
430 newValue = Boolean.valueOf(sValue);
433 case SKIP_TABLE_FEATURES:
434 successCallback = (result) -> {
435 skipTableFeatures = (boolean) result;
438 deviceManager.setSkipTableFeatures(skipTableFeatures);
442 oldValue = skipTableFeatures;
443 newValue = Boolean.valueOf(sValue);
446 case BASIC_TIMER_DELAY:
447 successCallback = (result) -> {
448 basicTimerDelay = (long) result;
451 statisticsManager.setBasicTimerDelay(basicTimerDelay);
455 oldValue = basicTimerDelay;
456 newValue = Long.valueOf(sValue);
459 case MAXIMUM_TIMER_DELAY:
460 successCallback = (result) -> {
461 maximumTimerDelay = (long) result;
464 statisticsManager.setMaximumTimerDelay(maximumTimerDelay);
468 oldValue = maximumTimerDelay;
469 newValue = Long.valueOf(sValue);
472 case USE_SINGLE_LAYER_SERIALIZATION:
473 successCallback = (result) -> useSingleLayerSerialization = (boolean) result;
474 oldValue = useSingleLayerSerialization;
475 newValue = Boolean.valueOf(sValue);
482 doPropertyUpdate(key, modifiable, oldValue, newValue, successCallback);
483 } catch (final Exception ex) {
484 LOG.warn("Failed to read configuration property '{}={}', error: {}", key, value, ex);
489 public ExtensionConverterRegistrator getExtensionConverterRegistrator() {
490 return extensionConverterManager;
494 public void close() throws Exception {
496 gracefulShutdown(contextChainHolder);
497 gracefulShutdown(deviceManager);
498 gracefulShutdown(rpcManager);
499 gracefulShutdown(statisticsManager);
500 gracefulShutdown(threadPool);
501 gracefulShutdown(hashedWheelTimer);
502 shutdownSwitchConnections();
503 unregisterMXBean(MESSAGE_INTELLIGENCE_AGENCY_MX_BEAN_NAME);
506 private static void gracefulShutdown(final AutoCloseable closeable) {
507 if (Objects.isNull(closeable)) {
513 } catch (Exception e) {
514 LOG.warn("Failed to shutdown {} gracefully.", closeable);
518 private static void gracefulShutdown(final Timer timer) {
519 if (Objects.isNull(timer)) {
525 } catch (Exception e) {
526 LOG.warn("Failed to shutdown {} gracefully.", timer);
530 private static void gracefulShutdown(final ThreadPoolExecutor threadPoolExecutor) {
531 if (Objects.isNull(threadPoolExecutor)) {
536 threadPoolExecutor.shutdown();
537 } catch (Exception e) {
538 LOG.warn("Failed to shutdown {} gracefully.", threadPoolExecutor);
542 private static void registerMXBean(final Object bean, final String beanName) {
543 final MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
546 mbs.registerMBean(bean, new ObjectName(beanName));
547 } catch (MalformedObjectNameException
548 | NotCompliantMBeanException
549 | MBeanRegistrationException
550 | InstanceAlreadyExistsException e) {
551 LOG.warn("Error registering MBean {}", e);
555 private static void unregisterMXBean(final String beanName) {
556 final MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
559 mbs.unregisterMBean(new ObjectName(beanName));
560 } catch (InstanceNotFoundException
561 | MBeanRegistrationException
562 | MalformedObjectNameException e) {
563 LOG.warn("Error unregistering MBean {}", e);