2 * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.openflowplugin.impl;
11 import com.google.common.util.concurrent.FutureCallback;
12 import com.google.common.util.concurrent.Futures;
13 import com.google.common.util.concurrent.ListenableFuture;
14 import com.google.common.util.concurrent.ListeningExecutorService;
15 import com.google.common.util.concurrent.MoreExecutors;
16 import io.netty.util.HashedWheelTimer;
17 import io.netty.util.Timer;
18 import java.lang.management.ManagementFactory;
19 import java.util.Collection;
20 import java.util.List;
21 import java.util.Objects;
22 import java.util.concurrent.ExecutionException;
23 import java.util.concurrent.ExecutorService;
24 import java.util.concurrent.SynchronousQueue;
25 import java.util.concurrent.TimeUnit;
26 import java.util.concurrent.TimeoutException;
27 import java.util.stream.Collectors;
28 import javax.annotation.Nonnull;
29 import javax.management.InstanceAlreadyExistsException;
30 import javax.management.InstanceNotFoundException;
31 import javax.management.MBeanRegistrationException;
32 import javax.management.MBeanServer;
33 import javax.management.MalformedObjectNameException;
34 import javax.management.NotCompliantMBeanException;
35 import javax.management.ObjectName;
36 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
37 import org.opendaylight.controller.md.sal.binding.api.NotificationPublishService;
38 import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipService;
39 import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry;
40 import org.opendaylight.infrautils.diagstatus.ServiceState;
41 import org.opendaylight.infrautils.ready.SystemReadyListener;
42 import org.opendaylight.infrautils.ready.SystemReadyMonitor;
43 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceProvider;
44 import org.opendaylight.openflowjava.protocol.spi.connection.SwitchConnectionProvider;
45 import org.opendaylight.openflowplugin.api.diagstatus.OpenflowPluginDiagStatusProvider;
46 import org.opendaylight.openflowplugin.api.openflow.OpenFlowPluginProvider;
47 import org.opendaylight.openflowplugin.api.openflow.configuration.ConfigurationService;
48 import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionManager;
49 import org.opendaylight.openflowplugin.api.openflow.device.DeviceManager;
50 import org.opendaylight.openflowplugin.api.openflow.mastership.MastershipChangeServiceManager;
51 import org.opendaylight.openflowplugin.api.openflow.role.RoleManager;
52 import org.opendaylight.openflowplugin.api.openflow.rpc.RpcManager;
53 import org.opendaylight.openflowplugin.api.openflow.statistics.StatisticsManager;
54 import org.opendaylight.openflowplugin.api.openflow.statistics.ofpspecific.MessageIntelligenceAgency;
55 import org.opendaylight.openflowplugin.extension.api.ExtensionConverterProviderKeeper;
56 import org.opendaylight.openflowplugin.extension.api.ExtensionConverterRegistrator;
57 import org.opendaylight.openflowplugin.extension.api.OpenFlowPluginExtensionRegistratorProvider;
58 import org.opendaylight.openflowplugin.extension.api.core.extension.ExtensionConverterManager;
59 import org.opendaylight.openflowplugin.impl.configuration.OpenFlowProviderConfigImpl;
60 import org.opendaylight.openflowplugin.impl.connection.ConnectionManagerImpl;
61 import org.opendaylight.openflowplugin.impl.device.DeviceManagerImpl;
62 import org.opendaylight.openflowplugin.impl.device.initialization.DeviceInitializerProvider;
63 import org.opendaylight.openflowplugin.impl.device.initialization.DeviceInitializerProviderFactory;
64 import org.opendaylight.openflowplugin.impl.lifecycle.ContextChainHolderImpl;
65 import org.opendaylight.openflowplugin.impl.protocol.deserialization.DeserializerInjector;
66 import org.opendaylight.openflowplugin.impl.protocol.serialization.SerializerInjector;
67 import org.opendaylight.openflowplugin.impl.role.RoleManagerImpl;
68 import org.opendaylight.openflowplugin.impl.rpc.RpcManagerImpl;
69 import org.opendaylight.openflowplugin.impl.statistics.StatisticsManagerImpl;
70 import org.opendaylight.openflowplugin.impl.statistics.ofpspecific.MessageIntelligenceAgencyImpl;
71 import org.opendaylight.openflowplugin.impl.statistics.ofpspecific.MessageIntelligenceAgencyMXBean;
72 import org.opendaylight.openflowplugin.impl.util.ThreadPoolLoggingExecutor;
73 import org.opendaylight.openflowplugin.impl.util.TranslatorLibraryUtil;
74 import org.opendaylight.openflowplugin.openflow.md.core.extension.ExtensionConverterManagerImpl;
75 import org.opendaylight.openflowplugin.openflow.md.core.sal.convertor.ConvertorManager;
76 import org.opendaylight.openflowplugin.openflow.md.core.sal.convertor.ConvertorManagerFactory;
77 import org.opendaylight.openflowplugin.openflow.md.core.session.OFSessionUtil;
78 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflow.provider.config.rev160510.OpenflowProviderConfig;
79 import org.slf4j.Logger;
80 import org.slf4j.LoggerFactory;
82 public class OpenFlowPluginProviderImpl implements
83 OpenFlowPluginProvider,
84 OpenFlowPluginExtensionRegistratorProvider,
// Class-wide logger.
87 private static final Logger LOG = LoggerFactory.getLogger(OpenFlowPluginProviderImpl.class);
// Timer wheel geometry handed to the HashedWheelTimer below: TICKS_PER_WHEEL slots, each TICK_DURATION
// milliseconds long, i.e. 500 * 10 ms = 5 s per wheel revolution.
// NOTE(review): the previous inline note said "0.5 sec.", which does not match these two values - confirm intent.
89 private static final int TICKS_PER_WHEEL = 500; // number of slots in the timer wheel
90 private static final long TICK_DURATION = 10;
// Name passed to the ThreadPoolLoggingExecutor created in initialize().
91 private static final String POOL_NAME = "ofppool";
// Single shared statistics collector; exposed through getMessageIntelligenceAgency() and registered as an
// MXBean in initialize().
93 private static final MessageIntelligenceAgency MESSAGE_INTELLIGENCE_AGENCY = new MessageIntelligenceAgencyImpl();
// JMX object name under which MESSAGE_INTELLIGENCE_AGENCY is registered; built from the MXBean interface's
// package name and simple name.
94 private static final String MESSAGE_INTELLIGENCE_AGENCY_MX_BEAN_NAME = String
96 MessageIntelligenceAgencyMXBean.class.getPackage().getName(),
97 MessageIntelligenceAgencyMXBean.class.getSimpleName());
// Timer owned by this provider; it is passed to RoleManagerImpl in initialize() and stopped in close().
99 private final HashedWheelTimer hashedWheelTimer =
100 new HashedWheelTimer(TICK_DURATION, TimeUnit.MILLISECONDS, TICKS_PER_WHEEL);
// --- Collaborators: either supplied to the constructor or created inside it ---
101 private final NotificationPublishService notificationPublishService;
// Created in the constructor (ExtensionConverterManagerImpl); returned by getExtensionConverterRegistrator().
102 private final ExtensionConverterManager extensionConverterManager;
103 private final DataBroker dataBroker;
// Providers that are started in startSwitchConnections() and stopped in shutdownSwitchConnections().
104 private final Collection<SwitchConnectionProvider> switchConnectionProviders;
// Created in the constructor through DeviceInitializerProviderFactory.createDefaultProvider().
105 private final DeviceInitializerProvider deviceInitializerProvider;
// Created in the constructor through ConvertorManagerFactory.createDefaultManager().
106 private final ConvertorManager convertorManager;
107 private final RpcProviderRegistry rpcProviderRegistry;
108 private final ClusterSingletonServiceProvider singletonServicesProvider;
// Wraps the ConfigurationService given to the constructor (OpenFlowProviderConfigImpl).
109 private final OpenflowProviderConfig config;
110 private final EntityOwnershipService entityOwnershipService;
111 private final MastershipChangeServiceManager mastershipChangeServiceManager;
// --- Components instantiated in initialize() and released in close(); null until initialize() has run ---
112 private DeviceManager deviceManager;
113 private RpcManager rpcManager;
114 private StatisticsManager statisticsManager;
115 private RoleManager roleManager;
116 private ConnectionManager connectionManager;
117 private ListeningExecutorService executorService;
118 private ContextChainHolderImpl contextChainHolder;
// Assigned in the constructor; used to report OPERATIONAL / ERROR / UNREGISTERED service state.
119 private OpenflowPluginDiagStatusProvider openflowPluginStatusMonitor;
121 public static MessageIntelligenceAgency getMessageIntelligenceAgency() {
122 return MESSAGE_INTELLIGENCE_AGENCY;
/**
 * Stores the supplied services, creates the default converter, extension-converter and device-initializer
 * instances, wraps the configuration service, and finally registers this object with the system-ready monitor
 * so that switch connections are only started from {@link #onSystemBootReady()}.
 *
 * @param configurationService           source of the plugin configuration
 * @param switchConnectionProviders      connection providers to start and stop
 * @param dataBroker                     data broker handed on to the managers
 * @param rpcProviderRegistry            RPC registry handed on to the managers
 * @param notificationPublishService     notification service handed on to the managers
 * @param singletonServiceProvider       cluster singleton service provider
 * @param entityOwnershipService         entity ownership service
 * @param mastershipChangeServiceManager mastership change service manager
 * @param openflowPluginStatusMonitor    sink for service state reports
 * @param systemReadyMonitor             monitor this instance registers itself with
 */
OpenFlowPluginProviderImpl(final ConfigurationService configurationService,
                           final List<SwitchConnectionProvider> switchConnectionProviders,
                           final DataBroker dataBroker,
                           final RpcProviderRegistry rpcProviderRegistry,
                           final NotificationPublishService notificationPublishService,
                           final ClusterSingletonServiceProvider singletonServiceProvider,
                           final EntityOwnershipService entityOwnershipService,
                           final MastershipChangeServiceManager mastershipChangeServiceManager,
                           final OpenflowPluginDiagStatusProvider openflowPluginStatusMonitor,
                           final SystemReadyMonitor systemReadyMonitor) {
    // Services supplied by the caller.
    this.switchConnectionProviders = switchConnectionProviders;
    this.dataBroker = dataBroker;
    this.rpcProviderRegistry = rpcProviderRegistry;
    this.notificationPublishService = notificationPublishService;
    this.singletonServicesProvider = singletonServiceProvider;
    this.entityOwnershipService = entityOwnershipService;
    this.mastershipChangeServiceManager = mastershipChangeServiceManager;
    this.openflowPluginStatusMonitor = openflowPluginStatusMonitor;

    // Collaborators created here.
    this.convertorManager = ConvertorManagerFactory.createDefaultManager();
    this.extensionConverterManager = new ExtensionConverterManagerImpl();
    this.deviceInitializerProvider = DeviceInitializerProviderFactory.createDefaultProvider();
    this.config = new OpenFlowProviderConfigImpl(configurationService);

    // Registration comes last so every field is set before a callback can arrive.
    systemReadyMonitor.registerListener(this);
    LOG.debug("registered onSystemBootReady() listener for deferred startSwitchConnections()");
}
152 public void onSystemBootReady() {
153 LOG.debug("onSystemBootReady() received, starting the switch connections");
154 startSwitchConnections();
157 private void startSwitchConnections() {
158 Futures.addCallback(Futures.allAsList(switchConnectionProviders.stream().map(switchConnectionProvider -> {
159 // Inject OpenFlowPlugin custom serializers and deserializers into OpenFlowJava
160 if (config.isUseSingleLayerSerialization()) {
161 SerializerInjector.injectSerializers(switchConnectionProvider);
162 DeserializerInjector.injectDeserializers(switchConnectionProvider);
164 DeserializerInjector.revertDeserializers(switchConnectionProvider);
167 // Set handler of incoming connections and start switch connection provider
168 switchConnectionProvider.setSwitchConnectionHandler(connectionManager);
169 return switchConnectionProvider.startup();
170 }).collect(Collectors.toSet())), new FutureCallback<List<Boolean>>() {
172 public void onSuccess(final List<Boolean> result) {
173 LOG.info("All switchConnectionProviders are up and running ({}).", result.size());
174 openflowPluginStatusMonitor.reportStatus(ServiceState.OPERATIONAL, "switch connections started");
178 public void onFailure(@Nonnull final Throwable throwable) {
179 LOG.warn("Some switchConnectionProviders failed to start.", throwable);
180 openflowPluginStatusMonitor.reportStatus(ServiceState.ERROR, "some switch connections failed to start");
185 private ListenableFuture<List<Boolean>> shutdownSwitchConnections() {
186 final ListenableFuture<List<Boolean>> listListenableFuture =
187 Futures.allAsList(switchConnectionProviders.stream().map(switchConnectionProvider -> {
188 // Revert deserializers to their original state
189 if (config.isUseSingleLayerSerialization()) {
190 DeserializerInjector.revertDeserializers(switchConnectionProvider);
193 // Shutdown switch connection provider
194 return switchConnectionProvider.shutdown();
195 }).collect(Collectors.toSet()));
197 Futures.addCallback(listListenableFuture, new FutureCallback<List<Boolean>>() {
199 public void onSuccess(final List<Boolean> result) {
200 LOG.info("All switchConnectionProviders were successfully shut down ({}).", result.size());
204 public void onFailure(@Nonnull final Throwable throwable) {
205 LOG.warn("Some switchConnectionProviders failed to shutdown.", throwable);
209 return listListenableFuture;
// Builds the plugin's runtime components: registers the statistics MXBean, creates the worker thread pool,
// instantiates the device, RPC, statistics and role managers, wires them into the context chain holder,
// creates the connection manager, and finally initializes the device manager.
// Switch connections are NOT started here; that happens in startSwitchConnections(), which is called from
// onSystemBootReady().
// NOTE(review): several constructor-argument lines of the manager instantiations below are not visible in
// this view of the file - check the full source before editing those calls.
213 public void initialize() {
// Expose the shared message statistics collector over JMX.
214 registerMXBean(MESSAGE_INTELLIGENCE_AGENCY, MESSAGE_INTELLIGENCE_AGENCY_MX_BEAN_NAME);
216 // TODO: copied from OpenFlowPluginProvider (Helium), misusing the old way of distributing extension converters
217 // TODO: rewrite later!
218 OFSessionUtil.getSessionManager().setExtensionConverterProvider(extensionConverterManager);
220 // Creates a thread pool that creates new threads as needed, but will reuse previously
221 // constructed threads when they are available.
222 // Threads that have not been used for config.getThreadPoolTimeout() seconds are terminated and removed from the cache.
// The pool is built from the configured minimum/maximum thread counts, uses a SynchronousQueue as its work
// queue and is named POOL_NAME; it is wrapped so that submissions return ListenableFutures.
223 executorService = MoreExecutors.listeningDecorator(new ThreadPoolLoggingExecutor(
224 config.getThreadPoolMinThreads(),
225 config.getThreadPoolMaxThreads().getValue(),
226 config.getThreadPoolTimeout(),
227 TimeUnit.SECONDS, new SynchronousQueue<>(), POOL_NAME));
229 deviceManager = new DeviceManagerImpl(
232 getMessageIntelligenceAgency(),
233 notificationPublishService,
236 deviceInitializerProvider);
// Give the device manager its translators and the extension converters.
// The cast relies on the device manager also implementing ExtensionConverterProviderKeeper.
238 TranslatorLibraryUtil.injectBasicTranslatorLibrary(deviceManager, convertorManager);
239 ((ExtensionConverterProviderKeeper) deviceManager).setExtensionConverterProvider(extensionConverterManager);
241 rpcManager = new RpcManagerImpl(
244 extensionConverterManager,
246 notificationPublishService);
248 statisticsManager = new StatisticsManagerImpl(
254 roleManager = new RoleManagerImpl(hashedWheelTimer);
256 contextChainHolder = new ContextChainHolderImpl(
258 singletonServicesProvider,
259 entityOwnershipService,
260 mastershipChangeServiceManager);
// Register all four managers with the context chain holder.
262 contextChainHolder.addManager(deviceManager);
263 contextChainHolder.addManager(statisticsManager);
264 contextChainHolder.addManager(rpcManager);
265 contextChainHolder.addManager(roleManager);
// The context chain holder receives both device-connected and device-disconnected events.
267 connectionManager = new ConnectionManagerImpl(config, executorService);
268 connectionManager.setDeviceConnectedHandler(contextChainHolder);
269 connectionManager.setDeviceDisconnectedHandler(contextChainHolder);
271 deviceManager.initialize();
275 public ExtensionConverterRegistrator getExtensionConverterRegistrator() {
276 return extensionConverterManager;
280 public void close() {
282 shutdownSwitchConnections().get(10, TimeUnit.SECONDS);
283 } catch (InterruptedException | ExecutionException | TimeoutException e) {
284 LOG.warn("Failed to shut down switch connections in time {}s, error: {}", 10, e);
287 gracefulShutdown(contextChainHolder);
288 gracefulShutdown(deviceManager);
289 gracefulShutdown(rpcManager);
290 gracefulShutdown(statisticsManager);
291 gracefulShutdown(roleManager);
292 gracefulShutdown(executorService);
293 gracefulShutdown(hashedWheelTimer);
294 unregisterMXBean(MESSAGE_INTELLIGENCE_AGENCY_MX_BEAN_NAME);
295 openflowPluginStatusMonitor.reportStatus(ServiceState.UNREGISTERED, "service shutting down");
298 @SuppressWarnings("checkstyle:IllegalCatch")
299 private static void gracefulShutdown(final AutoCloseable closeable) {
300 if (Objects.isNull(closeable)) {
306 } catch (Exception e) {
307 LOG.warn("Failed to shutdown {} gracefully.", closeable);
311 private static void gracefulShutdown(final Timer timer) {
312 if (Objects.isNull(timer)) {
318 } catch (IllegalStateException e) {
319 LOG.warn("Failed to shutdown {} gracefully.", timer);
323 private static void gracefulShutdown(final ExecutorService executorService) {
324 if (Objects.isNull(executorService)) {
329 executorService.shutdownNow();
330 } catch (SecurityException e) {
331 LOG.warn("Failed to shutdown {} gracefully.", executorService);
335 private static void registerMXBean(final Object bean, final String beanName) {
336 final MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
339 mbs.registerMBean(bean, new ObjectName(beanName));
340 } catch (MalformedObjectNameException
341 | NotCompliantMBeanException
342 | MBeanRegistrationException
343 | InstanceAlreadyExistsException e) {
344 LOG.warn("Error registering MBean {}", e);
348 private static void unregisterMXBean(final String beanName) {
349 final MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
352 mbs.unregisterMBean(new ObjectName(beanName));
353 } catch (InstanceNotFoundException
354 | MBeanRegistrationException
355 | MalformedObjectNameException e) {
356 LOG.warn("Error unregistering MBean {}", e);