2 * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.openflowplugin.impl;
11 import com.google.common.base.Preconditions;
12 import com.google.common.util.concurrent.FutureCallback;
13 import com.google.common.util.concurrent.Futures;
14 import io.netty.util.HashedWheelTimer;
15 import java.lang.management.ManagementFactory;
16 import java.util.Collection;
17 import java.util.List;
19 import java.util.Objects;
20 import java.util.concurrent.SynchronousQueue;
21 import java.util.concurrent.ThreadPoolExecutor;
22 import java.util.concurrent.TimeUnit;
23 import java.util.function.Consumer;
24 import java.util.stream.Collectors;
25 import javax.annotation.Nonnull;
26 import javax.management.InstanceAlreadyExistsException;
27 import javax.management.MBeanRegistrationException;
28 import javax.management.MBeanServer;
29 import javax.management.MalformedObjectNameException;
30 import javax.management.NotCompliantMBeanException;
31 import javax.management.ObjectName;
32 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
33 import org.opendaylight.controller.md.sal.binding.api.NotificationPublishService;
34 import org.opendaylight.controller.md.sal.binding.api.NotificationService;
35 import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipService;
36 import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry;
37 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceProvider;
38 import org.opendaylight.openflowjava.protocol.spi.connection.SwitchConnectionProvider;
39 import org.opendaylight.openflowplugin.api.openflow.OpenFlowPluginConfigurationService;
40 import org.opendaylight.openflowplugin.api.openflow.OpenFlowPluginProvider;
41 import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionManager;
42 import org.opendaylight.openflowplugin.api.openflow.device.DeviceManager;
43 import org.opendaylight.openflowplugin.api.openflow.lifecycle.ContextChainHolder;
44 import org.opendaylight.openflowplugin.api.openflow.rpc.RpcManager;
45 import org.opendaylight.openflowplugin.api.openflow.statistics.StatisticsManager;
46 import org.opendaylight.openflowplugin.api.openflow.statistics.ofpspecific.MessageIntelligenceAgency;
47 import org.opendaylight.openflowplugin.extension.api.ExtensionConverterProviderKeeper;
48 import org.opendaylight.openflowplugin.extension.api.ExtensionConverterRegistrator;
49 import org.opendaylight.openflowplugin.extension.api.OpenFlowPluginExtensionRegistratorProvider;
50 import org.opendaylight.openflowplugin.extension.api.core.extension.ExtensionConverterManager;
51 import org.opendaylight.openflowplugin.impl.connection.ConnectionManagerImpl;
52 import org.opendaylight.openflowplugin.impl.device.DeviceManagerImpl;
53 import org.opendaylight.openflowplugin.impl.device.initialization.DeviceInitializerProvider;
54 import org.opendaylight.openflowplugin.impl.device.initialization.DeviceInitializerProviderFactory;
55 import org.opendaylight.openflowplugin.impl.lifecycle.ContextChainHolderImpl;
56 import org.opendaylight.openflowplugin.impl.protocol.deserialization.DeserializerInjector;
57 import org.opendaylight.openflowplugin.impl.protocol.serialization.SerializerInjector;
58 import org.opendaylight.openflowplugin.impl.rpc.RpcManagerImpl;
59 import org.opendaylight.openflowplugin.impl.statistics.StatisticsManagerImpl;
60 import org.opendaylight.openflowplugin.impl.statistics.ofpspecific.MessageIntelligenceAgencyImpl;
61 import org.opendaylight.openflowplugin.impl.statistics.ofpspecific.MessageIntelligenceAgencyMXBean;
62 import org.opendaylight.openflowplugin.impl.util.TranslatorLibraryUtil;
63 import org.opendaylight.openflowplugin.openflow.md.core.ThreadPoolLoggingExecutor;
64 import org.opendaylight.openflowplugin.openflow.md.core.extension.ExtensionConverterManagerImpl;
65 import org.opendaylight.openflowplugin.openflow.md.core.sal.convertor.ConvertorManager;
66 import org.opendaylight.openflowplugin.openflow.md.core.sal.convertor.ConvertorManagerFactory;
67 import org.opendaylight.openflowplugin.openflow.md.core.session.OFSessionUtil;
68 import org.slf4j.Logger;
69 import org.slf4j.LoggerFactory;
71 public class OpenFlowPluginProviderImpl implements OpenFlowPluginProvider, OpenFlowPluginConfigurationService, OpenFlowPluginExtensionRegistratorProvider {
73 private static final Logger LOG = LoggerFactory.getLogger(OpenFlowPluginProviderImpl.class);
74 private static final MessageIntelligenceAgency messageIntelligenceAgency = new MessageIntelligenceAgencyImpl();
75 private static final int TICKS_PER_WHEEL = 500; // 0.5 sec.
76 private static final long TICK_DURATION = 10;
77 private static final String POOL_NAME = "ofppool";
79 private final HashedWheelTimer hashedWheelTimer = new HashedWheelTimer(TICK_DURATION, TimeUnit.MILLISECONDS, TICKS_PER_WHEEL);
80 private final NotificationService notificationProviderService;
81 private final NotificationPublishService notificationPublishService;
82 private final ExtensionConverterManager extensionConverterManager;
83 private final DataBroker dataBroker;
84 private final Collection<SwitchConnectionProvider> switchConnectionProviders;
85 private final DeviceInitializerProvider deviceInitializerProvider;
86 private final ConvertorManager convertorManager;
87 private final ContextChainHolder contextChainHolder;private int rpcRequestsQuota;
88 private long globalNotificationQuota;
89 private long barrierInterval;
90 private int barrierCountLimit;
91 private long echoReplyTimeout;
92 private DeviceManager deviceManager;
93 private RpcManager rpcManager;
94 private RpcProviderRegistry rpcProviderRegistry;
95 private StatisticsManager statisticsManager;
96 private ConnectionManager connectionManager;
97 private boolean switchFeaturesMandatory;
98 private boolean isStatisticsPollingOn;
99 private boolean isStatisticsRpcEnabled;
100 private boolean isFlowRemovedNotificationOn;
101 private boolean skipTableFeatures;
102 private long basicTimerDelay;
103 private long maximumTimerDelay;
104 private boolean useSingleLayerSerialization;
105 private ThreadPoolExecutor threadPool;
106 private ClusterSingletonServiceProvider singletonServicesProvider;
107 private int threadPoolMinThreads;
108 private int threadPoolMaxThreads;
109 private long threadPoolTimeout;
110 private boolean initialized = false;
112 public static MessageIntelligenceAgency getMessageIntelligenceAgency() {
113 return messageIntelligenceAgency;
116 public OpenFlowPluginProviderImpl(final List<SwitchConnectionProvider> switchConnectionProviders,
117 final DataBroker dataBroker,
118 final RpcProviderRegistry rpcProviderRegistry,
119 final NotificationService notificationProviderService,
120 final NotificationPublishService notificationPublishService,
121 final ClusterSingletonServiceProvider singletonServiceProvider,
122 final EntityOwnershipService entityOwnershipService) {
123 this.switchConnectionProviders = switchConnectionProviders;
124 this.dataBroker = dataBroker;
125 this.rpcProviderRegistry = rpcProviderRegistry;
126 this.notificationProviderService = notificationProviderService;
127 this.notificationPublishService = notificationPublishService;
128 this.singletonServicesProvider = singletonServiceProvider;
129 convertorManager = ConvertorManagerFactory.createDefaultManager();
130 contextChainHolder = new ContextChainHolderImpl(hashedWheelTimer);
131 contextChainHolder.changeEntityOwnershipService(entityOwnershipService);
132 extensionConverterManager = new ExtensionConverterManagerImpl();
133 deviceInitializerProvider = DeviceInitializerProviderFactory.createDefaultProvider();
137 private void startSwitchConnections() {
138 Futures.addCallback(Futures.allAsList(switchConnectionProviders.stream().map(switchConnectionProvider -> {
139 // Inject OpenflowPlugin custom serializers and deserializers into OpenflowJava
140 if (useSingleLayerSerialization) {
141 SerializerInjector.injectSerializers(switchConnectionProvider);
142 DeserializerInjector.injectDeserializers(switchConnectionProvider);
145 // Set handler of incoming connections and start switch connection provider
146 switchConnectionProvider.setSwitchConnectionHandler(connectionManager);
147 return switchConnectionProvider.startup();
148 }).collect(Collectors.toSet())), new FutureCallback<List<Boolean>>() {
150 public void onSuccess(final List<Boolean> result) {
151 LOG.info("All switchConnectionProviders are up and running ({}).", result.size());
155 public void onFailure(@Nonnull final Throwable t) {
156 LOG.warn("Some switchConnectionProviders failed to start.", t);
/**
 * Builds and wires the plugin's runtime components: thread pool, connection manager,
 * device, RPC and statistics managers. The managers are registered with the context
 * chain holder and the switch connection providers are started last.
 *
 * Fails fast (via Preconditions) when one of the required services is missing.
 */
162 public void initialize() {
// Required services must have been supplied through the constructor.
163 Preconditions.checkNotNull(dataBroker, "missing data broker");
164 Preconditions.checkNotNull(rpcProviderRegistry, "missing RPC provider registry");
165 Preconditions.checkNotNull(notificationProviderService, "missing notification provider service");
166 Preconditions.checkNotNull(singletonServicesProvider, "missing singleton services provider");
168 // TODO: copied from OpenFlowPluginProvider (Helium) misusing the old way of distributing extension converters
169 // TODO: rewrite later!
170 OFSessionUtil.getSessionManager().setExtensionConverterProvider(extensionConverterManager);
172 // Creates a thread pool that creates new threads as needed, but will reuse previously
173 // constructed threads when they are available.
174 // Threads that have not been used for x seconds are terminated and removed from the cache.
// NOTE(review): the three thread pool fields are primitives (int/int/long), so the
// checkNotNull calls below only box the value and can never fail.
175 threadPool = new ThreadPoolLoggingExecutor(
176 Preconditions.checkNotNull(threadPoolMinThreads),
177 Preconditions.checkNotNull(threadPoolMaxThreads),
178 Preconditions.checkNotNull(threadPoolTimeout),
179 TimeUnit.SECONDS, new SynchronousQueue<>(), POOL_NAME);
// The connection manager runs on the pool created above.
181 connectionManager = new ConnectionManagerImpl(threadPool);
182 connectionManager.setEchoReplyTimeout(echoReplyTimeout);
// Expose the shared message statistics collector over JMX.
184 registerMXBean(messageIntelligenceAgency);
186 contextChainHolder.addSingletonServicesProvider(singletonServicesProvider);
// Device manager: created, then configured from the current property values.
188 deviceManager = new DeviceManagerImpl(
190 getMessageIntelligenceAgency(),
191 notificationPublishService,
194 deviceInitializerProvider);
196 deviceManager.setGlobalNotificationQuota(globalNotificationQuota);
197 deviceManager.setSwitchFeaturesMandatory(switchFeaturesMandatory);
198 deviceManager.setBarrierInterval(barrierInterval);
199 deviceManager.setBarrierCountLimit(barrierCountLimit);
200 deviceManager.setFlowRemovedNotificationOn(isFlowRemovedNotificationOn);
201 deviceManager.setSkipTableFeatures(skipTableFeatures);
202 deviceManager.setUseSingleLayerSerialization(useSingleLayerSerialization);
// NOTE(review): the unchecked cast assumes the device manager implementation also
// implements ExtensionConverterProviderKeeper — confirm against DeviceManagerImpl.
204 ((ExtensionConverterProviderKeeper) deviceManager).setExtensionConverterProvider(extensionConverterManager);
// RPC manager with its request quota.
206 rpcManager = new RpcManagerImpl(rpcProviderRegistry, extensionConverterManager, convertorManager, notificationPublishService);
207 rpcManager.setRpcRequestQuota(rpcRequestsQuota);
// Statistics manager shares the hashed wheel timer of this provider.
209 statisticsManager = new StatisticsManagerImpl(rpcProviderRegistry, hashedWheelTimer, convertorManager);
210 statisticsManager.setBasicTimerDelay(basicTimerDelay);
211 statisticsManager.setMaximumTimerDelay(maximumTimerDelay);
212 statisticsManager.setIsStatisticsPollingOn(isStatisticsPollingOn);
214 // Device connection handler moved from device manager to context holder
215 connectionManager.setDeviceConnectedHandler(contextChainHolder);
217 /* Termination Phase ordering - OFP Device Context suite */
218 connectionManager.setDeviceDisconnectedHandler(contextChainHolder);
220 rpcManager.setStatisticsRpcEnabled(isStatisticsRpcEnabled);
222 TranslatorLibraryUtil.injectBasicTranslatorLibrary(deviceManager, convertorManager);
223 deviceManager.initialize();
// Hand all three managers to the context chain holder.
225 contextChainHolder.addManager(deviceManager);
226 contextChainHolder.addManager(statisticsManager);
227 contextChainHolder.addManager(rpcManager);
// Providers are started only after every handler above has been wired.
229 startSwitchConnections();
235 public void update(@Nonnull final Map<String, Object> properties) {
236 properties.forEach((key, value) -> {
237 final PropertyType propertyType = PropertyType.forValue(key);
239 if (Objects.nonNull(propertyType)) {
240 updateProperty(propertyType, value);
241 } else if (!key.equals("service.pid") && !key.equals("felix.fileinstall.filename")) {
242 LOG.warn("Unsupported configuration property '{}={}'", key, value);
/**
 * Decides whether a configuration change is applied and logs the outcome.
 *
 * @param propertyType    property being changed; used in the log messages
 * @param modifiable      whether the property may be changed; {@code false} selects the warning branch
 * @param origValue       value currently in effect
 * @param newValue        requested value
 * @param successCallback receives {@code newValue} when the change is applied
 */
247 private void doPropertyUpdate(final PropertyType propertyType,
248 final boolean modifiable,
249 final Object origValue,
250 final Object newValue,
251 final Consumer<Object> successCallback) {
// Requested value equals the current one: only a debug message is produced.
253 if (Objects.equals(origValue, newValue)) {
254 LOG.debug("{} config parameter is already set to {})", propertyType, origValue);
// Property is flagged as not modifiable: the attempt is reported as a warning.
256 } else if (!modifiable) {
257 LOG.warn("{} update ({} -> {}) is not allowed after controller start", propertyType, origValue, newValue);
// NOTE(review): presumably reached only when neither branch above applied — confirm.
// The callback stores/propagates the new value, then the change is logged.
262 successCallback.accept(newValue);
263 LOG.info("{} config parameter is updated ({} -> {})", propertyType, origValue, newValue);
/**
 * Updates a single configuration property.
 *
 * The raw value is converted to a string and parsed into the type of the target field
 * (Integer, Long or Boolean). For each property a callback is prepared that stores the
 * parsed value in the matching field and, for most properties, pushes it to the manager
 * that uses it. The old value, new value and callback are then handed to
 * {@code doPropertyUpdate}, which decides whether the change is applied.
 *
 * @param key   property to update
 * @param value raw new value; only its {@code toString()} form is used
 */
267 public void updateProperty(@Nonnull final PropertyType key, @Nonnull final Object value) {
269 final String sValue = value.toString();
270 final Consumer<Object> successCallback;
271 final boolean modifiable;
272 final Object oldValue;
273 final Object newValue;
// Each callback receives the boxed new value produced below (Integer/Long/Boolean),
// so the primitive casts inside the callbacks unbox it.
276 case RPC_REQUESTS_QUOTA:
277 successCallback = (result) -> {
278 rpcRequestsQuota = (int) result;
281 rpcManager.setRpcRequestQuota(rpcRequestsQuota);
285 oldValue = rpcRequestsQuota;
286 newValue = Integer.valueOf(sValue);
289 case SWITCH_FEATURES_MANDATORY:
290 successCallback = (result) -> {
291 switchFeaturesMandatory = (boolean) result;
294 deviceManager.setSwitchFeaturesMandatory(switchFeaturesMandatory);
298 oldValue = switchFeaturesMandatory;
299 newValue = Boolean.valueOf(sValue);
302 case GLOBAL_NOTIFICATION_QUOTA:
303 successCallback = (result) -> {
304 globalNotificationQuota = (long) result;
307 deviceManager.setGlobalNotificationQuota(globalNotificationQuota);
311 oldValue = globalNotificationQuota;
312 newValue = Long.valueOf(sValue);
315 case IS_STATISTICS_POLLING_ON:
316 successCallback = (result) -> {
317 isStatisticsPollingOn = (boolean) result;
320 statisticsManager.setIsStatisticsPollingOn(isStatisticsPollingOn);
324 oldValue = isStatisticsPollingOn;
325 newValue = Boolean.valueOf(sValue);
328 case IS_STATISTICS_RPC_ENABLED:
329 successCallback = (result) -> {
330 isStatisticsRpcEnabled = (boolean) result;
333 rpcManager.setStatisticsRpcEnabled(isStatisticsRpcEnabled);
337 oldValue = isStatisticsRpcEnabled;
338 newValue = Boolean.valueOf(sValue);
341 case BARRIER_INTERVAL_TIMEOUT_LIMIT:
342 successCallback = (result) -> {
343 barrierInterval = (long) result;
346 deviceManager.setBarrierInterval(barrierInterval);
350 oldValue = barrierInterval;
351 newValue = Long.valueOf(sValue);
354 case BARRIER_COUNT_LIMIT:
355 successCallback = (result) -> {
356 barrierCountLimit = (int) result;
359 deviceManager.setBarrierCountLimit(barrierCountLimit);
363 oldValue = barrierCountLimit;
364 newValue = Integer.valueOf(sValue);
367 case ECHO_REPLY_TIMEOUT:
368 successCallback = (result) -> {
369 echoReplyTimeout = (long) result;
372 connectionManager.setEchoReplyTimeout(echoReplyTimeout);
376 oldValue = echoReplyTimeout;
377 newValue = Long.valueOf(sValue);
// The three thread pool settings only update the stored field; the pool itself is
// constructed from these fields in initialize().
380 case THREAD_POOL_MIN_THREADS:
381 successCallback = (result) -> threadPoolMinThreads = (int) result;
382 oldValue = threadPoolMinThreads;
383 newValue = Integer.valueOf(sValue);
386 case THREAD_POOL_MAX_THREADS:
387 successCallback = (result) -> threadPoolMaxThreads = (int) result;
388 oldValue = threadPoolMaxThreads;
389 newValue = Integer.valueOf(sValue);
392 case THREAD_POOL_TIMEOUT:
393 successCallback = (result) -> threadPoolTimeout = (long) result;
394 oldValue = threadPoolTimeout;
395 newValue = Long.valueOf(sValue);
398 case ENABLE_FLOW_REMOVED_NOTIFICATION:
399 successCallback = (result) -> {
400 isFlowRemovedNotificationOn = (boolean) result;
403 deviceManager.setFlowRemovedNotificationOn(isFlowRemovedNotificationOn);
407 oldValue = isFlowRemovedNotificationOn;
408 newValue = Boolean.valueOf(sValue);
411 case SKIP_TABLE_FEATURES:
412 successCallback = (result) -> {
413 skipTableFeatures = (boolean) result;
416 deviceManager.setSkipTableFeatures(skipTableFeatures);
420 oldValue = skipTableFeatures;
421 newValue = Boolean.valueOf(sValue);
424 case BASIC_TIMER_DELAY:
425 successCallback = (result) -> {
426 basicTimerDelay = (long) result;
429 statisticsManager.setBasicTimerDelay(basicTimerDelay);
433 oldValue = basicTimerDelay;
434 newValue = Long.valueOf(sValue);
437 case MAXIMUM_TIMER_DELAY:
438 successCallback = (result) -> {
439 maximumTimerDelay = (long) result;
442 statisticsManager.setMaximumTimerDelay(maximumTimerDelay);
446 oldValue = maximumTimerDelay;
447 newValue = Long.valueOf(sValue);
// Changing the serialization mode touches every switch connection provider: the
// plugin's (de)serializers are injected when the mode is enabled; the revert call
// presumably belongs to the disabled branch — confirm.
450 case USE_SINGLE_LAYER_SERIALIZATION:
451 successCallback = (result) -> {
452 useSingleLayerSerialization = (boolean) result;
454 switchConnectionProviders.forEach(switchConnectionProvider -> {
455 if (useSingleLayerSerialization) {
456 SerializerInjector.injectSerializers(switchConnectionProvider);
457 DeserializerInjector.injectDeserializers(switchConnectionProvider);
459 DeserializerInjector.revertDeserializers(switchConnectionProvider);
464 oldValue = useSingleLayerSerialization;
465 newValue = Boolean.valueOf(sValue);
// Property types without a dedicated case are only reported.
469 LOG.warn("Unsupported configuration property '{}={}'", key, sValue);
// Hand over to the common apply-and-log routine.
473 doPropertyUpdate(key, modifiable, oldValue, newValue, successCallback);
474 } catch (final Exception ex) {
// NOTE(review): the message has three '{}' but the last argument is a Throwable; SLF4J
// normally treats a trailing Throwable as the exception to log and leaves the last
// placeholder unfilled — confirm the rendered message is what is wanted.
475 LOG.warn("Failed to read configuration property '{}={}', error: {}", key, value, ex);
480 public ExtensionConverterRegistrator getExtensionConverterRegistrator() {
481 return extensionConverterManager;
/**
 * Shuts the plugin down by closing its managers and then shutting down the thread pool.
 *
 * @throws Exception if one of the managers fails to close; later steps are then skipped
 */
485 public void close() throws Exception {
487 //TODO: consider wrapping each manager into try-catch
488 deviceManager.close();
490 statisticsManager.close();
492 // Manually shutdown all remaining running threads in pool
// NOTE(review): ThreadPoolExecutor.shutdown() only stops accepting new tasks and lets
// submitted ones finish; it does not interrupt running threads — confirm this is intended.
493 threadPool.shutdown();
496 private static void registerMXBean(final MessageIntelligenceAgency messageIntelligenceAgency) {
497 final MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
499 final String pathToMxBean = String.format("%s:type=%s",
500 MessageIntelligenceAgencyMXBean.class.getPackage().getName(),
501 MessageIntelligenceAgencyMXBean.class.getSimpleName());
502 final ObjectName name = new ObjectName(pathToMxBean);
503 mbs.registerMBean(messageIntelligenceAgency, name);
504 } catch (MalformedObjectNameException
505 | NotCompliantMBeanException
506 | MBeanRegistrationException
507 | InstanceAlreadyExistsException e) {
508 LOG.warn("Error registering MBean {}", e);