/*
 * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
 * and is available at http://www.eclipse.org/legal/epl-v10.html
 */
8 package org.opendaylight.openflowplugin.impl;
10 import com.google.common.annotations.VisibleForTesting;
11 import com.google.common.util.concurrent.FutureCallback;
12 import com.google.common.util.concurrent.Futures;
13 import com.google.common.util.concurrent.ListenableFuture;
14 import com.google.common.util.concurrent.MoreExecutors;
15 import com.google.common.util.concurrent.SettableFuture;
16 import io.netty.util.HashedWheelTimer;
17 import io.netty.util.Timer;
18 import java.lang.management.ManagementFactory;
19 import java.util.Collection;
20 import java.util.List;
22 import java.util.concurrent.ExecutionException;
23 import java.util.concurrent.ExecutorService;
24 import java.util.concurrent.Future;
25 import java.util.concurrent.SynchronousQueue;
26 import java.util.concurrent.TimeUnit;
27 import java.util.concurrent.TimeoutException;
28 import java.util.stream.Collectors;
29 import javax.annotation.PostConstruct;
30 import javax.annotation.PreDestroy;
31 import javax.inject.Inject;
32 import javax.inject.Singleton;
33 import javax.management.InstanceAlreadyExistsException;
34 import javax.management.InstanceNotFoundException;
35 import javax.management.MBeanRegistrationException;
36 import javax.management.MBeanServer;
37 import javax.management.MalformedObjectNameException;
38 import javax.management.NotCompliantMBeanException;
39 import javax.management.ObjectName;
40 import org.opendaylight.infrautils.diagstatus.ServiceState;
41 import org.opendaylight.infrautils.ready.SystemReadyListener;
42 import org.opendaylight.infrautils.ready.SystemReadyMonitor;
43 import org.opendaylight.mdsal.binding.api.DataBroker;
44 import org.opendaylight.mdsal.binding.api.NotificationPublishService;
45 import org.opendaylight.mdsal.binding.api.RpcProviderService;
46 import org.opendaylight.mdsal.eos.binding.api.EntityOwnershipService;
47 import org.opendaylight.mdsal.singleton.api.ClusterSingletonServiceProvider;
48 import org.opendaylight.openflowjava.protocol.spi.connection.SwitchConnectionProvider;
49 import org.opendaylight.openflowplugin.api.openflow.FlowGroupInfoHistories;
50 import org.opendaylight.openflowplugin.api.openflow.FlowGroupInfoHistory;
51 import org.opendaylight.openflowplugin.api.openflow.OpenFlowPluginProvider;
52 import org.opendaylight.openflowplugin.api.openflow.configuration.ConfigurationService;
53 import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionManager;
54 import org.opendaylight.openflowplugin.api.openflow.device.DeviceManager;
55 import org.opendaylight.openflowplugin.api.openflow.mastership.MastershipChangeServiceManager;
56 import org.opendaylight.openflowplugin.api.openflow.role.RoleManager;
57 import org.opendaylight.openflowplugin.api.openflow.rpc.RpcManager;
58 import org.opendaylight.openflowplugin.api.openflow.statistics.StatisticsManager;
59 import org.opendaylight.openflowplugin.api.openflow.statistics.ofpspecific.MessageIntelligenceAgency;
60 import org.opendaylight.openflowplugin.extension.api.ExtensionConverterProviderKeeper;
61 import org.opendaylight.openflowplugin.extension.api.ExtensionConverterRegistrator;
62 import org.opendaylight.openflowplugin.extension.api.OpenFlowPluginExtensionRegistratorProvider;
63 import org.opendaylight.openflowplugin.extension.api.core.extension.ExtensionConverterManager;
64 import org.opendaylight.openflowplugin.impl.configuration.OpenFlowProviderConfigImpl;
65 import org.opendaylight.openflowplugin.impl.connection.ConnectionManagerImpl;
66 import org.opendaylight.openflowplugin.impl.device.DeviceManagerImpl;
67 import org.opendaylight.openflowplugin.impl.device.initialization.DeviceInitializerProvider;
68 import org.opendaylight.openflowplugin.impl.device.initialization.DeviceInitializerProviderFactory;
69 import org.opendaylight.openflowplugin.impl.lifecycle.ContextChainHolderImpl;
70 import org.opendaylight.openflowplugin.impl.protocol.deserialization.DeserializerInjector;
71 import org.opendaylight.openflowplugin.impl.protocol.serialization.SerializerInjector;
72 import org.opendaylight.openflowplugin.impl.role.RoleManagerImpl;
73 import org.opendaylight.openflowplugin.impl.rpc.RpcManagerImpl;
74 import org.opendaylight.openflowplugin.impl.statistics.StatisticsManagerImpl;
75 import org.opendaylight.openflowplugin.impl.statistics.ofpspecific.MessageIntelligenceAgencyImpl;
76 import org.opendaylight.openflowplugin.impl.statistics.ofpspecific.MessageIntelligenceAgencyMXBean;
77 import org.opendaylight.openflowplugin.impl.util.ThreadPoolLoggingExecutor;
78 import org.opendaylight.openflowplugin.impl.util.TranslatorLibraryUtil;
79 import org.opendaylight.openflowplugin.openflow.md.core.extension.ExtensionConverterManagerImpl;
80 import org.opendaylight.openflowplugin.openflow.md.core.sal.convertor.ConvertorManager;
81 import org.opendaylight.openflowplugin.openflow.md.core.sal.convertor.ConvertorManagerFactory;
82 import org.opendaylight.openflowplugin.openflow.md.core.session.OFSessionUtil;
83 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
84 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflow.provider.config.rev160510.OpenflowProviderConfig;
85 import org.slf4j.Logger;
86 import org.slf4j.LoggerFactory;
89 public class OpenFlowPluginProviderImpl implements
90 OpenFlowPluginProvider,
91 OpenFlowPluginExtensionRegistratorProvider,
92 FlowGroupInfoHistories,
95 private static final Logger LOG = LoggerFactory.getLogger(OpenFlowPluginProviderImpl.class);
97 private static final int TICKS_PER_WHEEL = 500; // 0.5 sec.
98 private static final long TICK_DURATION = 10;
99 private static final String POOL_NAME = "ofppool";
101 private static final MessageIntelligenceAgency MESSAGE_INTELLIGENCE_AGENCY = new MessageIntelligenceAgencyImpl();
102 private static final String MESSAGE_INTELLIGENCE_AGENCY_MX_BEAN_NAME = String
103 .format("%s:type=%s",
104 MessageIntelligenceAgencyMXBean.class.getPackage().getName(),
105 MessageIntelligenceAgencyMXBean.class.getSimpleName());
107 private final HashedWheelTimer hashedWheelTimer =
108 new HashedWheelTimer(TICK_DURATION, TimeUnit.MILLISECONDS, TICKS_PER_WHEEL);
109 private final NotificationPublishService notificationPublishService;
110 private final ExtensionConverterManager extensionConverterManager;
111 private final DataBroker dataBroker;
112 private final Collection<SwitchConnectionProvider> switchConnectionProviders;
113 private final DeviceInitializerProvider deviceInitializerProvider;
114 private final ConvertorManager convertorManager;
115 private final RpcProviderService rpcProviderRegistry;
116 private final ClusterSingletonServiceProvider singletonServicesProvider;
117 private final OpenflowProviderConfig config;
118 private final EntityOwnershipService entityOwnershipService;
119 private final MastershipChangeServiceManager mastershipChangeServiceManager;
120 private DeviceManager deviceManager;
121 private RpcManager rpcManager;
122 private StatisticsManager statisticsManager;
123 private RoleManager roleManager;
124 private ConnectionManager connectionManager;
125 private ExecutorService executorService;
126 private ContextChainHolderImpl contextChainHolder;
127 private final DiagStatusProvider diagStatusProvider;
128 private final SystemReadyMonitor systemReadyMonitor;
129 private final SettableFuture<Void> fullyStarted = SettableFuture.create();
130 private static final String OPENFLOW_SERVICE_NAME = "OPENFLOW";
132 public static MessageIntelligenceAgency getMessageIntelligenceAgency() {
133 return MESSAGE_INTELLIGENCE_AGENCY;
/**
 * Stores the supplied services and creates the collaborators that need no further wiring.
 *
 * <p>The constructor itself builds the default convertor manager, the extension converter manager,
 * the default device initializer provider and the configuration view. Managers, the thread pool
 * and the JMX registration are set up later in {@code initialize()}.
 *
 * @param configurationService backing service for the {@code OpenFlowProviderConfigImpl} created here
 * @param switchConnectionProviders providers started in {@code onSystemBootReady()} and shut down
 *        in {@code close()}; the list is stored as passed, not copied
 * @param dataBroker data broker; it is wrapped in a {@code PingPongDataBroker} before being stored
 * @param rpcProviderRegistry RPC provider service, stored for later use
 * @param notificationPublishService notification service handed to the managers built in
 *        {@code initialize()}
 * @param singletonServiceProvider cluster singleton provider passed to the context chain holder
 * @param entityOwnershipService entity ownership service passed to the context chain holder
 * @param mastershipChangeServiceManager mastership manager passed to the context chain holder
 * @param diagStatusProvider receiver of the service state reported on startup and shutdown
 * @param systemReadyMonitor monitor with which this object registers itself in {@code initialize()}
 */
public OpenFlowPluginProviderImpl(final ConfigurationService configurationService,
        final List<SwitchConnectionProvider> switchConnectionProviders,
        final DataBroker dataBroker,
        final RpcProviderService rpcProviderRegistry,
        final NotificationPublishService notificationPublishService,
        final ClusterSingletonServiceProvider singletonServiceProvider,
        final EntityOwnershipService entityOwnershipService,
        final MastershipChangeServiceManager mastershipChangeServiceManager,
        final DiagStatusProvider diagStatusProvider,
        final SystemReadyMonitor systemReadyMonitor) {
    this.switchConnectionProviders = switchConnectionProviders;
    this.dataBroker = new PingPongDataBroker(dataBroker);
    this.rpcProviderRegistry = rpcProviderRegistry;
    this.notificationPublishService = notificationPublishService;
    this.singletonServicesProvider = singletonServiceProvider;
    this.entityOwnershipService = entityOwnershipService;
    this.convertorManager = ConvertorManagerFactory.createDefaultManager();
    this.extensionConverterManager = new ExtensionConverterManagerImpl();
    this.deviceInitializerProvider = DeviceInitializerProviderFactory.createDefaultProvider();
    this.config = new OpenFlowProviderConfigImpl(configurationService);
    this.mastershipChangeServiceManager = mastershipChangeServiceManager;
    this.diagStatusProvider = diagStatusProvider;
    this.systemReadyMonitor = systemReadyMonitor;
}
163 public void onSystemBootReady() {
164 LOG.info("onSystemBootReady() received, starting the switch connections");
165 Futures.addCallback(Futures.allAsList(switchConnectionProviders.stream().map(switchConnectionProvider -> {
166 // Inject OpenFlowPlugin custom serializers and deserializers into OpenFlowJava
167 if (config.getUseSingleLayerSerialization()) {
168 SerializerInjector.injectSerializers(switchConnectionProvider,
169 switchConnectionProvider.getConfiguration().isGroupAddModEnabled());
170 DeserializerInjector.injectDeserializers(switchConnectionProvider);
172 DeserializerInjector.revertDeserializers(switchConnectionProvider);
175 // Set handler of incoming connections and start switch connection provider
176 switchConnectionProvider.setSwitchConnectionHandler(connectionManager);
177 return switchConnectionProvider.startup();
178 }).collect(Collectors.toSet())), new FutureCallback<List<Boolean>>() {
180 public void onSuccess(final List<Boolean> result) {
181 LOG.info("All switchConnectionProviders are up and running ({}).", result.size());
182 diagStatusProvider.reportStatus(ServiceState.OPERATIONAL);
183 fullyStarted.set(null);
187 public void onFailure(final Throwable throwable) {
188 LOG.warn("Some switchConnectionProviders failed to start.", throwable);
189 diagStatusProvider.reportStatus(ServiceState.ERROR, throwable);
190 fullyStarted.setException(throwable);
192 }, MoreExecutors.directExecutor());
196 public Future<Void> getFullyStarted() {
200 private ListenableFuture<List<Boolean>> shutdownSwitchConnections() {
201 final ListenableFuture<List<Boolean>> listListenableFuture =
202 Futures.allAsList(switchConnectionProviders.stream().map(switchConnectionProvider -> {
203 // Revert deserializers to their original state
204 if (config.getUseSingleLayerSerialization()) {
205 DeserializerInjector.revertDeserializers(switchConnectionProvider);
208 // Shutdown switch connection provider
209 return switchConnectionProvider.shutdown();
210 }).collect(Collectors.toSet()));
212 Futures.addCallback(listListenableFuture, new FutureCallback<List<Boolean>>() {
214 public void onSuccess(final List<Boolean> result) {
215 LOG.info("All switchConnectionProviders were successfully shut down ({}).", result.size());
219 public void onFailure(final Throwable throwable) {
220 LOG.warn("Some switchConnectionProviders failed to shutdown.", throwable);
222 }, MoreExecutors.directExecutor());
224 return listListenableFuture;
/**
 * Builds the runtime components of the plugin and registers for the system-ready callback.
 *
 * <p>In order: registers the MessageIntelligenceAgency MXBean, hands the extension converter
 * manager to the OFSessionUtil session manager, creates the shared thread pool, creates the
 * device, RPC, statistics and role managers, adds them to a new context chain holder, creates
 * the connection manager, initializes the device manager and finally registers this object with
 * the SystemReadyMonitor. Switch connection providers are not started here; that is done in
 * onSystemBootReady().
 *
 * <p>NOTE(review): several constructor argument lines are not visible in this copy of the
 * method; confirm the full argument lists against the complete file.
 */
229 public void initialize() {
// Expose the shared agency over JMX under its fixed bean name.
230 registerMXBean(MESSAGE_INTELLIGENCE_AGENCY, MESSAGE_INTELLIGENCE_AGENCY_MX_BEAN_NAME);
232 // TODO: copied from OpenFlowPluginProvider (Helium) misusing the old way of distributing extension converters
233 // TODO: rewrite later!
234 OFSessionUtil.getSessionManager().setExtensionConverterProvider(extensionConverterManager);
236 // Creates a thread pool that creates new threads as needed, but will reuse previously
237 // constructed threads when they are available.
238 // Threads that have not been used for x seconds are terminated and removed from the cache.
// Sizes and timeout come from the provider configuration; the timeout is in seconds.
// NOTE(review): presumably these map to core size, max size, keep-alive, unit, queue and pool
// name in ThreadPoolLoggingExecutor - confirm against that class.
239 executorService = new ThreadPoolLoggingExecutor(
240 config.getThreadPoolMinThreads().toJava(),
241 config.getThreadPoolMaxThreads().getValue().toJava(),
242 config.getThreadPoolTimeout().toJava(),
243 TimeUnit.SECONDS, new SynchronousQueue<>(), POOL_NAME);
// NOTE(review): only part of this argument list is visible here.
245 deviceManager = new DeviceManagerImpl(
248 getMessageIntelligenceAgency(),
249 notificationPublishService,
252 deviceInitializerProvider,
255 TranslatorLibraryUtil.injectBasicTranslatorLibrary(deviceManager, convertorManager);
// The device manager is cast to ExtensionConverterProviderKeeper to receive the extension converters.
256 ((ExtensionConverterProviderKeeper) deviceManager).setExtensionConverterProvider(extensionConverterManager);
// NOTE(review): only part of this argument list is visible here.
258 rpcManager = new RpcManagerImpl(
261 extensionConverterManager,
263 notificationPublishService);
// NOTE(review): the arguments of this constructor call are not visible here.
265 statisticsManager = new StatisticsManagerImpl(
271 roleManager = new RoleManagerImpl(hashedWheelTimer, config, executorService);
// NOTE(review): only part of this argument list is visible here.
273 contextChainHolder = new ContextChainHolderImpl(
275 singletonServicesProvider,
276 entityOwnershipService,
277 mastershipChangeServiceManager,
// Register all four managers with the context chain holder.
280 contextChainHolder.addManager(deviceManager);
281 contextChainHolder.addManager(statisticsManager);
282 contextChainHolder.addManager(rpcManager);
283 contextChainHolder.addManager(roleManager);
// The context chain holder handles both device-connected and device-disconnected events.
285 connectionManager = new ConnectionManagerImpl(config, executorService, dataBroker, notificationPublishService);
286 connectionManager.setDeviceConnectedHandler(contextChainHolder);
287 connectionManager.setDeviceDisconnectedHandler(contextChainHolder);
289 deviceManager.setContextChainHolder(contextChainHolder);
290 deviceManager.initialize();
// Register last, after every component above has been created.
291 systemReadyMonitor.registerListener(this);
292 LOG.info("registered onSystemBootReady() listener for OpenFlowPluginProvider");
296 public ExtensionConverterRegistrator getExtensionConverterRegistrator() {
297 return extensionConverterManager;
301 public Map<NodeId, FlowGroupInfoHistory> getAllFlowGroupHistories() {
302 return deviceManager.getAllFlowGroupHistories();
306 public FlowGroupInfoHistory getFlowGroupHistory(final NodeId nodeId) {
307 return deviceManager.getFlowGroupHistory(nodeId);
312 @SuppressWarnings("checkstyle:IllegalCatch")
313 public void close() {
315 shutdownSwitchConnections().get(10, TimeUnit.SECONDS);
316 } catch (InterruptedException | ExecutionException | TimeoutException e) {
317 LOG.warn("Failed to shut down switch connections in time {}s", 10, e);
320 gracefulShutdown(contextChainHolder);
321 gracefulShutdown(connectionManager);
322 gracefulShutdown(deviceManager);
323 gracefulShutdown(rpcManager);
324 gracefulShutdown(statisticsManager);
325 gracefulShutdown(roleManager);
326 gracefulShutdown(executorService);
327 gracefulShutdown(hashedWheelTimer);
328 unregisterMXBean(MESSAGE_INTELLIGENCE_AGENCY_MX_BEAN_NAME);
329 diagStatusProvider.reportStatus(ServiceState.UNREGISTERED);
331 if (connectionManager != null) {
332 connectionManager.close();
333 connectionManager = null;
335 } catch (Exception e) {
336 LOG.error("Failed to close ConnectionManager", e);
340 @SuppressWarnings("checkstyle:IllegalCatch")
341 private static void gracefulShutdown(final AutoCloseable closeable) {
342 if (closeable != null) {
345 } catch (Exception e) {
346 LOG.warn("Failed to shutdown {} gracefully.", closeable);
351 private static void gracefulShutdown(final Timer timer) {
355 } catch (IllegalStateException e) {
356 LOG.warn("Failed to shutdown {} gracefully.", timer);
361 private static void gracefulShutdown(final ExecutorService executorService) {
362 if (executorService != null) {
364 executorService.shutdownNow();
365 } catch (SecurityException e) {
366 LOG.warn("Failed to shutdown {} gracefully.", executorService);
371 private static void registerMXBean(final Object bean, final String beanName) {
372 final MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
375 mbs.registerMBean(bean, new ObjectName(beanName));
376 } catch (MalformedObjectNameException
377 | NotCompliantMBeanException
378 | MBeanRegistrationException
379 | InstanceAlreadyExistsException e) {
380 LOG.warn("Error registering MBean {}", beanName, e);
384 private static void unregisterMXBean(final String beanName) {
385 final MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
388 mbs.unregisterMBean(new ObjectName(beanName));
389 } catch (InstanceNotFoundException
390 | MBeanRegistrationException
391 | MalformedObjectNameException e) {
392 LOG.warn("Error unregistering MBean {}", beanName, e);