Merge "Use QueuedNotificationManager to dispatch tasks"
[openflowplugin.git] / openflowplugin-impl / src / main / java / org / opendaylight / openflowplugin / impl / OpenFlowPluginProviderImpl.java
1 /*
2  * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.openflowplugin.impl;
9
10 import com.google.common.util.concurrent.FutureCallback;
11 import com.google.common.util.concurrent.Futures;
12 import com.google.common.util.concurrent.ListenableFuture;
13 import com.google.common.util.concurrent.ListeningExecutorService;
14 import com.google.common.util.concurrent.MoreExecutors;
15 import io.netty.util.HashedWheelTimer;
16 import io.netty.util.Timer;
17 import java.lang.management.ManagementFactory;
18 import java.util.Collection;
19 import java.util.List;
20 import java.util.Objects;
21 import java.util.concurrent.ExecutionException;
22 import java.util.concurrent.ExecutorService;
23 import java.util.concurrent.SynchronousQueue;
24 import java.util.concurrent.TimeUnit;
25 import java.util.concurrent.TimeoutException;
26 import java.util.stream.Collectors;
27 import javax.annotation.Nonnull;
28 import javax.annotation.PostConstruct;
29 import javax.annotation.PreDestroy;
30 import javax.inject.Inject;
31 import javax.inject.Singleton;
32 import javax.management.InstanceAlreadyExistsException;
33 import javax.management.InstanceNotFoundException;
34 import javax.management.MBeanRegistrationException;
35 import javax.management.MBeanServer;
36 import javax.management.MalformedObjectNameException;
37 import javax.management.NotCompliantMBeanException;
38 import javax.management.ObjectName;
39 import org.apache.aries.blueprint.annotation.service.Reference;
40 import org.apache.aries.blueprint.annotation.service.Service;
41 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
42 import org.opendaylight.controller.md.sal.binding.api.NotificationPublishService;
43 import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry;
44 import org.opendaylight.infrautils.diagstatus.ServiceState;
45 import org.opendaylight.infrautils.ready.SystemReadyListener;
46 import org.opendaylight.infrautils.ready.SystemReadyMonitor;
47 import org.opendaylight.mdsal.eos.binding.api.EntityOwnershipService;
48 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceProvider;
49 import org.opendaylight.openflowjava.protocol.spi.connection.SwitchConnectionProvider;
50 import org.opendaylight.openflowjava.protocol.spi.connection.SwitchConnectionProviderList;
51 import org.opendaylight.openflowplugin.api.openflow.OpenFlowPluginProvider;
52 import org.opendaylight.openflowplugin.api.openflow.configuration.ConfigurationService;
53 import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionManager;
54 import org.opendaylight.openflowplugin.api.openflow.device.DeviceManager;
55 import org.opendaylight.openflowplugin.api.openflow.mastership.MastershipChangeServiceManager;
56 import org.opendaylight.openflowplugin.api.openflow.role.RoleManager;
57 import org.opendaylight.openflowplugin.api.openflow.rpc.RpcManager;
58 import org.opendaylight.openflowplugin.api.openflow.statistics.StatisticsManager;
59 import org.opendaylight.openflowplugin.api.openflow.statistics.ofpspecific.MessageIntelligenceAgency;
60 import org.opendaylight.openflowplugin.extension.api.ExtensionConverterProviderKeeper;
61 import org.opendaylight.openflowplugin.extension.api.ExtensionConverterRegistrator;
62 import org.opendaylight.openflowplugin.extension.api.OpenFlowPluginExtensionRegistratorProvider;
63 import org.opendaylight.openflowplugin.extension.api.core.extension.ExtensionConverterManager;
64 import org.opendaylight.openflowplugin.impl.configuration.OpenFlowProviderConfigImpl;
65 import org.opendaylight.openflowplugin.impl.connection.ConnectionManagerImpl;
66 import org.opendaylight.openflowplugin.impl.device.DeviceManagerImpl;
67 import org.opendaylight.openflowplugin.impl.device.initialization.DeviceInitializerProvider;
68 import org.opendaylight.openflowplugin.impl.device.initialization.DeviceInitializerProviderFactory;
69 import org.opendaylight.openflowplugin.impl.lifecycle.ContextChainHolderImpl;
70 import org.opendaylight.openflowplugin.impl.protocol.deserialization.DeserializerInjector;
71 import org.opendaylight.openflowplugin.impl.protocol.serialization.SerializerInjector;
72 import org.opendaylight.openflowplugin.impl.role.RoleManagerImpl;
73 import org.opendaylight.openflowplugin.impl.rpc.RpcManagerImpl;
74 import org.opendaylight.openflowplugin.impl.statistics.StatisticsManagerImpl;
75 import org.opendaylight.openflowplugin.impl.statistics.ofpspecific.MessageIntelligenceAgencyImpl;
76 import org.opendaylight.openflowplugin.impl.statistics.ofpspecific.MessageIntelligenceAgencyMXBean;
77 import org.opendaylight.openflowplugin.impl.util.ThreadPoolLoggingExecutor;
78 import org.opendaylight.openflowplugin.impl.util.TranslatorLibraryUtil;
79 import org.opendaylight.openflowplugin.openflow.md.core.extension.ExtensionConverterManagerImpl;
80 import org.opendaylight.openflowplugin.openflow.md.core.sal.convertor.ConvertorManager;
81 import org.opendaylight.openflowplugin.openflow.md.core.sal.convertor.ConvertorManagerFactory;
82 import org.opendaylight.openflowplugin.openflow.md.core.session.OFSessionUtil;
83 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflow.provider.config.rev160510.OpenflowProviderConfig;
84 import org.slf4j.Logger;
85 import org.slf4j.LoggerFactory;
86
87 @Singleton
88 @Service(classes = { OpenFlowPluginProvider.class, OpenFlowPluginExtensionRegistratorProvider.class })
89 public class OpenFlowPluginProviderImpl implements
90         OpenFlowPluginProvider,
91         OpenFlowPluginExtensionRegistratorProvider,
92         SystemReadyListener {
93
94     private static final Logger LOG = LoggerFactory.getLogger(OpenFlowPluginProviderImpl.class);
95
96     private static final int TICKS_PER_WHEEL = 500; // 0.5 sec.
97     private static final long TICK_DURATION = 10;
98     private static final String POOL_NAME = "ofppool";
99
100     private static final MessageIntelligenceAgency MESSAGE_INTELLIGENCE_AGENCY = new MessageIntelligenceAgencyImpl();
101     private static final String MESSAGE_INTELLIGENCE_AGENCY_MX_BEAN_NAME = String
102             .format("%s:type=%s",
103                     MessageIntelligenceAgencyMXBean.class.getPackage().getName(),
104                     MessageIntelligenceAgencyMXBean.class.getSimpleName());
105
106     private final HashedWheelTimer hashedWheelTimer =
107             new HashedWheelTimer(TICK_DURATION, TimeUnit.MILLISECONDS, TICKS_PER_WHEEL);
108     private final NotificationPublishService notificationPublishService;
109     private final ExtensionConverterManager extensionConverterManager;
110     private final DataBroker dataBroker;
111     private final Collection<SwitchConnectionProvider> switchConnectionProviders;
112     private final DeviceInitializerProvider deviceInitializerProvider;
113     private final ConvertorManager convertorManager;
114     private final RpcProviderRegistry rpcProviderRegistry;
115     private final ClusterSingletonServiceProvider singletonServicesProvider;
116     private final OpenflowProviderConfig config;
117     private final EntityOwnershipService entityOwnershipService;
118     private final MastershipChangeServiceManager mastershipChangeServiceManager;
119     private DeviceManager deviceManager;
120     private RpcManager rpcManager;
121     private StatisticsManager statisticsManager;
122     private RoleManager roleManager;
123     private ConnectionManager connectionManager;
124     private ListeningExecutorService executorService;
125     private ContextChainHolderImpl contextChainHolder;
126     private final OpenflowPluginDiagStatusProvider openflowPluginStatusMonitor;
127
128     public static MessageIntelligenceAgency getMessageIntelligenceAgency() {
129         return MESSAGE_INTELLIGENCE_AGENCY;
130     }
131
132     @Inject
133     public OpenFlowPluginProviderImpl(final ConfigurationService configurationService,
134                                final SwitchConnectionProviderList switchConnectionProviders,
135                                final PingPongDataBroker pingPongDataBroker,
136                                final @Reference RpcProviderRegistry rpcProviderRegistry,
137                                final @Reference NotificationPublishService notificationPublishService,
138                                final @Reference ClusterSingletonServiceProvider singletonServiceProvider,
139                                final @Reference EntityOwnershipService entityOwnershipService,
140                                final MastershipChangeServiceManager mastershipChangeServiceManager,
141                                final OpenflowPluginDiagStatusProvider openflowPluginStatusMonitor,
142                                final @Reference SystemReadyMonitor systemReadyMonitor) {
143         this.switchConnectionProviders = switchConnectionProviders;
144         this.dataBroker = pingPongDataBroker;
145         this.rpcProviderRegistry = rpcProviderRegistry;
146         this.notificationPublishService = notificationPublishService;
147         this.singletonServicesProvider = singletonServiceProvider;
148         this.entityOwnershipService = entityOwnershipService;
149         convertorManager = ConvertorManagerFactory.createDefaultManager();
150         extensionConverterManager = new ExtensionConverterManagerImpl();
151         deviceInitializerProvider = DeviceInitializerProviderFactory.createDefaultProvider();
152         config = new OpenFlowProviderConfigImpl(configurationService);
153         this.mastershipChangeServiceManager = mastershipChangeServiceManager;
154         this.openflowPluginStatusMonitor = openflowPluginStatusMonitor;
155         systemReadyMonitor.registerListener(this);
156         LOG.debug("registered onSystemBootReady() listener for deferred startSwitchConnections()");
157     }
158
159     @Override
160     public void onSystemBootReady() {
161         LOG.debug("onSystemBootReady() received, starting the switch connections");
162         startSwitchConnections();
163     }
164
165     private void startSwitchConnections() {
166         Futures.addCallback(Futures.allAsList(switchConnectionProviders.stream().map(switchConnectionProvider -> {
167             // Inject OpenFlowPlugin custom serializers and deserializers into OpenFlowJava
168             if (config.isUseSingleLayerSerialization()) {
169                 SerializerInjector.injectSerializers(switchConnectionProvider,
170                         switchConnectionProvider.getConfiguration().isGroupAddModEnabled());
171                 DeserializerInjector.injectDeserializers(switchConnectionProvider);
172             } else {
173                 DeserializerInjector.revertDeserializers(switchConnectionProvider);
174             }
175
176             // Set handler of incoming connections and start switch connection provider
177             switchConnectionProvider.setSwitchConnectionHandler(connectionManager);
178             return switchConnectionProvider.startup();
179         }).collect(Collectors.toSet())), new FutureCallback<List<Boolean>>() {
180             @Override
181             public void onSuccess(@Nonnull final List<Boolean> result) {
182                 LOG.info("All switchConnectionProviders are up and running ({}).", result.size());
183                 openflowPluginStatusMonitor.reportStatus(ServiceState.OPERATIONAL);
184             }
185
186             @Override
187             public void onFailure(@Nonnull final Throwable throwable) {
188                 LOG.warn("Some switchConnectionProviders failed to start.", throwable);
189                 openflowPluginStatusMonitor.reportStatus(ServiceState.ERROR, throwable);
190             }
191         }, MoreExecutors.directExecutor());
192     }
193
194     private ListenableFuture<List<Boolean>> shutdownSwitchConnections() {
195         final ListenableFuture<List<Boolean>> listListenableFuture =
196                 Futures.allAsList(switchConnectionProviders.stream().map(switchConnectionProvider -> {
197                     // Revert deserializers to their original state
198                     if (config.isUseSingleLayerSerialization()) {
199                         DeserializerInjector.revertDeserializers(switchConnectionProvider);
200                     }
201
202                     // Shutdown switch connection provider
203                     return switchConnectionProvider.shutdown();
204                 }).collect(Collectors.toSet()));
205
206         Futures.addCallback(listListenableFuture, new FutureCallback<List<Boolean>>() {
207             @Override
208             public void onSuccess(@Nonnull final List<Boolean> result) {
209                 LOG.info("All switchConnectionProviders were successfully shut down ({}).", result.size());
210             }
211
212             @Override
213             public void onFailure(@Nonnull final Throwable throwable) {
214                 LOG.warn("Some switchConnectionProviders failed to shutdown.", throwable);
215             }
216         }, MoreExecutors.directExecutor());
217
218         return listListenableFuture;
219     }
220
221     @Override
222     @PostConstruct
223     public void initialize() {
224         registerMXBean(MESSAGE_INTELLIGENCE_AGENCY, MESSAGE_INTELLIGENCE_AGENCY_MX_BEAN_NAME);
225
226         // TODO: copied from OpenFlowPluginProvider (Helium) misusesing the old way of distributing extension converters
227         // TODO: rewrite later!
228         OFSessionUtil.getSessionManager().setExtensionConverterProvider(extensionConverterManager);
229
230         // Creates a thread pool that creates new threads as needed, but will reuse previously
231         // constructed threads when they are available.
232         // Threads that have not been used for x seconds are terminated and removed from the cache.
233         executorService = MoreExecutors.listeningDecorator(new ThreadPoolLoggingExecutor(
234                 config.getThreadPoolMinThreads(),
235                 config.getThreadPoolMaxThreads().getValue(),
236                 config.getThreadPoolTimeout(),
237                 TimeUnit.SECONDS, new SynchronousQueue<>(), POOL_NAME));
238
239         deviceManager = new DeviceManagerImpl(
240                 config,
241                 dataBroker,
242                 getMessageIntelligenceAgency(),
243                 notificationPublishService,
244                 hashedWheelTimer,
245                 convertorManager,
246                 deviceInitializerProvider);
247
248         TranslatorLibraryUtil.injectBasicTranslatorLibrary(deviceManager, convertorManager);
249         ((ExtensionConverterProviderKeeper) deviceManager).setExtensionConverterProvider(extensionConverterManager);
250
251         rpcManager = new RpcManagerImpl(
252                 config,
253                 rpcProviderRegistry,
254                 extensionConverterManager,
255                 convertorManager,
256                 notificationPublishService);
257
258         statisticsManager = new StatisticsManagerImpl(
259                 config,
260                 rpcProviderRegistry,
261                 convertorManager,
262                 executorService);
263
264         roleManager = new RoleManagerImpl(hashedWheelTimer, config);
265
266         contextChainHolder = new ContextChainHolderImpl(
267                 executorService,
268                 singletonServicesProvider,
269                 entityOwnershipService,
270                 mastershipChangeServiceManager);
271
272         contextChainHolder.addManager(deviceManager);
273         contextChainHolder.addManager(statisticsManager);
274         contextChainHolder.addManager(rpcManager);
275         contextChainHolder.addManager(roleManager);
276
277         connectionManager = new ConnectionManagerImpl(config, executorService);
278         connectionManager.setDeviceConnectedHandler(contextChainHolder);
279         connectionManager.setDeviceDisconnectedHandler(contextChainHolder);
280
281         deviceManager.setContextChainHolder(contextChainHolder);
282         deviceManager.initialize();
283     }
284
285     @Override
286     public ExtensionConverterRegistrator getExtensionConverterRegistrator() {
287         return extensionConverterManager;
288     }
289
290     @Override
291     @PreDestroy
292     public void close() {
293         try {
294             shutdownSwitchConnections().get(10, TimeUnit.SECONDS);
295         } catch (InterruptedException | ExecutionException | TimeoutException e) {
296             LOG.warn("Failed to shut down switch connections in time {}s", 10, e);
297         }
298
299         gracefulShutdown(contextChainHolder);
300         gracefulShutdown(deviceManager);
301         gracefulShutdown(rpcManager);
302         gracefulShutdown(statisticsManager);
303         gracefulShutdown(roleManager);
304         gracefulShutdown(executorService);
305         gracefulShutdown(hashedWheelTimer);
306         unregisterMXBean(MESSAGE_INTELLIGENCE_AGENCY_MX_BEAN_NAME);
307         openflowPluginStatusMonitor.reportStatus(ServiceState.UNREGISTERED);
308     }
309
310     @SuppressWarnings("checkstyle:IllegalCatch")
311     private static void gracefulShutdown(final AutoCloseable closeable) {
312         if (Objects.isNull(closeable)) {
313             return;
314         }
315
316         try {
317             closeable.close();
318         } catch (Exception e) {
319             LOG.warn("Failed to shutdown {} gracefully.", closeable);
320         }
321     }
322
323     private static void gracefulShutdown(final Timer timer) {
324         if (Objects.isNull(timer)) {
325             return;
326         }
327
328         try {
329             timer.stop();
330         } catch (IllegalStateException e) {
331             LOG.warn("Failed to shutdown {} gracefully.", timer);
332         }
333     }
334
335     private static void gracefulShutdown(final ExecutorService executorService) {
336         if (Objects.isNull(executorService)) {
337             return;
338         }
339
340         try {
341             executorService.shutdownNow();
342         } catch (SecurityException e) {
343             LOG.warn("Failed to shutdown {} gracefully.", executorService);
344         }
345     }
346
347     private static void registerMXBean(final Object bean, final String beanName) {
348         final MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
349
350         try {
351             mbs.registerMBean(bean, new ObjectName(beanName));
352         } catch (MalformedObjectNameException
353                 | NotCompliantMBeanException
354                 | MBeanRegistrationException
355                 | InstanceAlreadyExistsException e) {
356             LOG.warn("Error registering MBean {}", e);
357         }
358     }
359
360     private static void unregisterMXBean(final String beanName) {
361         final MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
362
363         try {
364             mbs.unregisterMBean(new ObjectName(beanName));
365         } catch (InstanceNotFoundException
366                 | MBeanRegistrationException
367                 | MalformedObjectNameException e) {
368             LOG.warn("Error unregistering MBean {}", e);
369         }
370     }
371 }