DiagStatus integration for openflowplugin
[openflowplugin.git] / openflowplugin-impl / src / main / java / org / opendaylight / openflowplugin / impl / OpenFlowPluginProviderImpl.java
1 /*
2  * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.openflowplugin.impl;
10
11 import com.google.common.util.concurrent.FutureCallback;
12 import com.google.common.util.concurrent.Futures;
13 import com.google.common.util.concurrent.ListenableFuture;
14 import com.google.common.util.concurrent.ListeningExecutorService;
15 import com.google.common.util.concurrent.MoreExecutors;
16 import io.netty.util.HashedWheelTimer;
17 import io.netty.util.Timer;
18 import java.lang.management.ManagementFactory;
19 import java.util.Collection;
20 import java.util.List;
21 import java.util.Objects;
22 import java.util.concurrent.ExecutionException;
23 import java.util.concurrent.ExecutorService;
24 import java.util.concurrent.SynchronousQueue;
25 import java.util.concurrent.TimeUnit;
26 import java.util.concurrent.TimeoutException;
27 import java.util.stream.Collectors;
28 import javax.annotation.Nonnull;
29 import javax.management.InstanceAlreadyExistsException;
30 import javax.management.InstanceNotFoundException;
31 import javax.management.MBeanRegistrationException;
32 import javax.management.MBeanServer;
33 import javax.management.MalformedObjectNameException;
34 import javax.management.NotCompliantMBeanException;
35 import javax.management.ObjectName;
36 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
37 import org.opendaylight.controller.md.sal.binding.api.NotificationPublishService;
38 import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipService;
39 import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry;
40 import org.opendaylight.infrautils.diagstatus.DiagStatusService;
41 import org.opendaylight.infrautils.diagstatus.ServiceState;
42 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceProvider;
43 import org.opendaylight.openflowjava.protocol.spi.connection.SwitchConnectionProvider;
44 import org.opendaylight.openflowplugin.api.openflow.OpenFlowPluginProvider;
45 import org.opendaylight.openflowplugin.api.openflow.configuration.ConfigurationService;
46 import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionManager;
47 import org.opendaylight.openflowplugin.api.openflow.device.DeviceManager;
48 import org.opendaylight.openflowplugin.api.openflow.mastership.MastershipChangeServiceManager;
49 import org.opendaylight.openflowplugin.api.openflow.role.RoleManager;
50 import org.opendaylight.openflowplugin.api.openflow.rpc.RpcManager;
51 import org.opendaylight.openflowplugin.api.openflow.statistics.StatisticsManager;
52 import org.opendaylight.openflowplugin.api.openflow.statistics.ofpspecific.MessageIntelligenceAgency;
53 import org.opendaylight.openflowplugin.extension.api.ExtensionConverterProviderKeeper;
54 import org.opendaylight.openflowplugin.extension.api.ExtensionConverterRegistrator;
55 import org.opendaylight.openflowplugin.extension.api.OpenFlowPluginExtensionRegistratorProvider;
56 import org.opendaylight.openflowplugin.extension.api.core.extension.ExtensionConverterManager;
57 import org.opendaylight.openflowplugin.impl.configuration.OpenFlowProviderConfigImpl;
58 import org.opendaylight.openflowplugin.impl.connection.ConnectionManagerImpl;
59 import org.opendaylight.openflowplugin.impl.device.DeviceManagerImpl;
60 import org.opendaylight.openflowplugin.impl.device.initialization.DeviceInitializerProvider;
61 import org.opendaylight.openflowplugin.impl.device.initialization.DeviceInitializerProviderFactory;
62 import org.opendaylight.openflowplugin.impl.diagstatus.OpenflowPluginDiagStatusProvider;
63 import org.opendaylight.openflowplugin.impl.lifecycle.ContextChainHolderImpl;
64 import org.opendaylight.openflowplugin.impl.protocol.deserialization.DeserializerInjector;
65 import org.opendaylight.openflowplugin.impl.protocol.serialization.SerializerInjector;
66 import org.opendaylight.openflowplugin.impl.role.RoleManagerImpl;
67 import org.opendaylight.openflowplugin.impl.rpc.RpcManagerImpl;
68 import org.opendaylight.openflowplugin.impl.statistics.StatisticsManagerImpl;
69 import org.opendaylight.openflowplugin.impl.statistics.ofpspecific.MessageIntelligenceAgencyImpl;
70 import org.opendaylight.openflowplugin.impl.statistics.ofpspecific.MessageIntelligenceAgencyMXBean;
71 import org.opendaylight.openflowplugin.impl.util.ThreadPoolLoggingExecutor;
72 import org.opendaylight.openflowplugin.impl.util.TranslatorLibraryUtil;
73 import org.opendaylight.openflowplugin.openflow.md.core.extension.ExtensionConverterManagerImpl;
74 import org.opendaylight.openflowplugin.openflow.md.core.sal.convertor.ConvertorManager;
75 import org.opendaylight.openflowplugin.openflow.md.core.sal.convertor.ConvertorManagerFactory;
76 import org.opendaylight.openflowplugin.openflow.md.core.session.OFSessionUtil;
77 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflow.provider.config.rev160510.OpenflowProviderConfig;
78 import org.slf4j.Logger;
79 import org.slf4j.LoggerFactory;
80
81 public class OpenFlowPluginProviderImpl implements
82         OpenFlowPluginProvider,
83         OpenFlowPluginExtensionRegistratorProvider {
84
85     private static final Logger LOG = LoggerFactory.getLogger(OpenFlowPluginProviderImpl.class);
86
87     private static final int TICKS_PER_WHEEL = 500; // 0.5 sec.
88     private static final long TICK_DURATION = 10;
89     private static final String POOL_NAME = "ofppool";
90
91     private static final MessageIntelligenceAgency MESSAGE_INTELLIGENCE_AGENCY = new MessageIntelligenceAgencyImpl();
92     private static final String MESSAGE_INTELLIGENCE_AGENCY_MX_BEAN_NAME = String
93             .format("%s:type=%s",
94                     MessageIntelligenceAgencyMXBean.class.getPackage().getName(),
95                     MessageIntelligenceAgencyMXBean.class.getSimpleName());
96
97     private final HashedWheelTimer hashedWheelTimer =
98             new HashedWheelTimer(TICK_DURATION, TimeUnit.MILLISECONDS, TICKS_PER_WHEEL);
99     private final NotificationPublishService notificationPublishService;
100     private final ExtensionConverterManager extensionConverterManager;
101     private final DataBroker dataBroker;
102     private final Collection<SwitchConnectionProvider> switchConnectionProviders;
103     private final DeviceInitializerProvider deviceInitializerProvider;
104     private final ConvertorManager convertorManager;
105     private final RpcProviderRegistry rpcProviderRegistry;
106     private final ClusterSingletonServiceProvider singletonServicesProvider;
107     private final OpenflowProviderConfig config;
108     private final EntityOwnershipService entityOwnershipService;
109     private final MastershipChangeServiceManager mastershipChangeServiceManager;
110     private DeviceManager deviceManager;
111     private RpcManager rpcManager;
112     private StatisticsManager statisticsManager;
113     private RoleManager roleManager;
114     private ConnectionManager connectionManager;
115     private ListeningExecutorService executorService;
116     private ContextChainHolderImpl contextChainHolder;
117     private OpenflowPluginDiagStatusProvider openflowPluginStatusMonitor;
118
119     public static MessageIntelligenceAgency getMessageIntelligenceAgency() {
120         return MESSAGE_INTELLIGENCE_AGENCY;
121     }
122
123     OpenFlowPluginProviderImpl(final ConfigurationService configurationService,
124                                final List<SwitchConnectionProvider> switchConnectionProviders,
125                                final DataBroker dataBroker,
126                                final RpcProviderRegistry rpcProviderRegistry,
127                                final NotificationPublishService notificationPublishService,
128                                final ClusterSingletonServiceProvider singletonServiceProvider,
129                                final EntityOwnershipService entityOwnershipService,
130                                final MastershipChangeServiceManager mastershipChangeServiceManager,
131                                final DiagStatusService diagStatusService) {
132         this.switchConnectionProviders = switchConnectionProviders;
133         this.dataBroker = dataBroker;
134         this.rpcProviderRegistry = rpcProviderRegistry;
135         this.notificationPublishService = notificationPublishService;
136         this.singletonServicesProvider = singletonServiceProvider;
137         this.entityOwnershipService = entityOwnershipService;
138         convertorManager = ConvertorManagerFactory.createDefaultManager();
139         extensionConverterManager = new ExtensionConverterManagerImpl();
140         deviceInitializerProvider = DeviceInitializerProviderFactory.createDefaultProvider();
141         config = new OpenFlowProviderConfigImpl(configurationService);
142         this.mastershipChangeServiceManager = mastershipChangeServiceManager;
143         openflowPluginStatusMonitor = new OpenflowPluginDiagStatusProvider(diagStatusService);
144     }
145
146
147     private void startSwitchConnections() {
148         Futures.addCallback(Futures.allAsList(switchConnectionProviders.stream().map(switchConnectionProvider -> {
149             // Inject OpenFlowPlugin custom serializers and deserializers into OpenFlowJava
150             if (config.isUseSingleLayerSerialization()) {
151                 SerializerInjector.injectSerializers(switchConnectionProvider);
152                 DeserializerInjector.injectDeserializers(switchConnectionProvider);
153             } else {
154                 DeserializerInjector.revertDeserializers(switchConnectionProvider);
155             }
156
157             // Set handler of incoming connections and start switch connection provider
158             switchConnectionProvider.setSwitchConnectionHandler(connectionManager);
159             return switchConnectionProvider.startup();
160         }).collect(Collectors.toSet())), new FutureCallback<List<Boolean>>() {
161             @Override
162             public void onSuccess(final List<Boolean> result) {
163                 LOG.info("All switchConnectionProviders are up and running ({}).", result.size());
164                 openflowPluginStatusMonitor.reportStatus(ServiceState.OPERATIONAL, "switch connections started");
165             }
166
167             @Override
168             public void onFailure(@Nonnull final Throwable throwable) {
169                 LOG.warn("Some switchConnectionProviders failed to start.", throwable);
170                 openflowPluginStatusMonitor.reportStatus(ServiceState.ERROR, "some switch connections failed to start");
171             }
172         });
173     }
174
175     private ListenableFuture<List<Boolean>> shutdownSwitchConnections() {
176         final ListenableFuture<List<Boolean>> listListenableFuture =
177                 Futures.allAsList(switchConnectionProviders.stream().map(switchConnectionProvider -> {
178                     // Revert deserializers to their original state
179                     if (config.isUseSingleLayerSerialization()) {
180                         DeserializerInjector.revertDeserializers(switchConnectionProvider);
181                     }
182
183                     // Shutdown switch connection provider
184                     return switchConnectionProvider.shutdown();
185                 }).collect(Collectors.toSet()));
186
187         Futures.addCallback(listListenableFuture, new FutureCallback<List<Boolean>>() {
188             @Override
189             public void onSuccess(final List<Boolean> result) {
190                 LOG.info("All switchConnectionProviders were successfully shut down ({}).", result.size());
191             }
192
193             @Override
194             public void onFailure(@Nonnull final Throwable throwable) {
195                 LOG.warn("Some switchConnectionProviders failed to shutdown.", throwable);
196             }
197         });
198
199         return listListenableFuture;
200     }
201
202     @Override
203     public void initialize() {
204         registerMXBean(MESSAGE_INTELLIGENCE_AGENCY, MESSAGE_INTELLIGENCE_AGENCY_MX_BEAN_NAME);
205
206         // TODO: copied from OpenFlowPluginProvider (Helium) misusesing the old way of distributing extension converters
207         // TODO: rewrite later!
208         OFSessionUtil.getSessionManager().setExtensionConverterProvider(extensionConverterManager);
209
210         // Creates a thread pool that creates new threads as needed, but will reuse previously
211         // constructed threads when they are available.
212         // Threads that have not been used for x seconds are terminated and removed from the cache.
213         executorService = MoreExecutors.listeningDecorator(new ThreadPoolLoggingExecutor(
214                 config.getThreadPoolMinThreads(),
215                 config.getThreadPoolMaxThreads().getValue(),
216                 config.getThreadPoolTimeout(),
217                 TimeUnit.SECONDS, new SynchronousQueue<>(), POOL_NAME));
218
219         deviceManager = new DeviceManagerImpl(
220                 config,
221                 dataBroker,
222                 getMessageIntelligenceAgency(),
223                 notificationPublishService,
224                 hashedWheelTimer,
225                 convertorManager,
226                 deviceInitializerProvider);
227
228         TranslatorLibraryUtil.injectBasicTranslatorLibrary(deviceManager, convertorManager);
229         ((ExtensionConverterProviderKeeper) deviceManager).setExtensionConverterProvider(extensionConverterManager);
230
231         rpcManager = new RpcManagerImpl(
232                 config,
233                 rpcProviderRegistry,
234                 extensionConverterManager,
235                 convertorManager,
236                 notificationPublishService);
237
238         statisticsManager = new StatisticsManagerImpl(
239                 config,
240                 rpcProviderRegistry,
241                 convertorManager,
242                 executorService);
243
244         roleManager = new RoleManagerImpl(hashedWheelTimer);
245
246         contextChainHolder = new ContextChainHolderImpl(
247                 executorService,
248                 singletonServicesProvider,
249                 entityOwnershipService,
250                 mastershipChangeServiceManager);
251
252         contextChainHolder.addManager(deviceManager);
253         contextChainHolder.addManager(statisticsManager);
254         contextChainHolder.addManager(rpcManager);
255         contextChainHolder.addManager(roleManager);
256
257         connectionManager = new ConnectionManagerImpl(config, executorService);
258         connectionManager.setDeviceConnectedHandler(contextChainHolder);
259         connectionManager.setDeviceDisconnectedHandler(contextChainHolder);
260
261         deviceManager.initialize();
262         startSwitchConnections();
263     }
264
265     @Override
266     public ExtensionConverterRegistrator getExtensionConverterRegistrator() {
267         return extensionConverterManager;
268     }
269
270     @Override
271     public void close() {
272         try {
273             shutdownSwitchConnections().get(10, TimeUnit.SECONDS);
274         } catch (InterruptedException | ExecutionException | TimeoutException e) {
275             LOG.warn("Failed to shut down switch connections in time {}s, error: {}", 10, e);
276         }
277
278         gracefulShutdown(contextChainHolder);
279         gracefulShutdown(deviceManager);
280         gracefulShutdown(rpcManager);
281         gracefulShutdown(statisticsManager);
282         gracefulShutdown(roleManager);
283         gracefulShutdown(executorService);
284         gracefulShutdown(hashedWheelTimer);
285         unregisterMXBean(MESSAGE_INTELLIGENCE_AGENCY_MX_BEAN_NAME);
286         openflowPluginStatusMonitor.reportStatus(ServiceState.UNREGISTERED, "service shutting down");
287     }
288
289     @SuppressWarnings("checkstyle:IllegalCatch")
290     private static void gracefulShutdown(final AutoCloseable closeable) {
291         if (Objects.isNull(closeable)) {
292             return;
293         }
294
295         try {
296             closeable.close();
297         } catch (Exception e) {
298             LOG.warn("Failed to shutdown {} gracefully.", closeable);
299         }
300     }
301
302     private static void gracefulShutdown(final Timer timer) {
303         if (Objects.isNull(timer)) {
304             return;
305         }
306
307         try {
308             timer.stop();
309         } catch (IllegalStateException e) {
310             LOG.warn("Failed to shutdown {} gracefully.", timer);
311         }
312     }
313
314     private static void gracefulShutdown(final ExecutorService executorService) {
315         if (Objects.isNull(executorService)) {
316             return;
317         }
318
319         try {
320             executorService.shutdownNow();
321         } catch (SecurityException e) {
322             LOG.warn("Failed to shutdown {} gracefully.", executorService);
323         }
324     }
325
326     private static void registerMXBean(final Object bean, final String beanName) {
327         final MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
328
329         try {
330             mbs.registerMBean(bean, new ObjectName(beanName));
331         } catch (MalformedObjectNameException
332                 | NotCompliantMBeanException
333                 | MBeanRegistrationException
334                 | InstanceAlreadyExistsException e) {
335             LOG.warn("Error registering MBean {}", e);
336         }
337     }
338
339     private static void unregisterMXBean(final String beanName) {
340         final MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
341
342         try {
343             mbs.unregisterMBean(new ObjectName(beanName));
344         } catch (InstanceNotFoundException
345                 | MBeanRegistrationException
346                 | MalformedObjectNameException e) {
347             LOG.warn("Error unregistering MBean {}", e);
348         }
349     }
350 }