2 * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.openflowplugin.impl;
11 import com.google.common.util.concurrent.FutureCallback;
12 import com.google.common.util.concurrent.Futures;
13 import com.google.common.util.concurrent.ListenableFuture;
14 import com.google.common.util.concurrent.ListeningExecutorService;
15 import com.google.common.util.concurrent.MoreExecutors;
16 import io.netty.util.HashedWheelTimer;
17 import io.netty.util.Timer;
18 import java.lang.management.ManagementFactory;
19 import java.util.Collection;
20 import java.util.List;
21 import java.util.Objects;
22 import java.util.concurrent.ExecutionException;
23 import java.util.concurrent.ExecutorService;
24 import java.util.concurrent.SynchronousQueue;
25 import java.util.concurrent.TimeUnit;
26 import java.util.concurrent.TimeoutException;
27 import java.util.stream.Collectors;
28 import javax.annotation.Nonnull;
29 import javax.management.InstanceAlreadyExistsException;
30 import javax.management.InstanceNotFoundException;
31 import javax.management.MBeanRegistrationException;
32 import javax.management.MBeanServer;
33 import javax.management.MalformedObjectNameException;
34 import javax.management.NotCompliantMBeanException;
35 import javax.management.ObjectName;
36 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
37 import org.opendaylight.controller.md.sal.binding.api.NotificationPublishService;
38 import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry;
39 import org.opendaylight.infrautils.diagstatus.ServiceState;
40 import org.opendaylight.infrautils.ready.SystemReadyListener;
41 import org.opendaylight.infrautils.ready.SystemReadyMonitor;
42 import org.opendaylight.mdsal.eos.binding.api.EntityOwnershipService;
43 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceProvider;
44 import org.opendaylight.openflowjava.protocol.spi.connection.SwitchConnectionProvider;
45 import org.opendaylight.openflowplugin.api.diagstatus.OpenflowPluginDiagStatusProvider;
46 import org.opendaylight.openflowplugin.api.openflow.OpenFlowPluginProvider;
47 import org.opendaylight.openflowplugin.api.openflow.configuration.ConfigurationService;
48 import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionManager;
49 import org.opendaylight.openflowplugin.api.openflow.device.DeviceManager;
50 import org.opendaylight.openflowplugin.api.openflow.mastership.MastershipChangeServiceManager;
51 import org.opendaylight.openflowplugin.api.openflow.role.RoleManager;
52 import org.opendaylight.openflowplugin.api.openflow.rpc.RpcManager;
53 import org.opendaylight.openflowplugin.api.openflow.statistics.StatisticsManager;
54 import org.opendaylight.openflowplugin.api.openflow.statistics.ofpspecific.MessageIntelligenceAgency;
55 import org.opendaylight.openflowplugin.extension.api.ExtensionConverterProviderKeeper;
56 import org.opendaylight.openflowplugin.extension.api.ExtensionConverterRegistrator;
57 import org.opendaylight.openflowplugin.extension.api.OpenFlowPluginExtensionRegistratorProvider;
58 import org.opendaylight.openflowplugin.extension.api.core.extension.ExtensionConverterManager;
59 import org.opendaylight.openflowplugin.impl.configuration.OpenFlowProviderConfigImpl;
60 import org.opendaylight.openflowplugin.impl.connection.ConnectionManagerImpl;
61 import org.opendaylight.openflowplugin.impl.device.DeviceManagerImpl;
62 import org.opendaylight.openflowplugin.impl.device.initialization.DeviceInitializerProvider;
63 import org.opendaylight.openflowplugin.impl.device.initialization.DeviceInitializerProviderFactory;
64 import org.opendaylight.openflowplugin.impl.lifecycle.ContextChainHolderImpl;
65 import org.opendaylight.openflowplugin.impl.protocol.deserialization.DeserializerInjector;
66 import org.opendaylight.openflowplugin.impl.protocol.serialization.SerializerInjector;
67 import org.opendaylight.openflowplugin.impl.role.RoleManagerImpl;
68 import org.opendaylight.openflowplugin.impl.rpc.RpcManagerImpl;
69 import org.opendaylight.openflowplugin.impl.statistics.StatisticsManagerImpl;
70 import org.opendaylight.openflowplugin.impl.statistics.ofpspecific.MessageIntelligenceAgencyImpl;
71 import org.opendaylight.openflowplugin.impl.statistics.ofpspecific.MessageIntelligenceAgencyMXBean;
72 import org.opendaylight.openflowplugin.impl.util.ThreadPoolLoggingExecutor;
73 import org.opendaylight.openflowplugin.impl.util.TranslatorLibraryUtil;
74 import org.opendaylight.openflowplugin.openflow.md.core.extension.ExtensionConverterManagerImpl;
75 import org.opendaylight.openflowplugin.openflow.md.core.sal.convertor.ConvertorManager;
76 import org.opendaylight.openflowplugin.openflow.md.core.sal.convertor.ConvertorManagerFactory;
77 import org.opendaylight.openflowplugin.openflow.md.core.session.OFSessionUtil;
78 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflow.provider.config.rev160510.OpenflowProviderConfig;
79 import org.slf4j.Logger;
80 import org.slf4j.LoggerFactory;
82 public class OpenFlowPluginProviderImpl implements
83 OpenFlowPluginProvider,
84 OpenFlowPluginExtensionRegistratorProvider,
// Class-wide logger.
87 private static final Logger LOG = LoggerFactory.getLogger(OpenFlowPluginProviderImpl.class);
// Settings for hashedWheelTimer below: wheel size in ticks and the length of one tick
// (the timer is constructed with TimeUnit.MILLISECONDS).
89 private static final int TICKS_PER_WHEEL = 500; // 500 ticks x TICK_DURATION (10 ms) = 5 s, not 0.5 s
90 private static final long TICK_DURATION = 10;
// Name handed to the ThreadPoolLoggingExecutor created in initialize().
91 private static final String POOL_NAME = "ofppool";
// Single shared agency instance; returned by getMessageIntelligenceAgency() and registered as an MXBean in initialize().
93 private static final MessageIntelligenceAgency MESSAGE_INTELLIGENCE_AGENCY = new MessageIntelligenceAgencyImpl();
// JMX object name for the agency, built from the MXBean interface's package name and simple name.
// NOTE(review): the line carrying the format call/pattern appears to be missing from this listing - confirm against the full file.
94 private static final String MESSAGE_INTELLIGENCE_AGENCY_MX_BEAN_NAME = String
96 MessageIntelligenceAgencyMXBean.class.getPackage().getName(),
97 MessageIntelligenceAgencyMXBean.class.getSimpleName());
// Timer handed to RoleManagerImpl in initialize() and stopped in close().
99 private final HashedWheelTimer hashedWheelTimer =
100 new HashedWheelTimer(TICK_DURATION, TimeUnit.MILLISECONDS, TICKS_PER_WHEEL);
// Dependencies assigned exactly once in the constructor: either passed in by the caller
// or created there from the default factories / implementations.
101 private final NotificationPublishService notificationPublishService;
102 private final ExtensionConverterManager extensionConverterManager;
103 private final DataBroker dataBroker;
104 private final Collection<SwitchConnectionProvider> switchConnectionProviders;
105 private final DeviceInitializerProvider deviceInitializerProvider;
106 private final ConvertorManager convertorManager;
107 private final RpcProviderRegistry rpcProviderRegistry;
108 private final ClusterSingletonServiceProvider singletonServicesProvider;
109 private final OpenflowProviderConfig config;
110 private final EntityOwnershipService entityOwnershipService;
111 private final MastershipChangeServiceManager mastershipChangeServiceManager;
// Components created in initialize(); they have no initializer, so they are null until initialize() has run.
112 private DeviceManager deviceManager;
113 private RpcManager rpcManager;
114 private StatisticsManager statisticsManager;
115 private RoleManager roleManager;
116 private ConnectionManager connectionManager;
117 private ListeningExecutorService executorService;
118 private ContextChainHolderImpl contextChainHolder;
// Status reporter: receives OPERATIONAL or ERROR after provider start-up and UNREGISTERED in close().
119 private final OpenflowPluginDiagStatusProvider openflowPluginStatusMonitor;
121 public static MessageIntelligenceAgency getMessageIntelligenceAgency() {
122 return MESSAGE_INTELLIGENCE_AGENCY;
/**
 * Stores the supplied services, creates the default convertor manager, extension converter
 * manager, device initializer provider and configuration wrapper, and finally registers this
 * instance with the system-ready monitor so that {@link #onSystemBootReady()} can start the
 * switch connections later.
 *
 * @param configurationService service wrapped by {@link OpenFlowProviderConfigImpl} to provide the configuration
 * @param switchConnectionProviders providers that are started on system ready and shut down in {@link #close()}
 * @param dataBroker data broker kept for the components built in {@link #initialize()}
 * @param rpcProviderRegistry RPC registry kept for the components built in {@link #initialize()}
 * @param notificationPublishService notification service passed to the device and RPC managers
 * @param singletonServiceProvider cluster singleton provider passed to the context chain holder
 * @param entityOwnershipService entity ownership service passed to the context chain holder
 * @param mastershipChangeServiceManager mastership manager passed to the context chain holder
 * @param openflowPluginStatusMonitor receiver of the plugin's service-state reports
 * @param systemReadyMonitor monitor this instance registers itself with as a listener
 */
OpenFlowPluginProviderImpl(final ConfigurationService configurationService,
                           final List<SwitchConnectionProvider> switchConnectionProviders,
                           final DataBroker dataBroker,
                           final RpcProviderRegistry rpcProviderRegistry,
                           final NotificationPublishService notificationPublishService,
                           final ClusterSingletonServiceProvider singletonServiceProvider,
                           final EntityOwnershipService entityOwnershipService,
                           final MastershipChangeServiceManager mastershipChangeServiceManager,
                           final OpenflowPluginDiagStatusProvider openflowPluginStatusMonitor,
                           final SystemReadyMonitor systemReadyMonitor) {
    // Services handed in by the caller.
    this.switchConnectionProviders = switchConnectionProviders;
    this.dataBroker = dataBroker;
    this.rpcProviderRegistry = rpcProviderRegistry;
    this.notificationPublishService = notificationPublishService;
    this.singletonServicesProvider = singletonServiceProvider;
    this.entityOwnershipService = entityOwnershipService;
    this.mastershipChangeServiceManager = mastershipChangeServiceManager;
    this.openflowPluginStatusMonitor = openflowPluginStatusMonitor;

    // Helpers created locally, in the same order as before.
    this.convertorManager = ConvertorManagerFactory.createDefaultManager();
    this.extensionConverterManager = new ExtensionConverterManagerImpl();
    this.deviceInitializerProvider = DeviceInitializerProviderFactory.createDefaultProvider();
    this.config = new OpenFlowProviderConfigImpl(configurationService);

    // Registration stays last so all final fields are assigned before a callback can arrive.
    systemReadyMonitor.registerListener(this);
    LOG.debug("registered onSystemBootReady() listener for deferred startSwitchConnections()");
}
152 public void onSystemBootReady() {
153 LOG.debug("onSystemBootReady() received, starting the switch connections");
154 startSwitchConnections();
157 private void startSwitchConnections() {
158 Futures.addCallback(Futures.allAsList(switchConnectionProviders.stream().map(switchConnectionProvider -> {
159 // Inject OpenFlowPlugin custom serializers and deserializers into OpenFlowJava
160 if (config.isUseSingleLayerSerialization()) {
161 SerializerInjector.injectSerializers(switchConnectionProvider,
162 switchConnectionProvider.getConfiguration().isGroupAddModEnabled());
163 DeserializerInjector.injectDeserializers(switchConnectionProvider);
165 DeserializerInjector.revertDeserializers(switchConnectionProvider);
168 // Set handler of incoming connections and start switch connection provider
169 switchConnectionProvider.setSwitchConnectionHandler(connectionManager);
170 return switchConnectionProvider.startup();
171 }).collect(Collectors.toSet())), new FutureCallback<List<Boolean>>() {
173 public void onSuccess(@Nonnull final List<Boolean> result) {
174 LOG.info("All switchConnectionProviders are up and running ({}).", result.size());
175 openflowPluginStatusMonitor.reportStatus(ServiceState.OPERATIONAL);
179 public void onFailure(@Nonnull final Throwable throwable) {
180 LOG.warn("Some switchConnectionProviders failed to start.", throwable);
181 openflowPluginStatusMonitor.reportStatus(ServiceState.ERROR, throwable);
183 }, MoreExecutors.directExecutor());
186 private ListenableFuture<List<Boolean>> shutdownSwitchConnections() {
187 final ListenableFuture<List<Boolean>> listListenableFuture =
188 Futures.allAsList(switchConnectionProviders.stream().map(switchConnectionProvider -> {
189 // Revert deserializers to their original state
190 if (config.isUseSingleLayerSerialization()) {
191 DeserializerInjector.revertDeserializers(switchConnectionProvider);
194 // Shutdown switch connection provider
195 return switchConnectionProvider.shutdown();
196 }).collect(Collectors.toSet()));
198 Futures.addCallback(listListenableFuture, new FutureCallback<List<Boolean>>() {
200 public void onSuccess(@Nonnull final List<Boolean> result) {
201 LOG.info("All switchConnectionProviders were successfully shut down ({}).", result.size());
205 public void onFailure(@Nonnull final Throwable throwable) {
206 LOG.warn("Some switchConnectionProviders failed to shutdown.", throwable);
208 }, MoreExecutors.directExecutor());
210 return listListenableFuture;
/**
 * Builds the plugin's runtime components: registers the agency MXBean, creates the worker
 * thread pool, the device/RPC/statistics/role managers, the context chain holder and the
 * connection manager, wires them together and finally initializes the device manager.
 */
// NOTE(review): several constructor-argument lines (DeviceManagerImpl, RpcManagerImpl,
// StatisticsManagerImpl, ContextChainHolderImpl) appear to be missing from this listing;
// the code is therefore left untouched - confirm against the full file.
214 public void initialize() {
215 registerMXBean(MESSAGE_INTELLIGENCE_AGENCY, MESSAGE_INTELLIGENCE_AGENCY_MX_BEAN_NAME);
217 // TODO: copied from OpenFlowPluginProvider (Helium) misusing the old way of distributing extension converters
218 // TODO: rewrite later!
219 OFSessionUtil.getSessionManager().setExtensionConverterProvider(extensionConverterManager);
221 // Creates a thread pool that creates new threads as needed, but will reuse previously
222 // constructed threads when they are available.
223 // Threads idle for longer than config.getThreadPoolTimeout() seconds are terminated and removed from the cache.
224 executorService = MoreExecutors.listeningDecorator(new ThreadPoolLoggingExecutor(
225 config.getThreadPoolMinThreads(),
226 config.getThreadPoolMaxThreads().getValue(),
227 config.getThreadPoolTimeout(),
228 TimeUnit.SECONDS, new SynchronousQueue<>(), POOL_NAME));
230 deviceManager = new DeviceManagerImpl(
233 getMessageIntelligenceAgency(),
234 notificationPublishService,
237 deviceInitializerProvider);
// The device manager also receives the translator library and the extension converters;
// the cast relies on the DeviceManagerImpl created above implementing ExtensionConverterProviderKeeper.
239 TranslatorLibraryUtil.injectBasicTranslatorLibrary(deviceManager, convertorManager);
240 ((ExtensionConverterProviderKeeper) deviceManager).setExtensionConverterProvider(extensionConverterManager);
242 rpcManager = new RpcManagerImpl(
245 extensionConverterManager,
247 notificationPublishService);
249 statisticsManager = new StatisticsManagerImpl(
255 roleManager = new RoleManagerImpl(hashedWheelTimer, config);
257 contextChainHolder = new ContextChainHolderImpl(
259 singletonServicesProvider,
260 entityOwnershipService,
261 mastershipChangeServiceManager);
// Register all four managers with the context chain holder.
263 contextChainHolder.addManager(deviceManager);
264 contextChainHolder.addManager(statisticsManager);
265 contextChainHolder.addManager(rpcManager);
266 contextChainHolder.addManager(roleManager);
// The context chain holder handles both device-connected and device-disconnected events.
268 connectionManager = new ConnectionManagerImpl(config, executorService);
269 connectionManager.setDeviceConnectedHandler(contextChainHolder);
270 connectionManager.setDeviceDisconnectedHandler(contextChainHolder);
272 deviceManager.setContextChainHolder(contextChainHolder);
273 deviceManager.initialize();
277 public ExtensionConverterRegistrator getExtensionConverterRegistrator() {
278 return extensionConverterManager;
282 public void close() {
284 shutdownSwitchConnections().get(10, TimeUnit.SECONDS);
285 } catch (InterruptedException | ExecutionException | TimeoutException e) {
286 LOG.warn("Failed to shut down switch connections in time {}s, error: {}", 10, e);
289 gracefulShutdown(contextChainHolder);
290 gracefulShutdown(deviceManager);
291 gracefulShutdown(rpcManager);
292 gracefulShutdown(statisticsManager);
293 gracefulShutdown(roleManager);
294 gracefulShutdown(executorService);
295 gracefulShutdown(hashedWheelTimer);
296 unregisterMXBean(MESSAGE_INTELLIGENCE_AGENCY_MX_BEAN_NAME);
297 openflowPluginStatusMonitor.reportStatus(ServiceState.UNREGISTERED);
300 @SuppressWarnings("checkstyle:IllegalCatch")
301 private static void gracefulShutdown(final AutoCloseable closeable) {
302 if (Objects.isNull(closeable)) {
308 } catch (Exception e) {
309 LOG.warn("Failed to shutdown {} gracefully.", closeable);
313 private static void gracefulShutdown(final Timer timer) {
314 if (Objects.isNull(timer)) {
320 } catch (IllegalStateException e) {
321 LOG.warn("Failed to shutdown {} gracefully.", timer);
325 private static void gracefulShutdown(final ExecutorService executorService) {
326 if (Objects.isNull(executorService)) {
331 executorService.shutdownNow();
332 } catch (SecurityException e) {
333 LOG.warn("Failed to shutdown {} gracefully.", executorService);
337 private static void registerMXBean(final Object bean, final String beanName) {
338 final MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
341 mbs.registerMBean(bean, new ObjectName(beanName));
342 } catch (MalformedObjectNameException
343 | NotCompliantMBeanException
344 | MBeanRegistrationException
345 | InstanceAlreadyExistsException e) {
346 LOG.warn("Error registering MBean {}", e);
350 private static void unregisterMXBean(final String beanName) {
351 final MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
354 mbs.unregisterMBean(new ObjectName(beanName));
355 } catch (InstanceNotFoundException
356 | MBeanRegistrationException
357 | MalformedObjectNameException e) {
358 LOG.warn("Error unregistering MBean {}", e);