f5b4afd14b7c14f3d2aa1609cd16e8d0dff28699
[openflowplugin.git] / openflowplugin-impl / src / main / java / org / opendaylight / openflowplugin / impl / OpenFlowPluginProviderImpl.java
1 /*
2  * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.openflowplugin.impl;
9
10 import com.google.common.annotations.VisibleForTesting;
11 import com.google.common.util.concurrent.FutureCallback;
12 import com.google.common.util.concurrent.Futures;
13 import com.google.common.util.concurrent.ListenableFuture;
14 import com.google.common.util.concurrent.MoreExecutors;
15 import com.google.common.util.concurrent.SettableFuture;
16 import io.netty.util.HashedWheelTimer;
17 import io.netty.util.Timer;
18 import java.lang.management.ManagementFactory;
19 import java.util.Collection;
20 import java.util.List;
21 import java.util.Map;
22 import java.util.concurrent.ExecutionException;
23 import java.util.concurrent.ExecutorService;
24 import java.util.concurrent.Future;
25 import java.util.concurrent.SynchronousQueue;
26 import java.util.concurrent.TimeUnit;
27 import java.util.concurrent.TimeoutException;
28 import java.util.stream.Collectors;
29 import javax.annotation.PostConstruct;
30 import javax.annotation.PreDestroy;
31 import javax.inject.Inject;
32 import javax.inject.Singleton;
33 import javax.management.InstanceAlreadyExistsException;
34 import javax.management.InstanceNotFoundException;
35 import javax.management.MBeanRegistrationException;
36 import javax.management.MBeanServer;
37 import javax.management.MalformedObjectNameException;
38 import javax.management.NotCompliantMBeanException;
39 import javax.management.ObjectName;
40 import org.opendaylight.infrautils.diagstatus.ServiceState;
41 import org.opendaylight.infrautils.ready.SystemReadyListener;
42 import org.opendaylight.infrautils.ready.SystemReadyMonitor;
43 import org.opendaylight.mdsal.binding.api.DataBroker;
44 import org.opendaylight.mdsal.binding.api.NotificationPublishService;
45 import org.opendaylight.mdsal.binding.api.RpcProviderService;
46 import org.opendaylight.mdsal.eos.binding.api.EntityOwnershipService;
47 import org.opendaylight.mdsal.singleton.api.ClusterSingletonServiceProvider;
48 import org.opendaylight.openflowjava.protocol.spi.connection.SwitchConnectionProvider;
49 import org.opendaylight.openflowjava.protocol.spi.connection.SwitchConnectionProviderList;
50 import org.opendaylight.openflowplugin.api.openflow.FlowGroupInfoHistories;
51 import org.opendaylight.openflowplugin.api.openflow.FlowGroupInfoHistory;
52 import org.opendaylight.openflowplugin.api.openflow.OpenFlowPluginProvider;
53 import org.opendaylight.openflowplugin.api.openflow.configuration.ConfigurationService;
54 import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionManager;
55 import org.opendaylight.openflowplugin.api.openflow.device.DeviceManager;
56 import org.opendaylight.openflowplugin.api.openflow.mastership.MastershipChangeServiceManager;
57 import org.opendaylight.openflowplugin.api.openflow.role.RoleManager;
58 import org.opendaylight.openflowplugin.api.openflow.rpc.RpcManager;
59 import org.opendaylight.openflowplugin.api.openflow.statistics.StatisticsManager;
60 import org.opendaylight.openflowplugin.api.openflow.statistics.ofpspecific.MessageIntelligenceAgency;
61 import org.opendaylight.openflowplugin.extension.api.ExtensionConverterProviderKeeper;
62 import org.opendaylight.openflowplugin.extension.api.ExtensionConverterRegistrator;
63 import org.opendaylight.openflowplugin.extension.api.OpenFlowPluginExtensionRegistratorProvider;
64 import org.opendaylight.openflowplugin.extension.api.core.extension.ExtensionConverterManager;
65 import org.opendaylight.openflowplugin.impl.configuration.OpenFlowProviderConfigImpl;
66 import org.opendaylight.openflowplugin.impl.connection.ConnectionManagerImpl;
67 import org.opendaylight.openflowplugin.impl.device.DeviceManagerImpl;
68 import org.opendaylight.openflowplugin.impl.device.initialization.DeviceInitializerProvider;
69 import org.opendaylight.openflowplugin.impl.device.initialization.DeviceInitializerProviderFactory;
70 import org.opendaylight.openflowplugin.impl.lifecycle.ContextChainHolderImpl;
71 import org.opendaylight.openflowplugin.impl.protocol.deserialization.DeserializerInjector;
72 import org.opendaylight.openflowplugin.impl.protocol.serialization.SerializerInjector;
73 import org.opendaylight.openflowplugin.impl.role.RoleManagerImpl;
74 import org.opendaylight.openflowplugin.impl.rpc.RpcManagerImpl;
75 import org.opendaylight.openflowplugin.impl.statistics.StatisticsManagerImpl;
76 import org.opendaylight.openflowplugin.impl.statistics.ofpspecific.MessageIntelligenceAgencyImpl;
77 import org.opendaylight.openflowplugin.impl.statistics.ofpspecific.MessageIntelligenceAgencyMXBean;
78 import org.opendaylight.openflowplugin.impl.util.ThreadPoolLoggingExecutor;
79 import org.opendaylight.openflowplugin.impl.util.TranslatorLibraryUtil;
80 import org.opendaylight.openflowplugin.openflow.md.core.extension.ExtensionConverterManagerImpl;
81 import org.opendaylight.openflowplugin.openflow.md.core.sal.convertor.ConvertorManager;
82 import org.opendaylight.openflowplugin.openflow.md.core.sal.convertor.ConvertorManagerFactory;
83 import org.opendaylight.openflowplugin.openflow.md.core.session.OFSessionUtil;
84 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
85 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflow.provider.config.rev160510.OpenflowProviderConfig;
86 import org.slf4j.Logger;
87 import org.slf4j.LoggerFactory;
88
89 @Singleton
90 public class OpenFlowPluginProviderImpl implements
91         OpenFlowPluginProvider,
92         OpenFlowPluginExtensionRegistratorProvider,
93         FlowGroupInfoHistories,
94         SystemReadyListener {
95
96     private static final Logger LOG = LoggerFactory.getLogger(OpenFlowPluginProviderImpl.class);
97
98     private static final int TICKS_PER_WHEEL = 500; // 0.5 sec.
99     private static final long TICK_DURATION = 10;
100     private static final String POOL_NAME = "ofppool";
101
102     private static final MessageIntelligenceAgency MESSAGE_INTELLIGENCE_AGENCY = new MessageIntelligenceAgencyImpl();
103     private static final String MESSAGE_INTELLIGENCE_AGENCY_MX_BEAN_NAME = String
104             .format("%s:type=%s",
105                     MessageIntelligenceAgencyMXBean.class.getPackage().getName(),
106                     MessageIntelligenceAgencyMXBean.class.getSimpleName());
107
108     private final HashedWheelTimer hashedWheelTimer =
109             new HashedWheelTimer(TICK_DURATION, TimeUnit.MILLISECONDS, TICKS_PER_WHEEL);
110     private final NotificationPublishService notificationPublishService;
111     private final ExtensionConverterManager extensionConverterManager;
112     private final DataBroker dataBroker;
113     private final Collection<SwitchConnectionProvider> switchConnectionProviders;
114     private final DeviceInitializerProvider deviceInitializerProvider;
115     private final ConvertorManager convertorManager;
116     private final RpcProviderService rpcProviderRegistry;
117     private final ClusterSingletonServiceProvider singletonServicesProvider;
118     private final OpenflowProviderConfig config;
119     private final EntityOwnershipService entityOwnershipService;
120     private final MastershipChangeServiceManager mastershipChangeServiceManager;
121     private DeviceManager deviceManager;
122     private RpcManager rpcManager;
123     private StatisticsManager statisticsManager;
124     private RoleManager roleManager;
125     private ConnectionManager connectionManager;
126     private ExecutorService executorService;
127     private ContextChainHolderImpl contextChainHolder;
128     private final DiagStatusProvider diagStatusProvider;
129     private final SystemReadyMonitor systemReadyMonitor;
130     private final SettableFuture<Void> fullyStarted = SettableFuture.create();
131     private static final String OPENFLOW_SERVICE_NAME = "OPENFLOW";
132
133     public static MessageIntelligenceAgency getMessageIntelligenceAgency() {
134         return MESSAGE_INTELLIGENCE_AGENCY;
135     }
136
137     @Inject
138     public OpenFlowPluginProviderImpl(final ConfigurationService configurationService,
139                                final SwitchConnectionProviderList switchConnectionProviders,
140                                final PingPongDataBroker pingPongDataBroker,
141                                final RpcProviderService rpcProviderRegistry,
142                                final NotificationPublishService notificationPublishService,
143                                final ClusterSingletonServiceProvider singletonServiceProvider,
144                                final EntityOwnershipService entityOwnershipService,
145                                final MastershipChangeServiceManager mastershipChangeServiceManager,
146                                final DiagStatusProvider diagStatusProvider,
147                                final SystemReadyMonitor systemReadyMonitor) {
148         this.switchConnectionProviders = switchConnectionProviders;
149         dataBroker = pingPongDataBroker;
150         this.rpcProviderRegistry = rpcProviderRegistry;
151         this.notificationPublishService = notificationPublishService;
152         singletonServicesProvider = singletonServiceProvider;
153         this.entityOwnershipService = entityOwnershipService;
154         convertorManager = ConvertorManagerFactory.createDefaultManager();
155         extensionConverterManager = new ExtensionConverterManagerImpl();
156         deviceInitializerProvider = DeviceInitializerProviderFactory.createDefaultProvider();
157         config = new OpenFlowProviderConfigImpl(configurationService);
158         this.mastershipChangeServiceManager = mastershipChangeServiceManager;
159         this.diagStatusProvider = diagStatusProvider;
160         this.systemReadyMonitor = systemReadyMonitor;
161     }
162
163     @Override
164     public void onSystemBootReady() {
165         LOG.info("onSystemBootReady() received, starting the switch connections");
166         Futures.addCallback(Futures.allAsList(switchConnectionProviders.stream().map(switchConnectionProvider -> {
167             // Inject OpenFlowPlugin custom serializers and deserializers into OpenFlowJava
168             if (config.getUseSingleLayerSerialization()) {
169                 SerializerInjector.injectSerializers(switchConnectionProvider,
170                         switchConnectionProvider.getConfiguration().isGroupAddModEnabled());
171                 DeserializerInjector.injectDeserializers(switchConnectionProvider);
172             } else {
173                 DeserializerInjector.revertDeserializers(switchConnectionProvider);
174             }
175
176             // Set handler of incoming connections and start switch connection provider
177             switchConnectionProvider.setSwitchConnectionHandler(connectionManager);
178             return switchConnectionProvider.startup();
179         }).collect(Collectors.toSet())), new FutureCallback<List<Boolean>>() {
180             @Override
181             public void onSuccess(final List<Boolean> result) {
182                 LOG.info("All switchConnectionProviders are up and running ({}).", result.size());
183                 diagStatusProvider.reportStatus(ServiceState.OPERATIONAL);
184                 fullyStarted.set(null);
185             }
186
187             @Override
188             public void onFailure(final Throwable throwable) {
189                 LOG.warn("Some switchConnectionProviders failed to start.", throwable);
190                 diagStatusProvider.reportStatus(ServiceState.ERROR, throwable);
191                 fullyStarted.setException(throwable);
192             }
193         }, MoreExecutors.directExecutor());
194     }
195
196     @VisibleForTesting
197     public Future<Void> getFullyStarted() {
198         return fullyStarted;
199     }
200
201     private ListenableFuture<List<Boolean>> shutdownSwitchConnections() {
202         final ListenableFuture<List<Boolean>> listListenableFuture =
203                 Futures.allAsList(switchConnectionProviders.stream().map(switchConnectionProvider -> {
204                     // Revert deserializers to their original state
205                     if (config.getUseSingleLayerSerialization()) {
206                         DeserializerInjector.revertDeserializers(switchConnectionProvider);
207                     }
208
209                     // Shutdown switch connection provider
210                     return switchConnectionProvider.shutdown();
211                 }).collect(Collectors.toSet()));
212
213         Futures.addCallback(listListenableFuture, new FutureCallback<List<Boolean>>() {
214             @Override
215             public void onSuccess(final List<Boolean> result) {
216                 LOG.info("All switchConnectionProviders were successfully shut down ({}).", result.size());
217             }
218
219             @Override
220             public void onFailure(final Throwable throwable) {
221                 LOG.warn("Some switchConnectionProviders failed to shutdown.", throwable);
222             }
223         }, MoreExecutors.directExecutor());
224
225         return listListenableFuture;
226     }
227
228     @Override
229     @PostConstruct
230     public void initialize() {
231         registerMXBean(MESSAGE_INTELLIGENCE_AGENCY, MESSAGE_INTELLIGENCE_AGENCY_MX_BEAN_NAME);
232
233         // TODO: copied from OpenFlowPluginProvider (Helium) misusesing the old way of distributing extension converters
234         // TODO: rewrite later!
235         OFSessionUtil.getSessionManager().setExtensionConverterProvider(extensionConverterManager);
236
237         // Creates a thread pool that creates new threads as needed, but will reuse previously
238         // constructed threads when they are available.
239         // Threads that have not been used for x seconds are terminated and removed from the cache.
240         executorService = new ThreadPoolLoggingExecutor(
241                 config.getThreadPoolMinThreads().toJava(),
242                 config.getThreadPoolMaxThreads().getValue().toJava(),
243                 config.getThreadPoolTimeout().toJava(),
244                 TimeUnit.SECONDS, new SynchronousQueue<>(), POOL_NAME);
245
246         deviceManager = new DeviceManagerImpl(
247                 config,
248                 dataBroker,
249                 getMessageIntelligenceAgency(),
250                 notificationPublishService,
251                 hashedWheelTimer,
252                 convertorManager,
253                 deviceInitializerProvider,
254                 executorService);
255
256         TranslatorLibraryUtil.injectBasicTranslatorLibrary(deviceManager, convertorManager);
257         ((ExtensionConverterProviderKeeper) deviceManager).setExtensionConverterProvider(extensionConverterManager);
258
259         rpcManager = new RpcManagerImpl(
260                 config,
261                 rpcProviderRegistry,
262                 extensionConverterManager,
263                 convertorManager,
264                 notificationPublishService);
265
266         statisticsManager = new StatisticsManagerImpl(
267                 config,
268                 rpcProviderRegistry,
269                 convertorManager,
270                 executorService);
271
272         roleManager = new RoleManagerImpl(hashedWheelTimer, config, executorService);
273
274         contextChainHolder = new ContextChainHolderImpl(
275                 executorService,
276                 singletonServicesProvider,
277                 entityOwnershipService,
278                 mastershipChangeServiceManager,
279                 config);
280
281         contextChainHolder.addManager(deviceManager);
282         contextChainHolder.addManager(statisticsManager);
283         contextChainHolder.addManager(rpcManager);
284         contextChainHolder.addManager(roleManager);
285
286         connectionManager = new ConnectionManagerImpl(config, executorService, dataBroker, notificationPublishService);
287         connectionManager.setDeviceConnectedHandler(contextChainHolder);
288         connectionManager.setDeviceDisconnectedHandler(contextChainHolder);
289
290         deviceManager.setContextChainHolder(contextChainHolder);
291         deviceManager.initialize();
292         systemReadyMonitor.registerListener(this);
293         LOG.info("registered onSystemBootReady() listener for OpenFlowPluginProvider");
294     }
295
296     @Override
297     public ExtensionConverterRegistrator getExtensionConverterRegistrator() {
298         return extensionConverterManager;
299     }
300
301     @Override
302     public Map<NodeId, FlowGroupInfoHistory> getAllFlowGroupHistories() {
303         return deviceManager.getAllFlowGroupHistories();
304     }
305
306     @Override
307     public FlowGroupInfoHistory getFlowGroupHistory(final NodeId nodeId) {
308         return deviceManager.getFlowGroupHistory(nodeId);
309     }
310
311     @Override
312     @PreDestroy
313     @SuppressWarnings("checkstyle:IllegalCatch")
314     public void close() {
315         try {
316             shutdownSwitchConnections().get(10, TimeUnit.SECONDS);
317         } catch (InterruptedException | ExecutionException | TimeoutException e) {
318             LOG.warn("Failed to shut down switch connections in time {}s", 10, e);
319         }
320
321         gracefulShutdown(contextChainHolder);
322         gracefulShutdown(connectionManager);
323         gracefulShutdown(deviceManager);
324         gracefulShutdown(rpcManager);
325         gracefulShutdown(statisticsManager);
326         gracefulShutdown(roleManager);
327         gracefulShutdown(executorService);
328         gracefulShutdown(hashedWheelTimer);
329         unregisterMXBean(MESSAGE_INTELLIGENCE_AGENCY_MX_BEAN_NAME);
330         diagStatusProvider.reportStatus(ServiceState.UNREGISTERED);
331         try {
332             if (connectionManager != null) {
333                 connectionManager.close();
334                 connectionManager = null;
335             }
336         } catch (Exception e) {
337             LOG.error("Failed to close ConnectionManager", e);
338         }
339     }
340
341     @SuppressWarnings("checkstyle:IllegalCatch")
342     private static void gracefulShutdown(final AutoCloseable closeable) {
343         if (closeable != null) {
344             try {
345                 closeable.close();
346             } catch (Exception e) {
347                 LOG.warn("Failed to shutdown {} gracefully.", closeable);
348             }
349         }
350     }
351
352     private static void gracefulShutdown(final Timer timer) {
353         if (timer != null) {
354             try {
355                 timer.stop();
356             } catch (IllegalStateException e) {
357                 LOG.warn("Failed to shutdown {} gracefully.", timer);
358             }
359         }
360     }
361
362     private static void gracefulShutdown(final ExecutorService executorService) {
363         if (executorService != null) {
364             try {
365                 executorService.shutdownNow();
366             } catch (SecurityException e) {
367                 LOG.warn("Failed to shutdown {} gracefully.", executorService);
368             }
369         }
370     }
371
372     private static void registerMXBean(final Object bean, final String beanName) {
373         final MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
374
375         try {
376             mbs.registerMBean(bean, new ObjectName(beanName));
377         } catch (MalformedObjectNameException
378                 | NotCompliantMBeanException
379                 | MBeanRegistrationException
380                 | InstanceAlreadyExistsException e) {
381             LOG.warn("Error registering MBean {}", beanName, e);
382         }
383     }
384
385     private static void unregisterMXBean(final String beanName) {
386         final MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
387
388         try {
389             mbs.unregisterMBean(new ObjectName(beanName));
390         } catch (InstanceNotFoundException
391                 | MBeanRegistrationException
392                 | MalformedObjectNameException e) {
393             LOG.warn("Error unregistering MBean {}", beanName, e);
394         }
395     }
396 }