2 * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.openflowplugin.impl;
10 import com.google.common.annotations.VisibleForTesting;
11 import com.google.common.util.concurrent.FutureCallback;
12 import com.google.common.util.concurrent.Futures;
13 import com.google.common.util.concurrent.ListenableFuture;
14 import com.google.common.util.concurrent.MoreExecutors;
15 import com.google.common.util.concurrent.SettableFuture;
16 import io.netty.util.HashedWheelTimer;
17 import io.netty.util.Timer;
18 import java.lang.management.ManagementFactory;
19 import java.util.Collection;
20 import java.util.List;
22 import java.util.concurrent.ExecutionException;
23 import java.util.concurrent.ExecutorService;
24 import java.util.concurrent.Future;
25 import java.util.concurrent.SynchronousQueue;
26 import java.util.concurrent.TimeUnit;
27 import java.util.concurrent.TimeoutException;
28 import java.util.stream.Collectors;
29 import javax.annotation.PostConstruct;
30 import javax.annotation.PreDestroy;
31 import javax.inject.Inject;
32 import javax.inject.Singleton;
33 import javax.management.InstanceAlreadyExistsException;
34 import javax.management.InstanceNotFoundException;
35 import javax.management.MBeanRegistrationException;
36 import javax.management.MBeanServer;
37 import javax.management.MalformedObjectNameException;
38 import javax.management.NotCompliantMBeanException;
39 import javax.management.ObjectName;
40 import org.opendaylight.infrautils.diagstatus.ServiceState;
41 import org.opendaylight.infrautils.ready.SystemReadyListener;
42 import org.opendaylight.infrautils.ready.SystemReadyMonitor;
43 import org.opendaylight.mdsal.binding.api.DataBroker;
44 import org.opendaylight.mdsal.binding.api.NotificationPublishService;
45 import org.opendaylight.mdsal.binding.api.RpcProviderService;
46 import org.opendaylight.mdsal.eos.binding.api.EntityOwnershipService;
47 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceProvider;
48 import org.opendaylight.openflowjava.protocol.spi.connection.SwitchConnectionProvider;
49 import org.opendaylight.openflowjava.protocol.spi.connection.SwitchConnectionProviderList;
50 import org.opendaylight.openflowplugin.api.openflow.FlowGroupInfoHistories;
51 import org.opendaylight.openflowplugin.api.openflow.FlowGroupInfoHistory;
52 import org.opendaylight.openflowplugin.api.openflow.OpenFlowPluginProvider;
53 import org.opendaylight.openflowplugin.api.openflow.configuration.ConfigurationService;
54 import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionManager;
55 import org.opendaylight.openflowplugin.api.openflow.device.DeviceManager;
56 import org.opendaylight.openflowplugin.api.openflow.mastership.MastershipChangeServiceManager;
57 import org.opendaylight.openflowplugin.api.openflow.role.RoleManager;
58 import org.opendaylight.openflowplugin.api.openflow.rpc.RpcManager;
59 import org.opendaylight.openflowplugin.api.openflow.statistics.StatisticsManager;
60 import org.opendaylight.openflowplugin.api.openflow.statistics.ofpspecific.MessageIntelligenceAgency;
61 import org.opendaylight.openflowplugin.extension.api.ExtensionConverterProviderKeeper;
62 import org.opendaylight.openflowplugin.extension.api.ExtensionConverterRegistrator;
63 import org.opendaylight.openflowplugin.extension.api.OpenFlowPluginExtensionRegistratorProvider;
64 import org.opendaylight.openflowplugin.extension.api.core.extension.ExtensionConverterManager;
65 import org.opendaylight.openflowplugin.impl.configuration.OpenFlowProviderConfigImpl;
66 import org.opendaylight.openflowplugin.impl.connection.ConnectionManagerImpl;
67 import org.opendaylight.openflowplugin.impl.device.DeviceManagerImpl;
68 import org.opendaylight.openflowplugin.impl.device.initialization.DeviceInitializerProvider;
69 import org.opendaylight.openflowplugin.impl.device.initialization.DeviceInitializerProviderFactory;
70 import org.opendaylight.openflowplugin.impl.lifecycle.ContextChainHolderImpl;
71 import org.opendaylight.openflowplugin.impl.protocol.deserialization.DeserializerInjector;
72 import org.opendaylight.openflowplugin.impl.protocol.serialization.SerializerInjector;
73 import org.opendaylight.openflowplugin.impl.role.RoleManagerImpl;
74 import org.opendaylight.openflowplugin.impl.rpc.RpcManagerImpl;
75 import org.opendaylight.openflowplugin.impl.statistics.StatisticsManagerImpl;
76 import org.opendaylight.openflowplugin.impl.statistics.ofpspecific.MessageIntelligenceAgencyImpl;
77 import org.opendaylight.openflowplugin.impl.statistics.ofpspecific.MessageIntelligenceAgencyMXBean;
78 import org.opendaylight.openflowplugin.impl.util.ThreadPoolLoggingExecutor;
79 import org.opendaylight.openflowplugin.impl.util.TranslatorLibraryUtil;
80 import org.opendaylight.openflowplugin.openflow.md.core.extension.ExtensionConverterManagerImpl;
81 import org.opendaylight.openflowplugin.openflow.md.core.sal.convertor.ConvertorManager;
82 import org.opendaylight.openflowplugin.openflow.md.core.sal.convertor.ConvertorManagerFactory;
83 import org.opendaylight.openflowplugin.openflow.md.core.session.OFSessionUtil;
84 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
85 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflow.provider.config.rev160510.OpenflowProviderConfig;
86 import org.slf4j.Logger;
87 import org.slf4j.LoggerFactory;
90 public class OpenFlowPluginProviderImpl implements
91 OpenFlowPluginProvider,
92 OpenFlowPluginExtensionRegistratorProvider,
93 FlowGroupInfoHistories,
96 private static final Logger LOG = LoggerFactory.getLogger(OpenFlowPluginProviderImpl.class);
98 private static final int TICKS_PER_WHEEL = 500; // 0.5 sec.
99 private static final long TICK_DURATION = 10;
100 private static final String POOL_NAME = "ofppool";
102 private static final MessageIntelligenceAgency MESSAGE_INTELLIGENCE_AGENCY = new MessageIntelligenceAgencyImpl();
103 private static final String MESSAGE_INTELLIGENCE_AGENCY_MX_BEAN_NAME = String
104 .format("%s:type=%s",
105 MessageIntelligenceAgencyMXBean.class.getPackage().getName(),
106 MessageIntelligenceAgencyMXBean.class.getSimpleName());
108 private final HashedWheelTimer hashedWheelTimer =
109 new HashedWheelTimer(TICK_DURATION, TimeUnit.MILLISECONDS, TICKS_PER_WHEEL);
110 private final NotificationPublishService notificationPublishService;
111 private final ExtensionConverterManager extensionConverterManager;
112 private final DataBroker dataBroker;
113 private final Collection<SwitchConnectionProvider> switchConnectionProviders;
114 private final DeviceInitializerProvider deviceInitializerProvider;
115 private final ConvertorManager convertorManager;
116 private final RpcProviderService rpcProviderRegistry;
117 private final ClusterSingletonServiceProvider singletonServicesProvider;
118 private final OpenflowProviderConfig config;
119 private final EntityOwnershipService entityOwnershipService;
120 private final MastershipChangeServiceManager mastershipChangeServiceManager;
121 private DeviceManager deviceManager;
122 private RpcManager rpcManager;
123 private StatisticsManager statisticsManager;
124 private RoleManager roleManager;
125 private ConnectionManager connectionManager;
126 private ExecutorService executorService;
127 private ContextChainHolderImpl contextChainHolder;
128 private final DiagStatusProvider diagStatusProvider;
129 private final SystemReadyMonitor systemReadyMonitor;
130 private final SettableFuture<Void> fullyStarted = SettableFuture.create();
131 private static final String OPENFLOW_SERVICE_NAME = "OPENFLOW";
133 public static MessageIntelligenceAgency getMessageIntelligenceAgency() {
134 return MESSAGE_INTELLIGENCE_AGENCY;
138 public OpenFlowPluginProviderImpl(final ConfigurationService configurationService,
139 final SwitchConnectionProviderList switchConnectionProviders,
140 final PingPongDataBroker pingPongDataBroker,
141 final RpcProviderService rpcProviderRegistry,
142 final NotificationPublishService notificationPublishService,
143 final ClusterSingletonServiceProvider singletonServiceProvider,
144 final EntityOwnershipService entityOwnershipService,
145 final MastershipChangeServiceManager mastershipChangeServiceManager,
146 final DiagStatusProvider diagStatusProvider,
147 final SystemReadyMonitor systemReadyMonitor) {
148 this.switchConnectionProviders = switchConnectionProviders;
149 dataBroker = pingPongDataBroker;
150 this.rpcProviderRegistry = rpcProviderRegistry;
151 this.notificationPublishService = notificationPublishService;
152 singletonServicesProvider = singletonServiceProvider;
153 this.entityOwnershipService = entityOwnershipService;
154 convertorManager = ConvertorManagerFactory.createDefaultManager();
155 extensionConverterManager = new ExtensionConverterManagerImpl();
156 deviceInitializerProvider = DeviceInitializerProviderFactory.createDefaultProvider();
157 config = new OpenFlowProviderConfigImpl(configurationService);
158 this.mastershipChangeServiceManager = mastershipChangeServiceManager;
159 this.diagStatusProvider = diagStatusProvider;
160 this.systemReadyMonitor = systemReadyMonitor;
164 public void onSystemBootReady() {
165 LOG.info("onSystemBootReady() received, starting the switch connections");
166 Futures.addCallback(Futures.allAsList(switchConnectionProviders.stream().map(switchConnectionProvider -> {
167 // Inject OpenFlowPlugin custom serializers and deserializers into OpenFlowJava
168 if (config.getUseSingleLayerSerialization()) {
169 SerializerInjector.injectSerializers(switchConnectionProvider,
170 switchConnectionProvider.getConfiguration().isGroupAddModEnabled());
171 DeserializerInjector.injectDeserializers(switchConnectionProvider);
173 DeserializerInjector.revertDeserializers(switchConnectionProvider);
176 // Set handler of incoming connections and start switch connection provider
177 switchConnectionProvider.setSwitchConnectionHandler(connectionManager);
178 return switchConnectionProvider.startup();
179 }).collect(Collectors.toSet())), new FutureCallback<List<Boolean>>() {
181 public void onSuccess(final List<Boolean> result) {
182 LOG.info("All switchConnectionProviders are up and running ({}).", result.size());
183 diagStatusProvider.reportStatus(ServiceState.OPERATIONAL);
184 fullyStarted.set(null);
188 public void onFailure(final Throwable throwable) {
189 LOG.warn("Some switchConnectionProviders failed to start.", throwable);
190 diagStatusProvider.reportStatus(ServiceState.ERROR, throwable);
191 fullyStarted.setException(throwable);
193 }, MoreExecutors.directExecutor());
197 public Future<Void> getFullyStarted() {
201 private ListenableFuture<List<Boolean>> shutdownSwitchConnections() {
202 final ListenableFuture<List<Boolean>> listListenableFuture =
203 Futures.allAsList(switchConnectionProviders.stream().map(switchConnectionProvider -> {
204 // Revert deserializers to their original state
205 if (config.getUseSingleLayerSerialization()) {
206 DeserializerInjector.revertDeserializers(switchConnectionProvider);
209 // Shutdown switch connection provider
210 return switchConnectionProvider.shutdown();
211 }).collect(Collectors.toSet()));
213 Futures.addCallback(listListenableFuture, new FutureCallback<List<Boolean>>() {
215 public void onSuccess(final List<Boolean> result) {
216 LOG.info("All switchConnectionProviders were successfully shut down ({}).", result.size());
220 public void onFailure(final Throwable throwable) {
221 LOG.warn("Some switchConnectionProviders failed to shutdown.", throwable);
223 }, MoreExecutors.directExecutor());
225 return listListenableFuture;
230 public void initialize() {
231 registerMXBean(MESSAGE_INTELLIGENCE_AGENCY, MESSAGE_INTELLIGENCE_AGENCY_MX_BEAN_NAME);
233 // TODO: copied from OpenFlowPluginProvider (Helium) misusesing the old way of distributing extension converters
234 // TODO: rewrite later!
235 OFSessionUtil.getSessionManager().setExtensionConverterProvider(extensionConverterManager);
237 // Creates a thread pool that creates new threads as needed, but will reuse previously
238 // constructed threads when they are available.
239 // Threads that have not been used for x seconds are terminated and removed from the cache.
240 executorService = new ThreadPoolLoggingExecutor(
241 config.getThreadPoolMinThreads().toJava(),
242 config.getThreadPoolMaxThreads().getValue().toJava(),
243 config.getThreadPoolTimeout().toJava(),
244 TimeUnit.SECONDS, new SynchronousQueue<>(), POOL_NAME);
246 deviceManager = new DeviceManagerImpl(
249 getMessageIntelligenceAgency(),
250 notificationPublishService,
253 deviceInitializerProvider,
256 TranslatorLibraryUtil.injectBasicTranslatorLibrary(deviceManager, convertorManager);
257 ((ExtensionConverterProviderKeeper) deviceManager).setExtensionConverterProvider(extensionConverterManager);
259 rpcManager = new RpcManagerImpl(
262 extensionConverterManager,
264 notificationPublishService);
266 statisticsManager = new StatisticsManagerImpl(
272 roleManager = new RoleManagerImpl(hashedWheelTimer, config, executorService);
274 contextChainHolder = new ContextChainHolderImpl(
276 singletonServicesProvider,
277 entityOwnershipService,
278 mastershipChangeServiceManager,
281 contextChainHolder.addManager(deviceManager);
282 contextChainHolder.addManager(statisticsManager);
283 contextChainHolder.addManager(rpcManager);
284 contextChainHolder.addManager(roleManager);
286 connectionManager = new ConnectionManagerImpl(config, executorService, dataBroker, notificationPublishService);
287 connectionManager.setDeviceConnectedHandler(contextChainHolder);
288 connectionManager.setDeviceDisconnectedHandler(contextChainHolder);
290 deviceManager.setContextChainHolder(contextChainHolder);
291 deviceManager.initialize();
292 systemReadyMonitor.registerListener(this);
293 LOG.info("registered onSystemBootReady() listener for OpenFlowPluginProvider");
297 public ExtensionConverterRegistrator getExtensionConverterRegistrator() {
298 return extensionConverterManager;
302 public Map<NodeId, FlowGroupInfoHistory> getAllFlowGroupHistories() {
303 return deviceManager.getAllFlowGroupHistories();
307 public FlowGroupInfoHistory getFlowGroupHistory(final NodeId nodeId) {
308 return deviceManager.getFlowGroupHistory(nodeId);
313 @SuppressWarnings("checkstyle:IllegalCatch")
314 public void close() {
316 shutdownSwitchConnections().get(10, TimeUnit.SECONDS);
317 } catch (InterruptedException | ExecutionException | TimeoutException e) {
318 LOG.warn("Failed to shut down switch connections in time {}s", 10, e);
321 gracefulShutdown(contextChainHolder);
322 gracefulShutdown(connectionManager);
323 gracefulShutdown(deviceManager);
324 gracefulShutdown(rpcManager);
325 gracefulShutdown(statisticsManager);
326 gracefulShutdown(roleManager);
327 gracefulShutdown(executorService);
328 gracefulShutdown(hashedWheelTimer);
329 unregisterMXBean(MESSAGE_INTELLIGENCE_AGENCY_MX_BEAN_NAME);
330 diagStatusProvider.reportStatus(ServiceState.UNREGISTERED);
332 if (connectionManager != null) {
333 connectionManager.close();
334 connectionManager = null;
336 } catch (Exception e) {
337 LOG.error("Failed to close ConnectionManager", e);
341 @SuppressWarnings("checkstyle:IllegalCatch")
342 private static void gracefulShutdown(final AutoCloseable closeable) {
343 if (closeable != null) {
346 } catch (Exception e) {
347 LOG.warn("Failed to shutdown {} gracefully.", closeable);
352 private static void gracefulShutdown(final Timer timer) {
356 } catch (IllegalStateException e) {
357 LOG.warn("Failed to shutdown {} gracefully.", timer);
362 private static void gracefulShutdown(final ExecutorService executorService) {
363 if (executorService != null) {
365 executorService.shutdownNow();
366 } catch (SecurityException e) {
367 LOG.warn("Failed to shutdown {} gracefully.", executorService);
372 private static void registerMXBean(final Object bean, final String beanName) {
373 final MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
376 mbs.registerMBean(bean, new ObjectName(beanName));
377 } catch (MalformedObjectNameException
378 | NotCompliantMBeanException
379 | MBeanRegistrationException
380 | InstanceAlreadyExistsException e) {
381 LOG.warn("Error registering MBean {}", beanName, e);
385 private static void unregisterMXBean(final String beanName) {
386 final MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
389 mbs.unregisterMBean(new ObjectName(beanName));
390 } catch (InstanceNotFoundException
391 | MBeanRegistrationException
392 | MalformedObjectNameException e) {
393 LOG.warn("Error unregistering MBean {}", beanName, e);