OPNFLWPLUG-1064: Openflow diagstatus: Merge all
[openflowplugin.git] / openflowplugin-impl / src / main / java / org / opendaylight / openflowplugin / impl / OpenFlowPluginProviderImpl.java
1 /*
2  * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.openflowplugin.impl;
9
10 import com.google.common.annotations.VisibleForTesting;
11 import com.google.common.util.concurrent.FutureCallback;
12 import com.google.common.util.concurrent.Futures;
13 import com.google.common.util.concurrent.ListenableFuture;
14 import com.google.common.util.concurrent.ListeningExecutorService;
15 import com.google.common.util.concurrent.MoreExecutors;
16 import com.google.common.util.concurrent.SettableFuture;
17 import io.netty.util.HashedWheelTimer;
18 import io.netty.util.Timer;
19 import java.lang.management.ManagementFactory;
20 import java.util.Collection;
21 import java.util.List;
22 import java.util.Objects;
23 import java.util.concurrent.ExecutionException;
24 import java.util.concurrent.ExecutorService;
25 import java.util.concurrent.Future;
26 import java.util.concurrent.SynchronousQueue;
27 import java.util.concurrent.TimeUnit;
28 import java.util.concurrent.TimeoutException;
29 import java.util.stream.Collectors;
30 import javax.annotation.Nonnull;
31 import javax.annotation.PostConstruct;
32 import javax.annotation.PreDestroy;
33 import javax.inject.Inject;
34 import javax.inject.Singleton;
35 import javax.management.InstanceAlreadyExistsException;
36 import javax.management.InstanceNotFoundException;
37 import javax.management.MBeanRegistrationException;
38 import javax.management.MBeanServer;
39 import javax.management.MalformedObjectNameException;
40 import javax.management.NotCompliantMBeanException;
41 import javax.management.ObjectName;
42 import org.apache.aries.blueprint.annotation.service.Reference;
43 import org.apache.aries.blueprint.annotation.service.Service;
44 import org.opendaylight.infrautils.diagstatus.ServiceState;
45 import org.opendaylight.infrautils.ready.SystemReadyListener;
46 import org.opendaylight.infrautils.ready.SystemReadyMonitor;
47 import org.opendaylight.mdsal.binding.api.DataBroker;
48 import org.opendaylight.mdsal.binding.api.NotificationPublishService;
49 import org.opendaylight.mdsal.binding.api.RpcProviderService;
50 import org.opendaylight.mdsal.eos.binding.api.EntityOwnershipService;
51 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceProvider;
52 import org.opendaylight.openflowjava.protocol.api.connection.OpenflowDiagStatusProvider;
53 import org.opendaylight.openflowjava.protocol.spi.connection.SwitchConnectionProvider;
54 import org.opendaylight.openflowjava.protocol.spi.connection.SwitchConnectionProviderList;
55 import org.opendaylight.openflowplugin.api.openflow.OpenFlowPluginProvider;
56 import org.opendaylight.openflowplugin.api.openflow.configuration.ConfigurationService;
57 import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionManager;
58 import org.opendaylight.openflowplugin.api.openflow.device.DeviceManager;
59 import org.opendaylight.openflowplugin.api.openflow.mastership.MastershipChangeServiceManager;
60 import org.opendaylight.openflowplugin.api.openflow.role.RoleManager;
61 import org.opendaylight.openflowplugin.api.openflow.rpc.RpcManager;
62 import org.opendaylight.openflowplugin.api.openflow.statistics.StatisticsManager;
63 import org.opendaylight.openflowplugin.api.openflow.statistics.ofpspecific.MessageIntelligenceAgency;
64 import org.opendaylight.openflowplugin.extension.api.ExtensionConverterProviderKeeper;
65 import org.opendaylight.openflowplugin.extension.api.ExtensionConverterRegistrator;
66 import org.opendaylight.openflowplugin.extension.api.OpenFlowPluginExtensionRegistratorProvider;
67 import org.opendaylight.openflowplugin.extension.api.core.extension.ExtensionConverterManager;
68 import org.opendaylight.openflowplugin.impl.configuration.OpenFlowProviderConfigImpl;
69 import org.opendaylight.openflowplugin.impl.connection.ConnectionManagerImpl;
70 import org.opendaylight.openflowplugin.impl.device.DeviceManagerImpl;
71 import org.opendaylight.openflowplugin.impl.device.initialization.DeviceInitializerProvider;
72 import org.opendaylight.openflowplugin.impl.device.initialization.DeviceInitializerProviderFactory;
73 import org.opendaylight.openflowplugin.impl.lifecycle.ContextChainHolderImpl;
74 import org.opendaylight.openflowplugin.impl.protocol.deserialization.DeserializerInjector;
75 import org.opendaylight.openflowplugin.impl.protocol.serialization.SerializerInjector;
76 import org.opendaylight.openflowplugin.impl.role.RoleManagerImpl;
77 import org.opendaylight.openflowplugin.impl.rpc.RpcManagerImpl;
78 import org.opendaylight.openflowplugin.impl.statistics.StatisticsManagerImpl;
79 import org.opendaylight.openflowplugin.impl.statistics.ofpspecific.MessageIntelligenceAgencyImpl;
80 import org.opendaylight.openflowplugin.impl.statistics.ofpspecific.MessageIntelligenceAgencyMXBean;
81 import org.opendaylight.openflowplugin.impl.util.ThreadPoolLoggingExecutor;
82 import org.opendaylight.openflowplugin.impl.util.TranslatorLibraryUtil;
83 import org.opendaylight.openflowplugin.openflow.md.core.extension.ExtensionConverterManagerImpl;
84 import org.opendaylight.openflowplugin.openflow.md.core.sal.convertor.ConvertorManager;
85 import org.opendaylight.openflowplugin.openflow.md.core.sal.convertor.ConvertorManagerFactory;
86 import org.opendaylight.openflowplugin.openflow.md.core.session.OFSessionUtil;
87 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflow.provider.config.rev160510.OpenflowProviderConfig;
88 import org.slf4j.Logger;
89 import org.slf4j.LoggerFactory;
90
91 @Singleton
92 @Service(classes = { OpenFlowPluginProvider.class, OpenFlowPluginExtensionRegistratorProvider.class })
93 public class OpenFlowPluginProviderImpl implements
94         OpenFlowPluginProvider,
95         OpenFlowPluginExtensionRegistratorProvider,
96         SystemReadyListener {
97
98     private static final Logger LOG = LoggerFactory.getLogger(OpenFlowPluginProviderImpl.class);
99
100     private static final int TICKS_PER_WHEEL = 500; // 0.5 sec.
101     private static final long TICK_DURATION = 10;
102     private static final String POOL_NAME = "ofppool";
103
104     private static final MessageIntelligenceAgency MESSAGE_INTELLIGENCE_AGENCY = new MessageIntelligenceAgencyImpl();
105     private static final String MESSAGE_INTELLIGENCE_AGENCY_MX_BEAN_NAME = String
106             .format("%s:type=%s",
107                     MessageIntelligenceAgencyMXBean.class.getPackage().getName(),
108                     MessageIntelligenceAgencyMXBean.class.getSimpleName());
109
110     private final HashedWheelTimer hashedWheelTimer =
111             new HashedWheelTimer(TICK_DURATION, TimeUnit.MILLISECONDS, TICKS_PER_WHEEL);
112     private final NotificationPublishService notificationPublishService;
113     private final ExtensionConverterManager extensionConverterManager;
114     private final DataBroker dataBroker;
115     private final Collection<SwitchConnectionProvider> switchConnectionProviders;
116     private final DeviceInitializerProvider deviceInitializerProvider;
117     private final ConvertorManager convertorManager;
118     private final RpcProviderService rpcProviderRegistry;
119     private final ClusterSingletonServiceProvider singletonServicesProvider;
120     private final OpenflowProviderConfig config;
121     private final EntityOwnershipService entityOwnershipService;
122     private final MastershipChangeServiceManager mastershipChangeServiceManager;
123     private DeviceManager deviceManager;
124     private RpcManager rpcManager;
125     private StatisticsManager statisticsManager;
126     private RoleManager roleManager;
127     private ConnectionManager connectionManager;
128     private ListeningExecutorService executorService;
129     private ContextChainHolderImpl contextChainHolder;
130     private final OpenflowDiagStatusProvider openflowDiagStatusProvider;
131     private final SettableFuture<Void> fullyStarted = SettableFuture.create();
132     private static final String OPENFLOW_SERVICE_NAME = "OPENFLOW";
133
134     public static MessageIntelligenceAgency getMessageIntelligenceAgency() {
135         return MESSAGE_INTELLIGENCE_AGENCY;
136     }
137
138     @Inject
139     public OpenFlowPluginProviderImpl(final ConfigurationService configurationService,
140                                final SwitchConnectionProviderList switchConnectionProviders,
141                                final PingPongDataBroker pingPongDataBroker,
142                                final @Reference RpcProviderService rpcProviderRegistry,
143                                final @Reference NotificationPublishService notificationPublishService,
144                                final @Reference ClusterSingletonServiceProvider singletonServiceProvider,
145                                final @Reference EntityOwnershipService entityOwnershipService,
146                                final MastershipChangeServiceManager mastershipChangeServiceManager,
147                                final @Reference OpenflowDiagStatusProvider openflowDiagStatusProvider,
148                                final @Reference SystemReadyMonitor systemReadyMonitor) {
149         this.switchConnectionProviders = switchConnectionProviders;
150         this.dataBroker = pingPongDataBroker;
151         this.rpcProviderRegistry = rpcProviderRegistry;
152         this.notificationPublishService = notificationPublishService;
153         this.singletonServicesProvider = singletonServiceProvider;
154         this.entityOwnershipService = entityOwnershipService;
155         convertorManager = ConvertorManagerFactory.createDefaultManager();
156         extensionConverterManager = new ExtensionConverterManagerImpl();
157         deviceInitializerProvider = DeviceInitializerProviderFactory.createDefaultProvider();
158         config = new OpenFlowProviderConfigImpl(configurationService);
159         this.mastershipChangeServiceManager = mastershipChangeServiceManager;
160         this.openflowDiagStatusProvider = openflowDiagStatusProvider;
161         systemReadyMonitor.registerListener(this);
162         LOG.info("registered onSystemBootReady() listener for deferred startSwitchConnections()");
163     }
164
165     @Override
166     public void onSystemBootReady() {
167         LOG.info("onSystemBootReady() received, starting the switch connections");
168         Futures.addCallback(Futures.allAsList(switchConnectionProviders.stream().map(switchConnectionProvider -> {
169             // Inject OpenFlowPlugin custom serializers and deserializers into OpenFlowJava
170             if (config.isUseSingleLayerSerialization()) {
171                 SerializerInjector.injectSerializers(switchConnectionProvider,
172                         switchConnectionProvider.getConfiguration().isGroupAddModEnabled());
173                 DeserializerInjector.injectDeserializers(switchConnectionProvider);
174             } else {
175                 DeserializerInjector.revertDeserializers(switchConnectionProvider);
176             }
177
178             // Set handler of incoming connections and start switch connection provider
179             switchConnectionProvider.setSwitchConnectionHandler(connectionManager);
180             return switchConnectionProvider.startup();
181         }).collect(Collectors.toSet())), new FutureCallback<List<Boolean>>() {
182             @Override
183             public void onSuccess(@Nonnull final List<Boolean> result) {
184                 LOG.info("All switchConnectionProviders are up and running ({}).", result.size());
185                 openflowDiagStatusProvider.reportStatus(OPENFLOW_SERVICE_NAME, ServiceState.OPERATIONAL);
186                 fullyStarted.set(null);
187             }
188
189             @Override
190             public void onFailure(@Nonnull final Throwable throwable) {
191                 LOG.warn("Some switchConnectionProviders failed to start.", throwable);
192                 openflowDiagStatusProvider.reportStatus(OPENFLOW_SERVICE_NAME, throwable);
193                 fullyStarted.setException(throwable);
194             }
195         }, MoreExecutors.directExecutor());
196     }
197
198     @VisibleForTesting
199     public Future<Void> getFullyStarted() {
200         return fullyStarted;
201     }
202
203     private ListenableFuture<List<Boolean>> shutdownSwitchConnections() {
204         final ListenableFuture<List<Boolean>> listListenableFuture =
205                 Futures.allAsList(switchConnectionProviders.stream().map(switchConnectionProvider -> {
206                     // Revert deserializers to their original state
207                     if (config.isUseSingleLayerSerialization()) {
208                         DeserializerInjector.revertDeserializers(switchConnectionProvider);
209                     }
210
211                     // Shutdown switch connection provider
212                     return switchConnectionProvider.shutdown();
213                 }).collect(Collectors.toSet()));
214
215         Futures.addCallback(listListenableFuture, new FutureCallback<List<Boolean>>() {
216             @Override
217             public void onSuccess(@Nonnull final List<Boolean> result) {
218                 LOG.info("All switchConnectionProviders were successfully shut down ({}).", result.size());
219             }
220
221             @Override
222             public void onFailure(@Nonnull final Throwable throwable) {
223                 LOG.warn("Some switchConnectionProviders failed to shutdown.", throwable);
224             }
225         }, MoreExecutors.directExecutor());
226
227         return listListenableFuture;
228     }
229
230     @Override
231     @PostConstruct
232     public void initialize() {
233         registerMXBean(MESSAGE_INTELLIGENCE_AGENCY, MESSAGE_INTELLIGENCE_AGENCY_MX_BEAN_NAME);
234
235         // TODO: copied from OpenFlowPluginProvider (Helium) misusesing the old way of distributing extension converters
236         // TODO: rewrite later!
237         OFSessionUtil.getSessionManager().setExtensionConverterProvider(extensionConverterManager);
238
239         // Creates a thread pool that creates new threads as needed, but will reuse previously
240         // constructed threads when they are available.
241         // Threads that have not been used for x seconds are terminated and removed from the cache.
242         executorService = MoreExecutors.listeningDecorator(new ThreadPoolLoggingExecutor(
243                 config.getThreadPoolMinThreads(),
244                 config.getThreadPoolMaxThreads().getValue(),
245                 config.getThreadPoolTimeout(),
246                 TimeUnit.SECONDS, new SynchronousQueue<>(), POOL_NAME));
247
248         deviceManager = new DeviceManagerImpl(
249                 config,
250                 dataBroker,
251                 getMessageIntelligenceAgency(),
252                 notificationPublishService,
253                 hashedWheelTimer,
254                 convertorManager,
255                 deviceInitializerProvider);
256
257         TranslatorLibraryUtil.injectBasicTranslatorLibrary(deviceManager, convertorManager);
258         ((ExtensionConverterProviderKeeper) deviceManager).setExtensionConverterProvider(extensionConverterManager);
259
260         rpcManager = new RpcManagerImpl(
261                 config,
262                 rpcProviderRegistry,
263                 extensionConverterManager,
264                 convertorManager,
265                 notificationPublishService);
266
267         statisticsManager = new StatisticsManagerImpl(
268                 config,
269                 rpcProviderRegistry,
270                 convertorManager,
271                 executorService);
272
273         roleManager = new RoleManagerImpl(hashedWheelTimer, config);
274
275         contextChainHolder = new ContextChainHolderImpl(
276                 executorService,
277                 singletonServicesProvider,
278                 entityOwnershipService,
279                 mastershipChangeServiceManager);
280
281         contextChainHolder.addManager(deviceManager);
282         contextChainHolder.addManager(statisticsManager);
283         contextChainHolder.addManager(rpcManager);
284         contextChainHolder.addManager(roleManager);
285
286         connectionManager = new ConnectionManagerImpl(config, executorService);
287         connectionManager.setDeviceConnectedHandler(contextChainHolder);
288         connectionManager.setDeviceDisconnectedHandler(contextChainHolder);
289
290         deviceManager.setContextChainHolder(contextChainHolder);
291         deviceManager.initialize();
292     }
293
294     @Override
295     public ExtensionConverterRegistrator getExtensionConverterRegistrator() {
296         return extensionConverterManager;
297     }
298
299     @Override
300     @PreDestroy
301     public void close() {
302         try {
303             shutdownSwitchConnections().get(10, TimeUnit.SECONDS);
304         } catch (InterruptedException | ExecutionException | TimeoutException e) {
305             LOG.warn("Failed to shut down switch connections in time {}s", 10, e);
306         }
307
308         gracefulShutdown(contextChainHolder);
309         gracefulShutdown(deviceManager);
310         gracefulShutdown(rpcManager);
311         gracefulShutdown(statisticsManager);
312         gracefulShutdown(roleManager);
313         gracefulShutdown(executorService);
314         gracefulShutdown(hashedWheelTimer);
315         unregisterMXBean(MESSAGE_INTELLIGENCE_AGENCY_MX_BEAN_NAME);
316         openflowDiagStatusProvider.reportStatus(ServiceState.UNREGISTERED);
317     }
318
319     @SuppressWarnings("checkstyle:IllegalCatch")
320     private static void gracefulShutdown(final AutoCloseable closeable) {
321         if (Objects.isNull(closeable)) {
322             return;
323         }
324
325         try {
326             closeable.close();
327         } catch (Exception e) {
328             LOG.warn("Failed to shutdown {} gracefully.", closeable);
329         }
330     }
331
332     private static void gracefulShutdown(final Timer timer) {
333         if (Objects.isNull(timer)) {
334             return;
335         }
336
337         try {
338             timer.stop();
339         } catch (IllegalStateException e) {
340             LOG.warn("Failed to shutdown {} gracefully.", timer);
341         }
342     }
343
344     private static void gracefulShutdown(final ExecutorService executorService) {
345         if (Objects.isNull(executorService)) {
346             return;
347         }
348
349         try {
350             executorService.shutdownNow();
351         } catch (SecurityException e) {
352             LOG.warn("Failed to shutdown {} gracefully.", executorService);
353         }
354     }
355
356     private static void registerMXBean(final Object bean, final String beanName) {
357         final MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
358
359         try {
360             mbs.registerMBean(bean, new ObjectName(beanName));
361         } catch (MalformedObjectNameException
362                 | NotCompliantMBeanException
363                 | MBeanRegistrationException
364                 | InstanceAlreadyExistsException e) {
365             LOG.warn("Error registering MBean {}", e);
366         }
367     }
368
369     private static void unregisterMXBean(final String beanName) {
370         final MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
371
372         try {
373             mbs.unregisterMBean(new ObjectName(beanName));
374         } catch (InstanceNotFoundException
375                 | MBeanRegistrationException
376                 | MalformedObjectNameException e) {
377             LOG.warn("Error unregistering MBean {}", e);
378         }
379     }
380 }