114ee81b60fbfdfcac65ceb3f7f9147b6caeeae8
[openflowplugin.git] / openflowplugin-impl / src / main / java / org / opendaylight / openflowplugin / impl / OpenFlowPluginProviderImpl.java
1 /*
2  * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.openflowplugin.impl;
10
11 import com.google.common.util.concurrent.FutureCallback;
12 import com.google.common.util.concurrent.Futures;
13 import com.google.common.util.concurrent.ListenableFuture;
14 import com.google.common.util.concurrent.ListeningExecutorService;
15 import com.google.common.util.concurrent.MoreExecutors;
16 import io.netty.util.HashedWheelTimer;
17 import io.netty.util.Timer;
18 import java.lang.management.ManagementFactory;
19 import java.util.Collection;
20 import java.util.List;
21 import java.util.Objects;
22 import java.util.concurrent.ExecutionException;
23 import java.util.concurrent.ExecutorService;
24 import java.util.concurrent.SynchronousQueue;
25 import java.util.concurrent.TimeUnit;
26 import java.util.concurrent.TimeoutException;
27 import java.util.stream.Collectors;
28 import javax.annotation.Nonnull;
29 import javax.management.InstanceAlreadyExistsException;
30 import javax.management.InstanceNotFoundException;
31 import javax.management.MBeanRegistrationException;
32 import javax.management.MBeanServer;
33 import javax.management.MalformedObjectNameException;
34 import javax.management.NotCompliantMBeanException;
35 import javax.management.ObjectName;
36 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
37 import org.opendaylight.controller.md.sal.binding.api.NotificationPublishService;
38 import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry;
39 import org.opendaylight.infrautils.diagstatus.ServiceState;
40 import org.opendaylight.infrautils.ready.SystemReadyListener;
41 import org.opendaylight.infrautils.ready.SystemReadyMonitor;
42 import org.opendaylight.mdsal.eos.binding.api.EntityOwnershipService;
43 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceProvider;
44 import org.opendaylight.openflowjava.protocol.spi.connection.SwitchConnectionProvider;
45 import org.opendaylight.openflowplugin.api.diagstatus.OpenflowPluginDiagStatusProvider;
46 import org.opendaylight.openflowplugin.api.openflow.OpenFlowPluginProvider;
47 import org.opendaylight.openflowplugin.api.openflow.configuration.ConfigurationService;
48 import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionManager;
49 import org.opendaylight.openflowplugin.api.openflow.device.DeviceManager;
50 import org.opendaylight.openflowplugin.api.openflow.mastership.MastershipChangeServiceManager;
51 import org.opendaylight.openflowplugin.api.openflow.role.RoleManager;
52 import org.opendaylight.openflowplugin.api.openflow.rpc.RpcManager;
53 import org.opendaylight.openflowplugin.api.openflow.statistics.StatisticsManager;
54 import org.opendaylight.openflowplugin.api.openflow.statistics.ofpspecific.MessageIntelligenceAgency;
55 import org.opendaylight.openflowplugin.extension.api.ExtensionConverterProviderKeeper;
56 import org.opendaylight.openflowplugin.extension.api.ExtensionConverterRegistrator;
57 import org.opendaylight.openflowplugin.extension.api.OpenFlowPluginExtensionRegistratorProvider;
58 import org.opendaylight.openflowplugin.extension.api.core.extension.ExtensionConverterManager;
59 import org.opendaylight.openflowplugin.impl.configuration.OpenFlowProviderConfigImpl;
60 import org.opendaylight.openflowplugin.impl.connection.ConnectionManagerImpl;
61 import org.opendaylight.openflowplugin.impl.device.DeviceManagerImpl;
62 import org.opendaylight.openflowplugin.impl.device.initialization.DeviceInitializerProvider;
63 import org.opendaylight.openflowplugin.impl.device.initialization.DeviceInitializerProviderFactory;
64 import org.opendaylight.openflowplugin.impl.lifecycle.ContextChainHolderImpl;
65 import org.opendaylight.openflowplugin.impl.protocol.deserialization.DeserializerInjector;
66 import org.opendaylight.openflowplugin.impl.protocol.serialization.SerializerInjector;
67 import org.opendaylight.openflowplugin.impl.role.RoleManagerImpl;
68 import org.opendaylight.openflowplugin.impl.rpc.RpcManagerImpl;
69 import org.opendaylight.openflowplugin.impl.statistics.StatisticsManagerImpl;
70 import org.opendaylight.openflowplugin.impl.statistics.ofpspecific.MessageIntelligenceAgencyImpl;
71 import org.opendaylight.openflowplugin.impl.statistics.ofpspecific.MessageIntelligenceAgencyMXBean;
72 import org.opendaylight.openflowplugin.impl.util.ThreadPoolLoggingExecutor;
73 import org.opendaylight.openflowplugin.impl.util.TranslatorLibraryUtil;
74 import org.opendaylight.openflowplugin.openflow.md.core.extension.ExtensionConverterManagerImpl;
75 import org.opendaylight.openflowplugin.openflow.md.core.sal.convertor.ConvertorManager;
76 import org.opendaylight.openflowplugin.openflow.md.core.sal.convertor.ConvertorManagerFactory;
77 import org.opendaylight.openflowplugin.openflow.md.core.session.OFSessionUtil;
78 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflow.provider.config.rev160510.OpenflowProviderConfig;
79 import org.slf4j.Logger;
80 import org.slf4j.LoggerFactory;
81
82 public class OpenFlowPluginProviderImpl implements
83         OpenFlowPluginProvider,
84         OpenFlowPluginExtensionRegistratorProvider,
85         SystemReadyListener {
86
87     private static final Logger LOG = LoggerFactory.getLogger(OpenFlowPluginProviderImpl.class);
88
89     private static final int TICKS_PER_WHEEL = 500; // 0.5 sec.
90     private static final long TICK_DURATION = 10;
91     private static final String POOL_NAME = "ofppool";
92
93     private static final MessageIntelligenceAgency MESSAGE_INTELLIGENCE_AGENCY = new MessageIntelligenceAgencyImpl();
94     private static final String MESSAGE_INTELLIGENCE_AGENCY_MX_BEAN_NAME = String
95             .format("%s:type=%s",
96                     MessageIntelligenceAgencyMXBean.class.getPackage().getName(),
97                     MessageIntelligenceAgencyMXBean.class.getSimpleName());
98
99     private final HashedWheelTimer hashedWheelTimer =
100             new HashedWheelTimer(TICK_DURATION, TimeUnit.MILLISECONDS, TICKS_PER_WHEEL);
101     private final NotificationPublishService notificationPublishService;
102     private final ExtensionConverterManager extensionConverterManager;
103     private final DataBroker dataBroker;
104     private final Collection<SwitchConnectionProvider> switchConnectionProviders;
105     private final DeviceInitializerProvider deviceInitializerProvider;
106     private final ConvertorManager convertorManager;
107     private final RpcProviderRegistry rpcProviderRegistry;
108     private final ClusterSingletonServiceProvider singletonServicesProvider;
109     private final OpenflowProviderConfig config;
110     private final EntityOwnershipService entityOwnershipService;
111     private final MastershipChangeServiceManager mastershipChangeServiceManager;
112     private DeviceManager deviceManager;
113     private RpcManager rpcManager;
114     private StatisticsManager statisticsManager;
115     private RoleManager roleManager;
116     private ConnectionManager connectionManager;
117     private ListeningExecutorService executorService;
118     private ContextChainHolderImpl contextChainHolder;
119     private OpenflowPluginDiagStatusProvider openflowPluginStatusMonitor;
120
121     public static MessageIntelligenceAgency getMessageIntelligenceAgency() {
122         return MESSAGE_INTELLIGENCE_AGENCY;
123     }
124
125     OpenFlowPluginProviderImpl(final ConfigurationService configurationService,
126                                final List<SwitchConnectionProvider> switchConnectionProviders,
127                                final DataBroker dataBroker,
128                                final RpcProviderRegistry rpcProviderRegistry,
129                                final NotificationPublishService notificationPublishService,
130                                final ClusterSingletonServiceProvider singletonServiceProvider,
131                                final EntityOwnershipService entityOwnershipService,
132                                final MastershipChangeServiceManager mastershipChangeServiceManager,
133                                final OpenflowPluginDiagStatusProvider openflowPluginStatusMonitor,
134                                final SystemReadyMonitor systemReadyMonitor) {
135         this.switchConnectionProviders = switchConnectionProviders;
136         this.dataBroker = dataBroker;
137         this.rpcProviderRegistry = rpcProviderRegistry;
138         this.notificationPublishService = notificationPublishService;
139         this.singletonServicesProvider = singletonServiceProvider;
140         this.entityOwnershipService = entityOwnershipService;
141         convertorManager = ConvertorManagerFactory.createDefaultManager();
142         extensionConverterManager = new ExtensionConverterManagerImpl();
143         deviceInitializerProvider = DeviceInitializerProviderFactory.createDefaultProvider();
144         config = new OpenFlowProviderConfigImpl(configurationService);
145         this.mastershipChangeServiceManager = mastershipChangeServiceManager;
146         this.openflowPluginStatusMonitor = openflowPluginStatusMonitor;
147         systemReadyMonitor.registerListener(this);
148         LOG.debug("registered onSystemBootReady() listener for deferred startSwitchConnections()");
149     }
150
151     @Override
152     public void onSystemBootReady() {
153         LOG.debug("onSystemBootReady() received, starting the switch connections");
154         startSwitchConnections();
155     }
156
157     private void startSwitchConnections() {
158         Futures.addCallback(Futures.allAsList(switchConnectionProviders.stream().map(switchConnectionProvider -> {
159             // Inject OpenFlowPlugin custom serializers and deserializers into OpenFlowJava
160             if (config.isUseSingleLayerSerialization()) {
161                 SerializerInjector.injectSerializers(switchConnectionProvider);
162                 DeserializerInjector.injectDeserializers(switchConnectionProvider);
163             } else {
164                 DeserializerInjector.revertDeserializers(switchConnectionProvider);
165             }
166
167             // Set handler of incoming connections and start switch connection provider
168             switchConnectionProvider.setSwitchConnectionHandler(connectionManager);
169             return switchConnectionProvider.startup();
170         }).collect(Collectors.toSet())), new FutureCallback<List<Boolean>>() {
171             @Override
172             public void onSuccess(final List<Boolean> result) {
173                 LOG.info("All switchConnectionProviders are up and running ({}).", result.size());
174                 openflowPluginStatusMonitor.reportStatus(ServiceState.OPERATIONAL, "switch connections started");
175             }
176
177             @Override
178             public void onFailure(@Nonnull final Throwable throwable) {
179                 LOG.warn("Some switchConnectionProviders failed to start.", throwable);
180                 openflowPluginStatusMonitor.reportStatus(ServiceState.ERROR, "some switch connections failed to start");
181             }
182         }, MoreExecutors.directExecutor());
183     }
184
185     private ListenableFuture<List<Boolean>> shutdownSwitchConnections() {
186         final ListenableFuture<List<Boolean>> listListenableFuture =
187                 Futures.allAsList(switchConnectionProviders.stream().map(switchConnectionProvider -> {
188                     // Revert deserializers to their original state
189                     if (config.isUseSingleLayerSerialization()) {
190                         DeserializerInjector.revertDeserializers(switchConnectionProvider);
191                     }
192
193                     // Shutdown switch connection provider
194                     return switchConnectionProvider.shutdown();
195                 }).collect(Collectors.toSet()));
196
197         Futures.addCallback(listListenableFuture, new FutureCallback<List<Boolean>>() {
198             @Override
199             public void onSuccess(final List<Boolean> result) {
200                 LOG.info("All switchConnectionProviders were successfully shut down ({}).", result.size());
201             }
202
203             @Override
204             public void onFailure(@Nonnull final Throwable throwable) {
205                 LOG.warn("Some switchConnectionProviders failed to shutdown.", throwable);
206             }
207         }, MoreExecutors.directExecutor());
208
209         return listListenableFuture;
210     }
211
212     @Override
213     public void initialize() {
214         registerMXBean(MESSAGE_INTELLIGENCE_AGENCY, MESSAGE_INTELLIGENCE_AGENCY_MX_BEAN_NAME);
215
216         // TODO: copied from OpenFlowPluginProvider (Helium) misusesing the old way of distributing extension converters
217         // TODO: rewrite later!
218         OFSessionUtil.getSessionManager().setExtensionConverterProvider(extensionConverterManager);
219
220         // Creates a thread pool that creates new threads as needed, but will reuse previously
221         // constructed threads when they are available.
222         // Threads that have not been used for x seconds are terminated and removed from the cache.
223         executorService = MoreExecutors.listeningDecorator(new ThreadPoolLoggingExecutor(
224                 config.getThreadPoolMinThreads(),
225                 config.getThreadPoolMaxThreads().getValue(),
226                 config.getThreadPoolTimeout(),
227                 TimeUnit.SECONDS, new SynchronousQueue<>(), POOL_NAME));
228
229         deviceManager = new DeviceManagerImpl(
230                 config,
231                 dataBroker,
232                 getMessageIntelligenceAgency(),
233                 notificationPublishService,
234                 hashedWheelTimer,
235                 convertorManager,
236                 deviceInitializerProvider);
237
238         TranslatorLibraryUtil.injectBasicTranslatorLibrary(deviceManager, convertorManager);
239         ((ExtensionConverterProviderKeeper) deviceManager).setExtensionConverterProvider(extensionConverterManager);
240
241         rpcManager = new RpcManagerImpl(
242                 config,
243                 rpcProviderRegistry,
244                 extensionConverterManager,
245                 convertorManager,
246                 notificationPublishService);
247
248         statisticsManager = new StatisticsManagerImpl(
249                 config,
250                 rpcProviderRegistry,
251                 convertorManager,
252                 executorService);
253
254         roleManager = new RoleManagerImpl(hashedWheelTimer);
255
256         contextChainHolder = new ContextChainHolderImpl(
257                 executorService,
258                 singletonServicesProvider,
259                 entityOwnershipService,
260                 mastershipChangeServiceManager);
261
262         contextChainHolder.addManager(deviceManager);
263         contextChainHolder.addManager(statisticsManager);
264         contextChainHolder.addManager(rpcManager);
265         contextChainHolder.addManager(roleManager);
266
267         connectionManager = new ConnectionManagerImpl(config, executorService);
268         connectionManager.setDeviceConnectedHandler(contextChainHolder);
269         connectionManager.setDeviceDisconnectedHandler(contextChainHolder);
270
271         deviceManager.initialize();
272     }
273
274     @Override
275     public ExtensionConverterRegistrator getExtensionConverterRegistrator() {
276         return extensionConverterManager;
277     }
278
279     @Override
280     public void close() {
281         try {
282             shutdownSwitchConnections().get(10, TimeUnit.SECONDS);
283         } catch (InterruptedException | ExecutionException | TimeoutException e) {
284             LOG.warn("Failed to shut down switch connections in time {}s, error: {}", 10, e);
285         }
286
287         gracefulShutdown(contextChainHolder);
288         gracefulShutdown(deviceManager);
289         gracefulShutdown(rpcManager);
290         gracefulShutdown(statisticsManager);
291         gracefulShutdown(roleManager);
292         gracefulShutdown(executorService);
293         gracefulShutdown(hashedWheelTimer);
294         unregisterMXBean(MESSAGE_INTELLIGENCE_AGENCY_MX_BEAN_NAME);
295         openflowPluginStatusMonitor.reportStatus(ServiceState.UNREGISTERED, "service shutting down");
296     }
297
298     @SuppressWarnings("checkstyle:IllegalCatch")
299     private static void gracefulShutdown(final AutoCloseable closeable) {
300         if (Objects.isNull(closeable)) {
301             return;
302         }
303
304         try {
305             closeable.close();
306         } catch (Exception e) {
307             LOG.warn("Failed to shutdown {} gracefully.", closeable);
308         }
309     }
310
311     private static void gracefulShutdown(final Timer timer) {
312         if (Objects.isNull(timer)) {
313             return;
314         }
315
316         try {
317             timer.stop();
318         } catch (IllegalStateException e) {
319             LOG.warn("Failed to shutdown {} gracefully.", timer);
320         }
321     }
322
323     private static void gracefulShutdown(final ExecutorService executorService) {
324         if (Objects.isNull(executorService)) {
325             return;
326         }
327
328         try {
329             executorService.shutdownNow();
330         } catch (SecurityException e) {
331             LOG.warn("Failed to shutdown {} gracefully.", executorService);
332         }
333     }
334
335     private static void registerMXBean(final Object bean, final String beanName) {
336         final MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
337
338         try {
339             mbs.registerMBean(bean, new ObjectName(beanName));
340         } catch (MalformedObjectNameException
341                 | NotCompliantMBeanException
342                 | MBeanRegistrationException
343                 | InstanceAlreadyExistsException e) {
344             LOG.warn("Error registering MBean {}", e);
345         }
346     }
347
348     private static void unregisterMXBean(final String beanName) {
349         final MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
350
351         try {
352             mbs.unregisterMBean(new ObjectName(beanName));
353         } catch (InstanceNotFoundException
354                 | MBeanRegistrationException
355                 | MalformedObjectNameException e) {
356             LOG.warn("Error unregistering MBean {}", e);
357         }
358     }
359 }