2 * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.openflowplugin.impl;
11 import com.google.common.util.concurrent.FutureCallback;
12 import com.google.common.util.concurrent.Futures;
13 import com.google.common.util.concurrent.ListenableFuture;
14 import com.google.common.util.concurrent.ListeningExecutorService;
15 import com.google.common.util.concurrent.MoreExecutors;
16 import io.netty.util.HashedWheelTimer;
17 import io.netty.util.Timer;
18 import java.lang.management.ManagementFactory;
19 import java.util.Collection;
20 import java.util.List;
21 import java.util.Objects;
22 import java.util.concurrent.ExecutionException;
23 import java.util.concurrent.ExecutorService;
24 import java.util.concurrent.SynchronousQueue;
25 import java.util.concurrent.TimeUnit;
26 import java.util.concurrent.TimeoutException;
27 import java.util.stream.Collectors;
28 import javax.annotation.Nonnull;
29 import javax.management.InstanceAlreadyExistsException;
30 import javax.management.InstanceNotFoundException;
31 import javax.management.MBeanRegistrationException;
32 import javax.management.MBeanServer;
33 import javax.management.MalformedObjectNameException;
34 import javax.management.NotCompliantMBeanException;
35 import javax.management.ObjectName;
36 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
37 import org.opendaylight.controller.md.sal.binding.api.NotificationPublishService;
38 import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry;
39 import org.opendaylight.infrautils.diagstatus.ServiceState;
40 import org.opendaylight.infrautils.ready.SystemReadyListener;
41 import org.opendaylight.infrautils.ready.SystemReadyMonitor;
42 import org.opendaylight.mdsal.eos.binding.api.EntityOwnershipService;
43 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceProvider;
44 import org.opendaylight.openflowjava.protocol.spi.connection.SwitchConnectionProvider;
45 import org.opendaylight.openflowplugin.api.diagstatus.OpenflowPluginDiagStatusProvider;
46 import org.opendaylight.openflowplugin.api.openflow.OpenFlowPluginProvider;
47 import org.opendaylight.openflowplugin.api.openflow.configuration.ConfigurationService;
48 import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionManager;
49 import org.opendaylight.openflowplugin.api.openflow.device.DeviceManager;
50 import org.opendaylight.openflowplugin.api.openflow.mastership.MastershipChangeServiceManager;
51 import org.opendaylight.openflowplugin.api.openflow.role.RoleManager;
52 import org.opendaylight.openflowplugin.api.openflow.rpc.RpcManager;
53 import org.opendaylight.openflowplugin.api.openflow.statistics.StatisticsManager;
54 import org.opendaylight.openflowplugin.api.openflow.statistics.ofpspecific.MessageIntelligenceAgency;
55 import org.opendaylight.openflowplugin.extension.api.ExtensionConverterProviderKeeper;
56 import org.opendaylight.openflowplugin.extension.api.ExtensionConverterRegistrator;
57 import org.opendaylight.openflowplugin.extension.api.OpenFlowPluginExtensionRegistratorProvider;
58 import org.opendaylight.openflowplugin.extension.api.core.extension.ExtensionConverterManager;
59 import org.opendaylight.openflowplugin.impl.configuration.OpenFlowProviderConfigImpl;
60 import org.opendaylight.openflowplugin.impl.connection.ConnectionManagerImpl;
61 import org.opendaylight.openflowplugin.impl.device.DeviceManagerImpl;
62 import org.opendaylight.openflowplugin.impl.device.initialization.DeviceInitializerProvider;
63 import org.opendaylight.openflowplugin.impl.device.initialization.DeviceInitializerProviderFactory;
64 import org.opendaylight.openflowplugin.impl.lifecycle.ContextChainHolderImpl;
65 import org.opendaylight.openflowplugin.impl.protocol.deserialization.DeserializerInjector;
66 import org.opendaylight.openflowplugin.impl.protocol.serialization.SerializerInjector;
67 import org.opendaylight.openflowplugin.impl.role.RoleManagerImpl;
68 import org.opendaylight.openflowplugin.impl.rpc.RpcManagerImpl;
69 import org.opendaylight.openflowplugin.impl.statistics.StatisticsManagerImpl;
70 import org.opendaylight.openflowplugin.impl.statistics.ofpspecific.MessageIntelligenceAgencyImpl;
71 import org.opendaylight.openflowplugin.impl.statistics.ofpspecific.MessageIntelligenceAgencyMXBean;
72 import org.opendaylight.openflowplugin.impl.util.ThreadPoolLoggingExecutor;
73 import org.opendaylight.openflowplugin.impl.util.TranslatorLibraryUtil;
74 import org.opendaylight.openflowplugin.openflow.md.core.extension.ExtensionConverterManagerImpl;
75 import org.opendaylight.openflowplugin.openflow.md.core.sal.convertor.ConvertorManager;
76 import org.opendaylight.openflowplugin.openflow.md.core.sal.convertor.ConvertorManagerFactory;
77 import org.opendaylight.openflowplugin.openflow.md.core.session.OFSessionUtil;
78 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflow.provider.config.rev160510.OpenflowProviderConfig;
79 import org.slf4j.Logger;
80 import org.slf4j.LoggerFactory;
/**
 * Entry point of the OpenFlow plugin implementation: holds the injected services, creates the
 * device/RPC/statistics/role/connection managers in {@code initialize()}, starts the switch
 * connection providers once the system reports boot-ready, and tears everything down in
 * {@code close()}.
 */
82 public class OpenFlowPluginProviderImpl implements
83 OpenFlowPluginProvider,
84 OpenFlowPluginExtensionRegistratorProvider,
87 private static final Logger LOG = LoggerFactory.getLogger(OpenFlowPluginProviderImpl.class);
// NOTE(review): this constant used to be annotated "0.5 sec."; with TICK_DURATION = 10 ms a full
// revolution of 500 ticks is 5 s - confirm which value was intended.
89 private static final int TICKS_PER_WHEEL = 500; // wheel size handed to HashedWheelTimer
// Duration of one timer tick; interpreted as milliseconds where the timer is constructed below.
90 private static final long TICK_DURATION = 10;
// Name passed to the ThreadPoolLoggingExecutor created in initialize().
91 private static final String POOL_NAME = "ofppool";
// Single shared message statistics collector; exposed statically and registered as an MXBean.
93 private static final MessageIntelligenceAgency MESSAGE_INTELLIGENCE_AGENCY = new MessageIntelligenceAgencyImpl();
// JMX object name for the collector, derived from the MXBean interface's package and simple name.
94 private static final String MESSAGE_INTELLIGENCE_AGENCY_MX_BEAN_NAME = String
96 MessageIntelligenceAgencyMXBean.class.getPackage().getName(),
97 MessageIntelligenceAgencyMXBean.class.getSimpleName());
// Timer handed to the managers created in initialize(); stopped in close().
99 private final HashedWheelTimer hashedWheelTimer =
100 new HashedWheelTimer(TICK_DURATION, TimeUnit.MILLISECONDS, TICKS_PER_WHEEL);
// Collaborators assigned once in the constructor.
101 private final NotificationPublishService notificationPublishService;
102 private final ExtensionConverterManager extensionConverterManager;
103 private final DataBroker dataBroker;
104 private final Collection<SwitchConnectionProvider> switchConnectionProviders;
105 private final DeviceInitializerProvider deviceInitializerProvider;
106 private final ConvertorManager convertorManager;
107 private final RpcProviderRegistry rpcProviderRegistry;
108 private final ClusterSingletonServiceProvider singletonServicesProvider;
109 private final OpenflowProviderConfig config;
110 private final EntityOwnershipService entityOwnershipService;
111 private final MastershipChangeServiceManager mastershipChangeServiceManager;
// Runtime components; these stay null until initialize() has run.
112 private DeviceManager deviceManager;
113 private RpcManager rpcManager;
114 private StatisticsManager statisticsManager;
115 private RoleManager roleManager;
116 private ConnectionManager connectionManager;
117 private ListeningExecutorService executorService;
118 private ContextChainHolderImpl contextChainHolder;
// Receives the OPERATIONAL / ERROR / UNREGISTERED state reports made by this class.
119 private final OpenflowPluginDiagStatusProvider openflowPluginStatusMonitor;
121 public static MessageIntelligenceAgency getMessageIntelligenceAgency() {
122 return MESSAGE_INTELLIGENCE_AGENCY;
/**
 * Stores the supplied services, creates the default convertor, extension-converter and
 * device-initializer components, wraps the configuration service, and registers this instance
 * with the system-ready monitor so that switch connections are started only after boot.
 *
 * @param configurationService source of the plugin configuration
 * @param switchConnectionProviders providers to start on system-ready and shut down on close
 * @param dataBroker data broker kept for the managers
 * @param rpcProviderRegistry RPC registry kept for the managers
 * @param notificationPublishService notification service kept for the managers
 * @param singletonServiceProvider cluster singleton provider kept for the context chain holder
 * @param entityOwnershipService entity ownership service kept for the context chain holder
 * @param mastershipChangeServiceManager mastership manager kept for the context chain holder
 * @param openflowPluginStatusMonitor sink for service state reports
 * @param systemReadyMonitor monitor this instance registers itself with
 */
OpenFlowPluginProviderImpl(final ConfigurationService configurationService,
                           final List<SwitchConnectionProvider> switchConnectionProviders,
                           final DataBroker dataBroker,
                           final RpcProviderRegistry rpcProviderRegistry,
                           final NotificationPublishService notificationPublishService,
                           final ClusterSingletonServiceProvider singletonServiceProvider,
                           final EntityOwnershipService entityOwnershipService,
                           final MastershipChangeServiceManager mastershipChangeServiceManager,
                           final OpenflowPluginDiagStatusProvider openflowPluginStatusMonitor,
                           final SystemReadyMonitor systemReadyMonitor) {
    // Services handed in by the caller.
    this.switchConnectionProviders = switchConnectionProviders;
    this.dataBroker = dataBroker;
    this.rpcProviderRegistry = rpcProviderRegistry;
    this.notificationPublishService = notificationPublishService;
    this.singletonServicesProvider = singletonServiceProvider;
    this.entityOwnershipService = entityOwnershipService;
    this.mastershipChangeServiceManager = mastershipChangeServiceManager;
    this.openflowPluginStatusMonitor = openflowPluginStatusMonitor;

    // Components created locally, in the same relative order as before.
    this.convertorManager = ConvertorManagerFactory.createDefaultManager();
    this.extensionConverterManager = new ExtensionConverterManagerImpl();
    this.deviceInitializerProvider = DeviceInitializerProviderFactory.createDefaultProvider();
    this.config = new OpenFlowProviderConfigImpl(configurationService);

    // Registration stays last so every field is assigned before the listener becomes reachable.
    systemReadyMonitor.registerListener(this);
    LOG.debug("registered onSystemBootReady() listener for deferred startSwitchConnections()");
}
152 public void onSystemBootReady() {
153 LOG.debug("onSystemBootReady() received, starting the switch connections");
154 startSwitchConnections();
157 private void startSwitchConnections() {
158 Futures.addCallback(Futures.allAsList(switchConnectionProviders.stream().map(switchConnectionProvider -> {
159 // Inject OpenFlowPlugin custom serializers and deserializers into OpenFlowJava
160 if (config.isUseSingleLayerSerialization()) {
161 SerializerInjector.injectSerializers(switchConnectionProvider,
162 switchConnectionProvider.getConfiguration().isGroupAddModEnabled());
163 DeserializerInjector.injectDeserializers(switchConnectionProvider);
165 DeserializerInjector.revertDeserializers(switchConnectionProvider);
168 // Set handler of incoming connections and start switch connection provider
169 switchConnectionProvider.setSwitchConnectionHandler(connectionManager);
170 return switchConnectionProvider.startup();
171 }).collect(Collectors.toSet())), new FutureCallback<List<Boolean>>() {
173 public void onSuccess(@Nonnull final List<Boolean> result) {
174 LOG.info("All switchConnectionProviders are up and running ({}).", result.size());
175 openflowPluginStatusMonitor.reportStatus(ServiceState.OPERATIONAL, "switch connections started");
179 public void onFailure(@Nonnull final Throwable throwable) {
180 LOG.warn("Some switchConnectionProviders failed to start.", throwable);
181 openflowPluginStatusMonitor.reportStatus(ServiceState.ERROR, "some switch connections failed to start");
183 }, MoreExecutors.directExecutor());
186 private ListenableFuture<List<Boolean>> shutdownSwitchConnections() {
187 final ListenableFuture<List<Boolean>> listListenableFuture =
188 Futures.allAsList(switchConnectionProviders.stream().map(switchConnectionProvider -> {
189 // Revert deserializers to their original state
190 if (config.isUseSingleLayerSerialization()) {
191 DeserializerInjector.revertDeserializers(switchConnectionProvider);
194 // Shutdown switch connection provider
195 return switchConnectionProvider.shutdown();
196 }).collect(Collectors.toSet()));
198 Futures.addCallback(listListenableFuture, new FutureCallback<List<Boolean>>() {
200 public void onSuccess(@Nonnull final List<Boolean> result) {
201 LOG.info("All switchConnectionProviders were successfully shut down ({}).", result.size());
205 public void onFailure(@Nonnull final Throwable throwable) {
206 LOG.warn("Some switchConnectionProviders failed to shutdown.", throwable);
208 }, MoreExecutors.directExecutor());
210 return listListenableFuture;
/**
 * Creates and wires the plugin's runtime components: registers the statistics MXBean, builds
 * the worker thread pool, instantiates the device, RPC, statistics and role managers, adds
 * them to a context chain holder, hooks that holder into a new connection manager, and
 * finally initializes the device manager.
 *
 * NOTE(review): several constructor-argument lines are not visible in this listing; the
 * comments below only describe what the visible lines show.
 */
214 public void initialize() {
// Expose the shared message statistics collector over JMX.
215 registerMXBean(MESSAGE_INTELLIGENCE_AGENCY, MESSAGE_INTELLIGENCE_AGENCY_MX_BEAN_NAME);
217 // TODO: copied from OpenFlowPluginProvider (Helium), misusing the old way of distributing extension converters
218 // TODO: rewrite later!
219 OFSessionUtil.getSessionManager().setExtensionConverterProvider(extensionConverterManager);
221 // Creates a thread pool that creates new threads as needed, but will reuse previously
222 // constructed threads when they are available.
223 // Threads idle for longer than the configured thread-pool timeout (in seconds) are terminated
// and removed from the pool. The SynchronousQueue means tasks are handed straight to a thread
// rather than queued.
224 executorService = MoreExecutors.listeningDecorator(new ThreadPoolLoggingExecutor(
225 config.getThreadPoolMinThreads(),
226 config.getThreadPoolMaxThreads().getValue(),
227 config.getThreadPoolTimeout(),
228 TimeUnit.SECONDS, new SynchronousQueue<>(), POOL_NAME));
// Device manager: receives, among others, the statistics collector, the notification service
// and the device initializer provider.
230 deviceManager = new DeviceManagerImpl(
233 getMessageIntelligenceAgency(),
234 notificationPublishService,
237 deviceInitializerProvider);
// Give the device manager its translators and the extension converters.
239 TranslatorLibraryUtil.injectBasicTranslatorLibrary(deviceManager, convertorManager);
// The cast relies on the DeviceManagerImpl created above also being an ExtensionConverterProviderKeeper.
240 ((ExtensionConverterProviderKeeper) deviceManager).setExtensionConverterProvider(extensionConverterManager);
242 rpcManager = new RpcManagerImpl(
245 extensionConverterManager,
247 notificationPublishService);
249 statisticsManager = new StatisticsManagerImpl(
255 roleManager = new RoleManagerImpl(hashedWheelTimer);
// The context chain holder is given the clustering services and every manager created above.
257 contextChainHolder = new ContextChainHolderImpl(
259 singletonServicesProvider,
260 entityOwnershipService,
261 mastershipChangeServiceManager);
263 contextChainHolder.addManager(deviceManager);
264 contextChainHolder.addManager(statisticsManager);
265 contextChainHolder.addManager(rpcManager);
266 contextChainHolder.addManager(roleManager);
// The connection manager forwards device connect and disconnect events to the context chain holder.
268 connectionManager = new ConnectionManagerImpl(config, executorService);
269 connectionManager.setDeviceConnectedHandler(contextChainHolder);
270 connectionManager.setDeviceDisconnectedHandler(contextChainHolder);
// Initialize the device manager only after all wiring above is in place.
272 deviceManager.initialize();
276 public ExtensionConverterRegistrator getExtensionConverterRegistrator() {
277 return extensionConverterManager;
281 public void close() {
283 shutdownSwitchConnections().get(10, TimeUnit.SECONDS);
284 } catch (InterruptedException | ExecutionException | TimeoutException e) {
285 LOG.warn("Failed to shut down switch connections in time {}s, error: {}", 10, e);
288 gracefulShutdown(contextChainHolder);
289 gracefulShutdown(deviceManager);
290 gracefulShutdown(rpcManager);
291 gracefulShutdown(statisticsManager);
292 gracefulShutdown(roleManager);
293 gracefulShutdown(executorService);
294 gracefulShutdown(hashedWheelTimer);
295 unregisterMXBean(MESSAGE_INTELLIGENCE_AGENCY_MX_BEAN_NAME);
296 openflowPluginStatusMonitor.reportStatus(ServiceState.UNREGISTERED, "service shutting down");
299 @SuppressWarnings("checkstyle:IllegalCatch")
300 private static void gracefulShutdown(final AutoCloseable closeable) {
301 if (Objects.isNull(closeable)) {
307 } catch (Exception e) {
308 LOG.warn("Failed to shutdown {} gracefully.", closeable);
312 private static void gracefulShutdown(final Timer timer) {
313 if (Objects.isNull(timer)) {
319 } catch (IllegalStateException e) {
320 LOG.warn("Failed to shutdown {} gracefully.", timer);
324 private static void gracefulShutdown(final ExecutorService executorService) {
325 if (Objects.isNull(executorService)) {
330 executorService.shutdownNow();
331 } catch (SecurityException e) {
332 LOG.warn("Failed to shutdown {} gracefully.", executorService);
336 private static void registerMXBean(final Object bean, final String beanName) {
337 final MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
340 mbs.registerMBean(bean, new ObjectName(beanName));
341 } catch (MalformedObjectNameException
342 | NotCompliantMBeanException
343 | MBeanRegistrationException
344 | InstanceAlreadyExistsException e) {
345 LOG.warn("Error registering MBean {}", e);
349 private static void unregisterMXBean(final String beanName) {
350 final MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
353 mbs.unregisterMBean(new ObjectName(beanName));
354 } catch (InstanceNotFoundException
355 | MBeanRegistrationException
356 | MalformedObjectNameException e) {
357 LOG.warn("Error unregistering MBean {}", e);