Merge "OPNFLWPLUG-983 Group and flow removal stats are not reported in order"
[openflowplugin.git] / openflowplugin-impl / src / main / java / org / opendaylight / openflowplugin / impl / OpenFlowPluginProviderImpl.java
1 /*
2  * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.openflowplugin.impl;
10
11 import com.google.common.util.concurrent.FutureCallback;
12 import com.google.common.util.concurrent.Futures;
13 import com.google.common.util.concurrent.ListenableFuture;
14 import com.google.common.util.concurrent.ListeningExecutorService;
15 import com.google.common.util.concurrent.MoreExecutors;
16 import io.netty.util.HashedWheelTimer;
17 import io.netty.util.Timer;
18 import java.lang.management.ManagementFactory;
19 import java.util.Collection;
20 import java.util.List;
21 import java.util.Objects;
22 import java.util.concurrent.ExecutionException;
23 import java.util.concurrent.ExecutorService;
24 import java.util.concurrent.SynchronousQueue;
25 import java.util.concurrent.TimeUnit;
26 import java.util.concurrent.TimeoutException;
27 import java.util.stream.Collectors;
28 import javax.annotation.Nonnull;
29 import javax.management.InstanceAlreadyExistsException;
30 import javax.management.InstanceNotFoundException;
31 import javax.management.MBeanRegistrationException;
32 import javax.management.MBeanServer;
33 import javax.management.MalformedObjectNameException;
34 import javax.management.NotCompliantMBeanException;
35 import javax.management.ObjectName;
36 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
37 import org.opendaylight.controller.md.sal.binding.api.NotificationPublishService;
38 import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry;
39 import org.opendaylight.infrautils.diagstatus.ServiceState;
40 import org.opendaylight.infrautils.ready.SystemReadyListener;
41 import org.opendaylight.infrautils.ready.SystemReadyMonitor;
42 import org.opendaylight.mdsal.eos.binding.api.EntityOwnershipService;
43 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceProvider;
44 import org.opendaylight.openflowjava.protocol.spi.connection.SwitchConnectionProvider;
45 import org.opendaylight.openflowplugin.api.diagstatus.OpenflowPluginDiagStatusProvider;
46 import org.opendaylight.openflowplugin.api.openflow.OpenFlowPluginProvider;
47 import org.opendaylight.openflowplugin.api.openflow.configuration.ConfigurationService;
48 import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionManager;
49 import org.opendaylight.openflowplugin.api.openflow.device.DeviceManager;
50 import org.opendaylight.openflowplugin.api.openflow.mastership.MastershipChangeServiceManager;
51 import org.opendaylight.openflowplugin.api.openflow.role.RoleManager;
52 import org.opendaylight.openflowplugin.api.openflow.rpc.RpcManager;
53 import org.opendaylight.openflowplugin.api.openflow.statistics.StatisticsManager;
54 import org.opendaylight.openflowplugin.api.openflow.statistics.ofpspecific.MessageIntelligenceAgency;
55 import org.opendaylight.openflowplugin.extension.api.ExtensionConverterProviderKeeper;
56 import org.opendaylight.openflowplugin.extension.api.ExtensionConverterRegistrator;
57 import org.opendaylight.openflowplugin.extension.api.OpenFlowPluginExtensionRegistratorProvider;
58 import org.opendaylight.openflowplugin.extension.api.core.extension.ExtensionConverterManager;
59 import org.opendaylight.openflowplugin.impl.configuration.OpenFlowProviderConfigImpl;
60 import org.opendaylight.openflowplugin.impl.connection.ConnectionManagerImpl;
61 import org.opendaylight.openflowplugin.impl.device.DeviceManagerImpl;
62 import org.opendaylight.openflowplugin.impl.device.initialization.DeviceInitializerProvider;
63 import org.opendaylight.openflowplugin.impl.device.initialization.DeviceInitializerProviderFactory;
64 import org.opendaylight.openflowplugin.impl.lifecycle.ContextChainHolderImpl;
65 import org.opendaylight.openflowplugin.impl.protocol.deserialization.DeserializerInjector;
66 import org.opendaylight.openflowplugin.impl.protocol.serialization.SerializerInjector;
67 import org.opendaylight.openflowplugin.impl.role.RoleManagerImpl;
68 import org.opendaylight.openflowplugin.impl.rpc.RpcManagerImpl;
69 import org.opendaylight.openflowplugin.impl.statistics.StatisticsManagerImpl;
70 import org.opendaylight.openflowplugin.impl.statistics.ofpspecific.MessageIntelligenceAgencyImpl;
71 import org.opendaylight.openflowplugin.impl.statistics.ofpspecific.MessageIntelligenceAgencyMXBean;
72 import org.opendaylight.openflowplugin.impl.util.ThreadPoolLoggingExecutor;
73 import org.opendaylight.openflowplugin.impl.util.TranslatorLibraryUtil;
74 import org.opendaylight.openflowplugin.openflow.md.core.extension.ExtensionConverterManagerImpl;
75 import org.opendaylight.openflowplugin.openflow.md.core.sal.convertor.ConvertorManager;
76 import org.opendaylight.openflowplugin.openflow.md.core.sal.convertor.ConvertorManagerFactory;
77 import org.opendaylight.openflowplugin.openflow.md.core.session.OFSessionUtil;
78 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflow.provider.config.rev160510.OpenflowProviderConfig;
79 import org.slf4j.Logger;
80 import org.slf4j.LoggerFactory;
81
82 public class OpenFlowPluginProviderImpl implements
83         OpenFlowPluginProvider,
84         OpenFlowPluginExtensionRegistratorProvider,
85         SystemReadyListener {
86
87     private static final Logger LOG = LoggerFactory.getLogger(OpenFlowPluginProviderImpl.class);
88
89     private static final int TICKS_PER_WHEEL = 500; // 0.5 sec.
90     private static final long TICK_DURATION = 10;
91     private static final String POOL_NAME = "ofppool";
92
93     private static final MessageIntelligenceAgency MESSAGE_INTELLIGENCE_AGENCY = new MessageIntelligenceAgencyImpl();
94     private static final String MESSAGE_INTELLIGENCE_AGENCY_MX_BEAN_NAME = String
95             .format("%s:type=%s",
96                     MessageIntelligenceAgencyMXBean.class.getPackage().getName(),
97                     MessageIntelligenceAgencyMXBean.class.getSimpleName());
98
99     private final HashedWheelTimer hashedWheelTimer =
100             new HashedWheelTimer(TICK_DURATION, TimeUnit.MILLISECONDS, TICKS_PER_WHEEL);
101     private final NotificationPublishService notificationPublishService;
102     private final ExtensionConverterManager extensionConverterManager;
103     private final DataBroker dataBroker;
104     private final Collection<SwitchConnectionProvider> switchConnectionProviders;
105     private final DeviceInitializerProvider deviceInitializerProvider;
106     private final ConvertorManager convertorManager;
107     private final RpcProviderRegistry rpcProviderRegistry;
108     private final ClusterSingletonServiceProvider singletonServicesProvider;
109     private final OpenflowProviderConfig config;
110     private final EntityOwnershipService entityOwnershipService;
111     private final MastershipChangeServiceManager mastershipChangeServiceManager;
112     private DeviceManager deviceManager;
113     private RpcManager rpcManager;
114     private StatisticsManager statisticsManager;
115     private RoleManager roleManager;
116     private ConnectionManager connectionManager;
117     private ListeningExecutorService executorService;
118     private ContextChainHolderImpl contextChainHolder;
119     private final OpenflowPluginDiagStatusProvider openflowPluginStatusMonitor;
120
121     public static MessageIntelligenceAgency getMessageIntelligenceAgency() {
122         return MESSAGE_INTELLIGENCE_AGENCY;
123     }
124
125     OpenFlowPluginProviderImpl(final ConfigurationService configurationService,
126                                final List<SwitchConnectionProvider> switchConnectionProviders,
127                                final DataBroker dataBroker,
128                                final RpcProviderRegistry rpcProviderRegistry,
129                                final NotificationPublishService notificationPublishService,
130                                final ClusterSingletonServiceProvider singletonServiceProvider,
131                                final EntityOwnershipService entityOwnershipService,
132                                final MastershipChangeServiceManager mastershipChangeServiceManager,
133                                final OpenflowPluginDiagStatusProvider openflowPluginStatusMonitor,
134                                final SystemReadyMonitor systemReadyMonitor) {
135         this.switchConnectionProviders = switchConnectionProviders;
136         this.dataBroker = dataBroker;
137         this.rpcProviderRegistry = rpcProviderRegistry;
138         this.notificationPublishService = notificationPublishService;
139         this.singletonServicesProvider = singletonServiceProvider;
140         this.entityOwnershipService = entityOwnershipService;
141         convertorManager = ConvertorManagerFactory.createDefaultManager();
142         extensionConverterManager = new ExtensionConverterManagerImpl();
143         deviceInitializerProvider = DeviceInitializerProviderFactory.createDefaultProvider();
144         config = new OpenFlowProviderConfigImpl(configurationService);
145         this.mastershipChangeServiceManager = mastershipChangeServiceManager;
146         this.openflowPluginStatusMonitor = openflowPluginStatusMonitor;
147         systemReadyMonitor.registerListener(this);
148         LOG.debug("registered onSystemBootReady() listener for deferred startSwitchConnections()");
149     }
150
151     @Override
152     public void onSystemBootReady() {
153         LOG.debug("onSystemBootReady() received, starting the switch connections");
154         startSwitchConnections();
155     }
156
157     private void startSwitchConnections() {
158         Futures.addCallback(Futures.allAsList(switchConnectionProviders.stream().map(switchConnectionProvider -> {
159             // Inject OpenFlowPlugin custom serializers and deserializers into OpenFlowJava
160             if (config.isUseSingleLayerSerialization()) {
161                 SerializerInjector.injectSerializers(switchConnectionProvider,
162                         switchConnectionProvider.getConfiguration().isGroupAddModEnabled());
163                 DeserializerInjector.injectDeserializers(switchConnectionProvider);
164             } else {
165                 DeserializerInjector.revertDeserializers(switchConnectionProvider);
166             }
167
168             // Set handler of incoming connections and start switch connection provider
169             switchConnectionProvider.setSwitchConnectionHandler(connectionManager);
170             return switchConnectionProvider.startup();
171         }).collect(Collectors.toSet())), new FutureCallback<List<Boolean>>() {
172             @Override
173             public void onSuccess(@Nonnull final List<Boolean> result) {
174                 LOG.info("All switchConnectionProviders are up and running ({}).", result.size());
175                 openflowPluginStatusMonitor.reportStatus(ServiceState.OPERATIONAL, "switch connections started");
176             }
177
178             @Override
179             public void onFailure(@Nonnull final Throwable throwable) {
180                 LOG.warn("Some switchConnectionProviders failed to start.", throwable);
181                 openflowPluginStatusMonitor.reportStatus(ServiceState.ERROR, "some switch connections failed to start");
182             }
183         }, MoreExecutors.directExecutor());
184     }
185
186     private ListenableFuture<List<Boolean>> shutdownSwitchConnections() {
187         final ListenableFuture<List<Boolean>> listListenableFuture =
188                 Futures.allAsList(switchConnectionProviders.stream().map(switchConnectionProvider -> {
189                     // Revert deserializers to their original state
190                     if (config.isUseSingleLayerSerialization()) {
191                         DeserializerInjector.revertDeserializers(switchConnectionProvider);
192                     }
193
194                     // Shutdown switch connection provider
195                     return switchConnectionProvider.shutdown();
196                 }).collect(Collectors.toSet()));
197
198         Futures.addCallback(listListenableFuture, new FutureCallback<List<Boolean>>() {
199             @Override
200             public void onSuccess(@Nonnull final List<Boolean> result) {
201                 LOG.info("All switchConnectionProviders were successfully shut down ({}).", result.size());
202             }
203
204             @Override
205             public void onFailure(@Nonnull final Throwable throwable) {
206                 LOG.warn("Some switchConnectionProviders failed to shutdown.", throwable);
207             }
208         }, MoreExecutors.directExecutor());
209
210         return listListenableFuture;
211     }
212
213     @Override
214     public void initialize() {
215         registerMXBean(MESSAGE_INTELLIGENCE_AGENCY, MESSAGE_INTELLIGENCE_AGENCY_MX_BEAN_NAME);
216
217         // TODO: copied from OpenFlowPluginProvider (Helium) misusesing the old way of distributing extension converters
218         // TODO: rewrite later!
219         OFSessionUtil.getSessionManager().setExtensionConverterProvider(extensionConverterManager);
220
221         // Creates a thread pool that creates new threads as needed, but will reuse previously
222         // constructed threads when they are available.
223         // Threads that have not been used for x seconds are terminated and removed from the cache.
224         executorService = MoreExecutors.listeningDecorator(new ThreadPoolLoggingExecutor(
225                 config.getThreadPoolMinThreads(),
226                 config.getThreadPoolMaxThreads().getValue(),
227                 config.getThreadPoolTimeout(),
228                 TimeUnit.SECONDS, new SynchronousQueue<>(), POOL_NAME));
229
230         deviceManager = new DeviceManagerImpl(
231                 config,
232                 dataBroker,
233                 getMessageIntelligenceAgency(),
234                 notificationPublishService,
235                 hashedWheelTimer,
236                 convertorManager,
237                 deviceInitializerProvider);
238
239         TranslatorLibraryUtil.injectBasicTranslatorLibrary(deviceManager, convertorManager);
240         ((ExtensionConverterProviderKeeper) deviceManager).setExtensionConverterProvider(extensionConverterManager);
241
242         rpcManager = new RpcManagerImpl(
243                 config,
244                 rpcProviderRegistry,
245                 extensionConverterManager,
246                 convertorManager,
247                 notificationPublishService);
248
249         statisticsManager = new StatisticsManagerImpl(
250                 config,
251                 rpcProviderRegistry,
252                 convertorManager,
253                 executorService);
254
255         roleManager = new RoleManagerImpl(hashedWheelTimer);
256
257         contextChainHolder = new ContextChainHolderImpl(
258                 executorService,
259                 singletonServicesProvider,
260                 entityOwnershipService,
261                 mastershipChangeServiceManager);
262
263         contextChainHolder.addManager(deviceManager);
264         contextChainHolder.addManager(statisticsManager);
265         contextChainHolder.addManager(rpcManager);
266         contextChainHolder.addManager(roleManager);
267
268         connectionManager = new ConnectionManagerImpl(config, executorService);
269         connectionManager.setDeviceConnectedHandler(contextChainHolder);
270         connectionManager.setDeviceDisconnectedHandler(contextChainHolder);
271
272         deviceManager.initialize();
273     }
274
275     @Override
276     public ExtensionConverterRegistrator getExtensionConverterRegistrator() {
277         return extensionConverterManager;
278     }
279
280     @Override
281     public void close() {
282         try {
283             shutdownSwitchConnections().get(10, TimeUnit.SECONDS);
284         } catch (InterruptedException | ExecutionException | TimeoutException e) {
285             LOG.warn("Failed to shut down switch connections in time {}s, error: {}", 10, e);
286         }
287
288         gracefulShutdown(contextChainHolder);
289         gracefulShutdown(deviceManager);
290         gracefulShutdown(rpcManager);
291         gracefulShutdown(statisticsManager);
292         gracefulShutdown(roleManager);
293         gracefulShutdown(executorService);
294         gracefulShutdown(hashedWheelTimer);
295         unregisterMXBean(MESSAGE_INTELLIGENCE_AGENCY_MX_BEAN_NAME);
296         openflowPluginStatusMonitor.reportStatus(ServiceState.UNREGISTERED, "service shutting down");
297     }
298
299     @SuppressWarnings("checkstyle:IllegalCatch")
300     private static void gracefulShutdown(final AutoCloseable closeable) {
301         if (Objects.isNull(closeable)) {
302             return;
303         }
304
305         try {
306             closeable.close();
307         } catch (Exception e) {
308             LOG.warn("Failed to shutdown {} gracefully.", closeable);
309         }
310     }
311
312     private static void gracefulShutdown(final Timer timer) {
313         if (Objects.isNull(timer)) {
314             return;
315         }
316
317         try {
318             timer.stop();
319         } catch (IllegalStateException e) {
320             LOG.warn("Failed to shutdown {} gracefully.", timer);
321         }
322     }
323
324     private static void gracefulShutdown(final ExecutorService executorService) {
325         if (Objects.isNull(executorService)) {
326             return;
327         }
328
329         try {
330             executorService.shutdownNow();
331         } catch (SecurityException e) {
332             LOG.warn("Failed to shutdown {} gracefully.", executorService);
333         }
334     }
335
336     private static void registerMXBean(final Object bean, final String beanName) {
337         final MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
338
339         try {
340             mbs.registerMBean(bean, new ObjectName(beanName));
341         } catch (MalformedObjectNameException
342                 | NotCompliantMBeanException
343                 | MBeanRegistrationException
344                 | InstanceAlreadyExistsException e) {
345             LOG.warn("Error registering MBean {}", e);
346         }
347     }
348
349     private static void unregisterMXBean(final String beanName) {
350         final MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
351
352         try {
353             mbs.unregisterMBean(new ObjectName(beanName));
354         } catch (InstanceNotFoundException
355                 | MBeanRegistrationException
356                 | MalformedObjectNameException e) {
357             LOG.warn("Error unregistering MBean {}", e);
358         }
359     }
360 }