2 * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.openflowplugin.impl;
10 import com.google.common.annotations.VisibleForTesting;
11 import com.google.common.util.concurrent.FutureCallback;
12 import com.google.common.util.concurrent.Futures;
13 import com.google.common.util.concurrent.ListenableFuture;
14 import com.google.common.util.concurrent.ListeningExecutorService;
15 import com.google.common.util.concurrent.MoreExecutors;
16 import com.google.common.util.concurrent.SettableFuture;
17 import io.netty.util.HashedWheelTimer;
18 import io.netty.util.Timer;
19 import java.lang.management.ManagementFactory;
20 import java.util.Collection;
21 import java.util.List;
22 import java.util.Objects;
23 import java.util.concurrent.ExecutionException;
24 import java.util.concurrent.ExecutorService;
25 import java.util.concurrent.Future;
26 import java.util.concurrent.SynchronousQueue;
27 import java.util.concurrent.TimeUnit;
28 import java.util.concurrent.TimeoutException;
29 import java.util.stream.Collectors;
30 import javax.annotation.Nonnull;
31 import javax.annotation.PostConstruct;
32 import javax.annotation.PreDestroy;
33 import javax.inject.Inject;
34 import javax.inject.Singleton;
35 import javax.management.InstanceAlreadyExistsException;
36 import javax.management.InstanceNotFoundException;
37 import javax.management.MBeanRegistrationException;
38 import javax.management.MBeanServer;
39 import javax.management.MalformedObjectNameException;
40 import javax.management.NotCompliantMBeanException;
41 import javax.management.ObjectName;
42 import org.apache.aries.blueprint.annotation.service.Reference;
43 import org.apache.aries.blueprint.annotation.service.Service;
44 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
45 import org.opendaylight.controller.md.sal.binding.api.NotificationPublishService;
46 import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry;
47 import org.opendaylight.infrautils.diagstatus.ServiceState;
48 import org.opendaylight.infrautils.ready.SystemReadyListener;
49 import org.opendaylight.infrautils.ready.SystemReadyMonitor;
50 import org.opendaylight.mdsal.eos.binding.api.EntityOwnershipService;
51 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceProvider;
52 import org.opendaylight.openflowjava.protocol.spi.connection.SwitchConnectionProvider;
53 import org.opendaylight.openflowjava.protocol.spi.connection.SwitchConnectionProviderList;
54 import org.opendaylight.openflowplugin.api.openflow.OpenFlowPluginProvider;
55 import org.opendaylight.openflowplugin.api.openflow.configuration.ConfigurationService;
56 import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionManager;
57 import org.opendaylight.openflowplugin.api.openflow.device.DeviceManager;
58 import org.opendaylight.openflowplugin.api.openflow.mastership.MastershipChangeServiceManager;
59 import org.opendaylight.openflowplugin.api.openflow.role.RoleManager;
60 import org.opendaylight.openflowplugin.api.openflow.rpc.RpcManager;
61 import org.opendaylight.openflowplugin.api.openflow.statistics.StatisticsManager;
62 import org.opendaylight.openflowplugin.api.openflow.statistics.ofpspecific.MessageIntelligenceAgency;
63 import org.opendaylight.openflowplugin.extension.api.ExtensionConverterProviderKeeper;
64 import org.opendaylight.openflowplugin.extension.api.ExtensionConverterRegistrator;
65 import org.opendaylight.openflowplugin.extension.api.OpenFlowPluginExtensionRegistratorProvider;
66 import org.opendaylight.openflowplugin.extension.api.core.extension.ExtensionConverterManager;
67 import org.opendaylight.openflowplugin.impl.configuration.OpenFlowProviderConfigImpl;
68 import org.opendaylight.openflowplugin.impl.connection.ConnectionManagerImpl;
69 import org.opendaylight.openflowplugin.impl.device.DeviceManagerImpl;
70 import org.opendaylight.openflowplugin.impl.device.initialization.DeviceInitializerProvider;
71 import org.opendaylight.openflowplugin.impl.device.initialization.DeviceInitializerProviderFactory;
72 import org.opendaylight.openflowplugin.impl.lifecycle.ContextChainHolderImpl;
73 import org.opendaylight.openflowplugin.impl.protocol.deserialization.DeserializerInjector;
74 import org.opendaylight.openflowplugin.impl.protocol.serialization.SerializerInjector;
75 import org.opendaylight.openflowplugin.impl.role.RoleManagerImpl;
76 import org.opendaylight.openflowplugin.impl.rpc.RpcManagerImpl;
77 import org.opendaylight.openflowplugin.impl.statistics.StatisticsManagerImpl;
78 import org.opendaylight.openflowplugin.impl.statistics.ofpspecific.MessageIntelligenceAgencyImpl;
79 import org.opendaylight.openflowplugin.impl.statistics.ofpspecific.MessageIntelligenceAgencyMXBean;
80 import org.opendaylight.openflowplugin.impl.util.ThreadPoolLoggingExecutor;
81 import org.opendaylight.openflowplugin.impl.util.TranslatorLibraryUtil;
82 import org.opendaylight.openflowplugin.openflow.md.core.extension.ExtensionConverterManagerImpl;
83 import org.opendaylight.openflowplugin.openflow.md.core.sal.convertor.ConvertorManager;
84 import org.opendaylight.openflowplugin.openflow.md.core.sal.convertor.ConvertorManagerFactory;
85 import org.opendaylight.openflowplugin.openflow.md.core.session.OFSessionUtil;
86 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflow.provider.config.rev160510.OpenflowProviderConfig;
87 import org.slf4j.Logger;
88 import org.slf4j.LoggerFactory;
91 @Service(classes = { OpenFlowPluginProvider.class, OpenFlowPluginExtensionRegistratorProvider.class })
92 public class OpenFlowPluginProviderImpl implements
93 OpenFlowPluginProvider,
94 OpenFlowPluginExtensionRegistratorProvider,
97 private static final Logger LOG = LoggerFactory.getLogger(OpenFlowPluginProviderImpl.class);
99 private static final int TICKS_PER_WHEEL = 500; // 0.5 sec.
100 private static final long TICK_DURATION = 10;
101 private static final String POOL_NAME = "ofppool";
103 private static final MessageIntelligenceAgency MESSAGE_INTELLIGENCE_AGENCY = new MessageIntelligenceAgencyImpl();
104 private static final String MESSAGE_INTELLIGENCE_AGENCY_MX_BEAN_NAME = String
105 .format("%s:type=%s",
106 MessageIntelligenceAgencyMXBean.class.getPackage().getName(),
107 MessageIntelligenceAgencyMXBean.class.getSimpleName());
109 private final HashedWheelTimer hashedWheelTimer =
110 new HashedWheelTimer(TICK_DURATION, TimeUnit.MILLISECONDS, TICKS_PER_WHEEL);
111 private final NotificationPublishService notificationPublishService;
112 private final ExtensionConverterManager extensionConverterManager;
113 private final DataBroker dataBroker;
114 private final Collection<SwitchConnectionProvider> switchConnectionProviders;
115 private final DeviceInitializerProvider deviceInitializerProvider;
116 private final ConvertorManager convertorManager;
117 private final RpcProviderRegistry rpcProviderRegistry;
118 private final ClusterSingletonServiceProvider singletonServicesProvider;
119 private final OpenflowProviderConfig config;
120 private final EntityOwnershipService entityOwnershipService;
121 private final MastershipChangeServiceManager mastershipChangeServiceManager;
122 private DeviceManager deviceManager;
123 private RpcManager rpcManager;
124 private StatisticsManager statisticsManager;
125 private RoleManager roleManager;
126 private ConnectionManager connectionManager;
127 private ListeningExecutorService executorService;
128 private ContextChainHolderImpl contextChainHolder;
129 private final OpenflowPluginDiagStatusProvider openflowPluginStatusMonitor;
130 private final SettableFuture<Void> fullyStarted = SettableFuture.create();
132 public static MessageIntelligenceAgency getMessageIntelligenceAgency() {
133 return MESSAGE_INTELLIGENCE_AGENCY;
137 public OpenFlowPluginProviderImpl(final ConfigurationService configurationService,
138 final SwitchConnectionProviderList switchConnectionProviders,
139 final PingPongDataBroker pingPongDataBroker,
140 final @Reference RpcProviderRegistry rpcProviderRegistry,
141 final @Reference NotificationPublishService notificationPublishService,
142 final @Reference ClusterSingletonServiceProvider singletonServiceProvider,
143 final @Reference EntityOwnershipService entityOwnershipService,
144 final MastershipChangeServiceManager mastershipChangeServiceManager,
145 final OpenflowPluginDiagStatusProvider openflowPluginStatusMonitor,
146 final @Reference SystemReadyMonitor systemReadyMonitor) {
147 this.switchConnectionProviders = switchConnectionProviders;
148 this.dataBroker = pingPongDataBroker;
149 this.rpcProviderRegistry = rpcProviderRegistry;
150 this.notificationPublishService = notificationPublishService;
151 this.singletonServicesProvider = singletonServiceProvider;
152 this.entityOwnershipService = entityOwnershipService;
153 convertorManager = ConvertorManagerFactory.createDefaultManager();
154 extensionConverterManager = new ExtensionConverterManagerImpl();
155 deviceInitializerProvider = DeviceInitializerProviderFactory.createDefaultProvider();
156 config = new OpenFlowProviderConfigImpl(configurationService);
157 this.mastershipChangeServiceManager = mastershipChangeServiceManager;
158 this.openflowPluginStatusMonitor = openflowPluginStatusMonitor;
159 systemReadyMonitor.registerListener(this);
160 LOG.info("registered onSystemBootReady() listener for deferred startSwitchConnections()");
164 public void onSystemBootReady() {
165 LOG.info("onSystemBootReady() received, starting the switch connections");
166 Futures.addCallback(Futures.allAsList(switchConnectionProviders.stream().map(switchConnectionProvider -> {
167 // Inject OpenFlowPlugin custom serializers and deserializers into OpenFlowJava
168 if (config.isUseSingleLayerSerialization()) {
169 SerializerInjector.injectSerializers(switchConnectionProvider,
170 switchConnectionProvider.getConfiguration().isGroupAddModEnabled());
171 DeserializerInjector.injectDeserializers(switchConnectionProvider);
173 DeserializerInjector.revertDeserializers(switchConnectionProvider);
176 // Set handler of incoming connections and start switch connection provider
177 switchConnectionProvider.setSwitchConnectionHandler(connectionManager);
178 return switchConnectionProvider.startup();
179 }).collect(Collectors.toSet())), new FutureCallback<List<Boolean>>() {
181 public void onSuccess(@Nonnull final List<Boolean> result) {
182 LOG.info("All switchConnectionProviders are up and running ({}).", result.size());
183 openflowPluginStatusMonitor.reportStatus(ServiceState.OPERATIONAL);
184 fullyStarted.set(null);
188 public void onFailure(@Nonnull final Throwable throwable) {
189 LOG.warn("Some switchConnectionProviders failed to start.", throwable);
190 openflowPluginStatusMonitor.reportStatus(ServiceState.ERROR, throwable);
191 fullyStarted.setException(throwable);
193 }, MoreExecutors.directExecutor());
197 public Future<Void> getFullyStarted() {
201 private ListenableFuture<List<Boolean>> shutdownSwitchConnections() {
202 final ListenableFuture<List<Boolean>> listListenableFuture =
203 Futures.allAsList(switchConnectionProviders.stream().map(switchConnectionProvider -> {
204 // Revert deserializers to their original state
205 if (config.isUseSingleLayerSerialization()) {
206 DeserializerInjector.revertDeserializers(switchConnectionProvider);
209 // Shutdown switch connection provider
210 return switchConnectionProvider.shutdown();
211 }).collect(Collectors.toSet()));
213 Futures.addCallback(listListenableFuture, new FutureCallback<List<Boolean>>() {
215 public void onSuccess(@Nonnull final List<Boolean> result) {
216 LOG.info("All switchConnectionProviders were successfully shut down ({}).", result.size());
220 public void onFailure(@Nonnull final Throwable throwable) {
221 LOG.warn("Some switchConnectionProviders failed to shutdown.", throwable);
223 }, MoreExecutors.directExecutor());
225 return listListenableFuture;
230 public void initialize() {
231 registerMXBean(MESSAGE_INTELLIGENCE_AGENCY, MESSAGE_INTELLIGENCE_AGENCY_MX_BEAN_NAME);
233 // TODO: copied from OpenFlowPluginProvider (Helium) misusesing the old way of distributing extension converters
234 // TODO: rewrite later!
235 OFSessionUtil.getSessionManager().setExtensionConverterProvider(extensionConverterManager);
237 // Creates a thread pool that creates new threads as needed, but will reuse previously
238 // constructed threads when they are available.
239 // Threads that have not been used for x seconds are terminated and removed from the cache.
240 executorService = MoreExecutors.listeningDecorator(new ThreadPoolLoggingExecutor(
241 config.getThreadPoolMinThreads(),
242 config.getThreadPoolMaxThreads().getValue(),
243 config.getThreadPoolTimeout(),
244 TimeUnit.SECONDS, new SynchronousQueue<>(), POOL_NAME));
246 deviceManager = new DeviceManagerImpl(
249 getMessageIntelligenceAgency(),
250 notificationPublishService,
253 deviceInitializerProvider);
255 TranslatorLibraryUtil.injectBasicTranslatorLibrary(deviceManager, convertorManager);
256 ((ExtensionConverterProviderKeeper) deviceManager).setExtensionConverterProvider(extensionConverterManager);
258 rpcManager = new RpcManagerImpl(
261 extensionConverterManager,
263 notificationPublishService);
265 statisticsManager = new StatisticsManagerImpl(
271 roleManager = new RoleManagerImpl(hashedWheelTimer, config);
273 contextChainHolder = new ContextChainHolderImpl(
275 singletonServicesProvider,
276 entityOwnershipService,
277 mastershipChangeServiceManager);
279 contextChainHolder.addManager(deviceManager);
280 contextChainHolder.addManager(statisticsManager);
281 contextChainHolder.addManager(rpcManager);
282 contextChainHolder.addManager(roleManager);
284 connectionManager = new ConnectionManagerImpl(config, executorService);
285 connectionManager.setDeviceConnectedHandler(contextChainHolder);
286 connectionManager.setDeviceDisconnectedHandler(contextChainHolder);
288 deviceManager.setContextChainHolder(contextChainHolder);
289 deviceManager.initialize();
293 public ExtensionConverterRegistrator getExtensionConverterRegistrator() {
294 return extensionConverterManager;
299 public void close() {
301 shutdownSwitchConnections().get(10, TimeUnit.SECONDS);
302 } catch (InterruptedException | ExecutionException | TimeoutException e) {
303 LOG.warn("Failed to shut down switch connections in time {}s", 10, e);
306 gracefulShutdown(contextChainHolder);
307 gracefulShutdown(deviceManager);
308 gracefulShutdown(rpcManager);
309 gracefulShutdown(statisticsManager);
310 gracefulShutdown(roleManager);
311 gracefulShutdown(executorService);
312 gracefulShutdown(hashedWheelTimer);
313 unregisterMXBean(MESSAGE_INTELLIGENCE_AGENCY_MX_BEAN_NAME);
314 openflowPluginStatusMonitor.reportStatus(ServiceState.UNREGISTERED);
317 @SuppressWarnings("checkstyle:IllegalCatch")
318 private static void gracefulShutdown(final AutoCloseable closeable) {
319 if (Objects.isNull(closeable)) {
325 } catch (Exception e) {
326 LOG.warn("Failed to shutdown {} gracefully.", closeable);
330 private static void gracefulShutdown(final Timer timer) {
331 if (Objects.isNull(timer)) {
337 } catch (IllegalStateException e) {
338 LOG.warn("Failed to shutdown {} gracefully.", timer);
342 private static void gracefulShutdown(final ExecutorService executorService) {
343 if (Objects.isNull(executorService)) {
348 executorService.shutdownNow();
349 } catch (SecurityException e) {
350 LOG.warn("Failed to shutdown {} gracefully.", executorService);
354 private static void registerMXBean(final Object bean, final String beanName) {
355 final MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
358 mbs.registerMBean(bean, new ObjectName(beanName));
359 } catch (MalformedObjectNameException
360 | NotCompliantMBeanException
361 | MBeanRegistrationException
362 | InstanceAlreadyExistsException e) {
363 LOG.warn("Error registering MBean {}", e);
367 private static void unregisterMXBean(final String beanName) {
368 final MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
371 mbs.unregisterMBean(new ObjectName(beanName));
372 } catch (InstanceNotFoundException
373 | MBeanRegistrationException
374 | MalformedObjectNameException e) {
375 LOG.warn("Error unregistering MBean {}", e);