Merge "Change OFConstants.OFPCML_NO_BUFFER to Uint16"
[openflowplugin.git] / openflowplugin-impl / src / main / java / org / opendaylight / openflowplugin / impl / OpenFlowPluginProviderImpl.java
1 /*
2  * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.openflowplugin.impl;
9
10 import com.google.common.annotations.VisibleForTesting;
11 import com.google.common.util.concurrent.FutureCallback;
12 import com.google.common.util.concurrent.Futures;
13 import com.google.common.util.concurrent.ListenableFuture;
14 import com.google.common.util.concurrent.ListeningExecutorService;
15 import com.google.common.util.concurrent.MoreExecutors;
16 import com.google.common.util.concurrent.SettableFuture;
17 import io.netty.util.HashedWheelTimer;
18 import io.netty.util.Timer;
19 import java.lang.management.ManagementFactory;
20 import java.util.Collection;
21 import java.util.List;
22 import java.util.concurrent.ExecutionException;
23 import java.util.concurrent.ExecutorService;
24 import java.util.concurrent.Future;
25 import java.util.concurrent.SynchronousQueue;
26 import java.util.concurrent.TimeUnit;
27 import java.util.concurrent.TimeoutException;
28 import java.util.stream.Collectors;
29 import javax.annotation.PostConstruct;
30 import javax.annotation.PreDestroy;
31 import javax.inject.Inject;
32 import javax.inject.Singleton;
33 import javax.management.InstanceAlreadyExistsException;
34 import javax.management.InstanceNotFoundException;
35 import javax.management.MBeanRegistrationException;
36 import javax.management.MBeanServer;
37 import javax.management.MalformedObjectNameException;
38 import javax.management.NotCompliantMBeanException;
39 import javax.management.ObjectName;
40 import org.apache.aries.blueprint.annotation.service.Reference;
41 import org.apache.aries.blueprint.annotation.service.Service;
42 import org.opendaylight.infrautils.diagstatus.ServiceState;
43 import org.opendaylight.infrautils.ready.SystemReadyListener;
44 import org.opendaylight.infrautils.ready.SystemReadyMonitor;
45 import org.opendaylight.mdsal.binding.api.DataBroker;
46 import org.opendaylight.mdsal.binding.api.NotificationPublishService;
47 import org.opendaylight.mdsal.binding.api.RpcProviderService;
48 import org.opendaylight.mdsal.eos.binding.api.EntityOwnershipService;
49 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceProvider;
50 import org.opendaylight.openflowjava.protocol.api.connection.OpenflowDiagStatusProvider;
51 import org.opendaylight.openflowjava.protocol.spi.connection.SwitchConnectionProvider;
52 import org.opendaylight.openflowjava.protocol.spi.connection.SwitchConnectionProviderList;
53 import org.opendaylight.openflowplugin.api.openflow.FlowGroupCacheManager;
54 import org.opendaylight.openflowplugin.api.openflow.OpenFlowPluginProvider;
55 import org.opendaylight.openflowplugin.api.openflow.configuration.ConfigurationService;
56 import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionManager;
57 import org.opendaylight.openflowplugin.api.openflow.device.DeviceManager;
58 import org.opendaylight.openflowplugin.api.openflow.mastership.MastershipChangeServiceManager;
59 import org.opendaylight.openflowplugin.api.openflow.role.RoleManager;
60 import org.opendaylight.openflowplugin.api.openflow.rpc.RpcManager;
61 import org.opendaylight.openflowplugin.api.openflow.statistics.StatisticsManager;
62 import org.opendaylight.openflowplugin.api.openflow.statistics.ofpspecific.MessageIntelligenceAgency;
63 import org.opendaylight.openflowplugin.extension.api.ExtensionConverterProviderKeeper;
64 import org.opendaylight.openflowplugin.extension.api.ExtensionConverterRegistrator;
65 import org.opendaylight.openflowplugin.extension.api.OpenFlowPluginExtensionRegistratorProvider;
66 import org.opendaylight.openflowplugin.extension.api.core.extension.ExtensionConverterManager;
67 import org.opendaylight.openflowplugin.impl.configuration.OpenFlowProviderConfigImpl;
68 import org.opendaylight.openflowplugin.impl.connection.ConnectionManagerImpl;
69 import org.opendaylight.openflowplugin.impl.device.DeviceManagerImpl;
70 import org.opendaylight.openflowplugin.impl.device.initialization.DeviceInitializerProvider;
71 import org.opendaylight.openflowplugin.impl.device.initialization.DeviceInitializerProviderFactory;
72 import org.opendaylight.openflowplugin.impl.lifecycle.ContextChainHolderImpl;
73 import org.opendaylight.openflowplugin.impl.protocol.deserialization.DeserializerInjector;
74 import org.opendaylight.openflowplugin.impl.protocol.serialization.SerializerInjector;
75 import org.opendaylight.openflowplugin.impl.role.RoleManagerImpl;
76 import org.opendaylight.openflowplugin.impl.rpc.RpcManagerImpl;
77 import org.opendaylight.openflowplugin.impl.statistics.StatisticsManagerImpl;
78 import org.opendaylight.openflowplugin.impl.statistics.ofpspecific.MessageIntelligenceAgencyImpl;
79 import org.opendaylight.openflowplugin.impl.statistics.ofpspecific.MessageIntelligenceAgencyMXBean;
80 import org.opendaylight.openflowplugin.impl.util.ThreadPoolLoggingExecutor;
81 import org.opendaylight.openflowplugin.impl.util.TranslatorLibraryUtil;
82 import org.opendaylight.openflowplugin.openflow.md.core.extension.ExtensionConverterManagerImpl;
83 import org.opendaylight.openflowplugin.openflow.md.core.sal.convertor.ConvertorManager;
84 import org.opendaylight.openflowplugin.openflow.md.core.sal.convertor.ConvertorManagerFactory;
85 import org.opendaylight.openflowplugin.openflow.md.core.session.OFSessionUtil;
86 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflow.provider.config.rev160510.OpenflowProviderConfig;
87 import org.slf4j.Logger;
88 import org.slf4j.LoggerFactory;
89
90 @Singleton
91 @Service(classes = { OpenFlowPluginProvider.class, OpenFlowPluginExtensionRegistratorProvider.class })
92 public class OpenFlowPluginProviderImpl implements
93         OpenFlowPluginProvider,
94         OpenFlowPluginExtensionRegistratorProvider,
95         SystemReadyListener {
96
97     private static final Logger LOG = LoggerFactory.getLogger(OpenFlowPluginProviderImpl.class);
98
99     private static final int TICKS_PER_WHEEL = 500; // 0.5 sec.
100     private static final long TICK_DURATION = 10;
101     private static final String POOL_NAME = "ofppool";
102
103     private static final MessageIntelligenceAgency MESSAGE_INTELLIGENCE_AGENCY = new MessageIntelligenceAgencyImpl();
104     private static final String MESSAGE_INTELLIGENCE_AGENCY_MX_BEAN_NAME = String
105             .format("%s:type=%s",
106                     MessageIntelligenceAgencyMXBean.class.getPackage().getName(),
107                     MessageIntelligenceAgencyMXBean.class.getSimpleName());
108
109     private final HashedWheelTimer hashedWheelTimer =
110             new HashedWheelTimer(TICK_DURATION, TimeUnit.MILLISECONDS, TICKS_PER_WHEEL);
111     private final NotificationPublishService notificationPublishService;
112     private final ExtensionConverterManager extensionConverterManager;
113     private final DataBroker dataBroker;
114     private final Collection<SwitchConnectionProvider> switchConnectionProviders;
115     private final DeviceInitializerProvider deviceInitializerProvider;
116     private final ConvertorManager convertorManager;
117     private final RpcProviderService rpcProviderRegistry;
118     private final ClusterSingletonServiceProvider singletonServicesProvider;
119     private final OpenflowProviderConfig config;
120     private final EntityOwnershipService entityOwnershipService;
121     private final MastershipChangeServiceManager mastershipChangeServiceManager;
122     private DeviceManager deviceManager;
123     private RpcManager rpcManager;
124     private StatisticsManager statisticsManager;
125     private RoleManager roleManager;
126     private ConnectionManager connectionManager;
127     private ListeningExecutorService executorService;
128     private ContextChainHolderImpl contextChainHolder;
129     private final OpenflowDiagStatusProvider openflowDiagStatusProvider;
130     private final SystemReadyMonitor systemReadyMonitor;
131     private final FlowGroupCacheManager flowGroupCacheManager;
132     private final SettableFuture<Void> fullyStarted = SettableFuture.create();
133     private static final String OPENFLOW_SERVICE_NAME = "OPENFLOW";
134
135     public static MessageIntelligenceAgency getMessageIntelligenceAgency() {
136         return MESSAGE_INTELLIGENCE_AGENCY;
137     }
138
139     @Inject
140     public OpenFlowPluginProviderImpl(final ConfigurationService configurationService,
141                                final SwitchConnectionProviderList switchConnectionProviders,
142                                final PingPongDataBroker pingPongDataBroker,
143                                final @Reference RpcProviderService rpcProviderRegistry,
144                                final @Reference NotificationPublishService notificationPublishService,
145                                final @Reference ClusterSingletonServiceProvider singletonServiceProvider,
146                                final @Reference EntityOwnershipService entityOwnershipService,
147                                final MastershipChangeServiceManager mastershipChangeServiceManager,
148                                final @Reference OpenflowDiagStatusProvider openflowDiagStatusProvider,
149                                final @Reference SystemReadyMonitor systemReadyMonitor,
150                                final FlowGroupCacheManager flowGroupCacheManager) {
151         this.switchConnectionProviders = switchConnectionProviders;
152         this.dataBroker = pingPongDataBroker;
153         this.rpcProviderRegistry = rpcProviderRegistry;
154         this.notificationPublishService = notificationPublishService;
155         this.singletonServicesProvider = singletonServiceProvider;
156         this.entityOwnershipService = entityOwnershipService;
157         convertorManager = ConvertorManagerFactory.createDefaultManager();
158         extensionConverterManager = new ExtensionConverterManagerImpl();
159         deviceInitializerProvider = DeviceInitializerProviderFactory.createDefaultProvider();
160         config = new OpenFlowProviderConfigImpl(configurationService);
161         this.mastershipChangeServiceManager = mastershipChangeServiceManager;
162         this.openflowDiagStatusProvider = openflowDiagStatusProvider;
163         this.systemReadyMonitor = systemReadyMonitor;
164         this.flowGroupCacheManager = flowGroupCacheManager;
165     }
166
167     @Override
168     public void onSystemBootReady() {
169         LOG.info("onSystemBootReady() received, starting the switch connections");
170         Futures.addCallback(Futures.allAsList(switchConnectionProviders.stream().map(switchConnectionProvider -> {
171             // Inject OpenFlowPlugin custom serializers and deserializers into OpenFlowJava
172             if (config.isUseSingleLayerSerialization()) {
173                 SerializerInjector.injectSerializers(switchConnectionProvider,
174                         switchConnectionProvider.getConfiguration().isGroupAddModEnabled());
175                 DeserializerInjector.injectDeserializers(switchConnectionProvider);
176             } else {
177                 DeserializerInjector.revertDeserializers(switchConnectionProvider);
178             }
179
180             // Set handler of incoming connections and start switch connection provider
181             switchConnectionProvider.setSwitchConnectionHandler(connectionManager);
182             return switchConnectionProvider.startup();
183         }).collect(Collectors.toSet())), new FutureCallback<List<Boolean>>() {
184             @Override
185             public void onSuccess(final List<Boolean> result) {
186                 LOG.info("All switchConnectionProviders are up and running ({}).", result.size());
187                 openflowDiagStatusProvider.reportStatus(OPENFLOW_SERVICE_NAME, ServiceState.OPERATIONAL);
188                 fullyStarted.set(null);
189             }
190
191             @Override
192             public void onFailure(final Throwable throwable) {
193                 LOG.warn("Some switchConnectionProviders failed to start.", throwable);
194                 openflowDiagStatusProvider.reportStatus(OPENFLOW_SERVICE_NAME, throwable);
195                 fullyStarted.setException(throwable);
196             }
197         }, MoreExecutors.directExecutor());
198     }
199
200     @VisibleForTesting
201     public Future<Void> getFullyStarted() {
202         return fullyStarted;
203     }
204
205     private ListenableFuture<List<Boolean>> shutdownSwitchConnections() {
206         final ListenableFuture<List<Boolean>> listListenableFuture =
207                 Futures.allAsList(switchConnectionProviders.stream().map(switchConnectionProvider -> {
208                     // Revert deserializers to their original state
209                     if (config.isUseSingleLayerSerialization()) {
210                         DeserializerInjector.revertDeserializers(switchConnectionProvider);
211                     }
212
213                     // Shutdown switch connection provider
214                     return switchConnectionProvider.shutdown();
215                 }).collect(Collectors.toSet()));
216
217         Futures.addCallback(listListenableFuture, new FutureCallback<List<Boolean>>() {
218             @Override
219             public void onSuccess(final List<Boolean> result) {
220                 LOG.info("All switchConnectionProviders were successfully shut down ({}).", result.size());
221             }
222
223             @Override
224             public void onFailure(final Throwable throwable) {
225                 LOG.warn("Some switchConnectionProviders failed to shutdown.", throwable);
226             }
227         }, MoreExecutors.directExecutor());
228
229         return listListenableFuture;
230     }
231
232     @Override
233     @PostConstruct
234     public void initialize() {
235         registerMXBean(MESSAGE_INTELLIGENCE_AGENCY, MESSAGE_INTELLIGENCE_AGENCY_MX_BEAN_NAME);
236
237         // TODO: copied from OpenFlowPluginProvider (Helium) misusesing the old way of distributing extension converters
238         // TODO: rewrite later!
239         OFSessionUtil.getSessionManager().setExtensionConverterProvider(extensionConverterManager);
240
241         // Creates a thread pool that creates new threads as needed, but will reuse previously
242         // constructed threads when they are available.
243         // Threads that have not been used for x seconds are terminated and removed from the cache.
244         executorService = MoreExecutors.listeningDecorator(new ThreadPoolLoggingExecutor(
245                 config.getThreadPoolMinThreads().toJava(),
246                 config.getThreadPoolMaxThreads().getValue().toJava(),
247                 config.getThreadPoolTimeout().toJava(),
248                 TimeUnit.SECONDS, new SynchronousQueue<>(), POOL_NAME));
249
250         deviceManager = new DeviceManagerImpl(
251                 config,
252                 dataBroker,
253                 getMessageIntelligenceAgency(),
254                 notificationPublishService,
255                 hashedWheelTimer,
256                 convertorManager,
257                 deviceInitializerProvider,
258                 executorService);
259
260         TranslatorLibraryUtil.injectBasicTranslatorLibrary(deviceManager, convertorManager);
261         ((ExtensionConverterProviderKeeper) deviceManager).setExtensionConverterProvider(extensionConverterManager);
262
263         rpcManager = new RpcManagerImpl(
264                 config,
265                 rpcProviderRegistry,
266                 extensionConverterManager,
267                 convertorManager,
268                 notificationPublishService,
269                 flowGroupCacheManager);
270
271         statisticsManager = new StatisticsManagerImpl(
272                 config,
273                 rpcProviderRegistry,
274                 convertorManager,
275                 executorService);
276
277         roleManager = new RoleManagerImpl(hashedWheelTimer, config, executorService);
278
279         contextChainHolder = new ContextChainHolderImpl(
280                 executorService,
281                 singletonServicesProvider,
282                 entityOwnershipService,
283                 mastershipChangeServiceManager,
284                 config);
285
286         contextChainHolder.addManager(deviceManager);
287         contextChainHolder.addManager(statisticsManager);
288         contextChainHolder.addManager(rpcManager);
289         contextChainHolder.addManager(roleManager);
290
291         connectionManager = new ConnectionManagerImpl(config, executorService, dataBroker, notificationPublishService);
292         connectionManager.setDeviceConnectedHandler(contextChainHolder);
293         connectionManager.setDeviceDisconnectedHandler(contextChainHolder);
294
295         deviceManager.setContextChainHolder(contextChainHolder);
296         deviceManager.initialize();
297         systemReadyMonitor.registerListener(this);
298         LOG.info("registered onSystemBootReady() listener for OpenFlowPluginProvider");
299     }
300
301     @Override
302     public ExtensionConverterRegistrator getExtensionConverterRegistrator() {
303         return extensionConverterManager;
304     }
305
306     @Override
307     @PreDestroy
308     @SuppressWarnings("checkstyle:IllegalCatch")
309     public void close() {
310         try {
311             shutdownSwitchConnections().get(10, TimeUnit.SECONDS);
312         } catch (InterruptedException | ExecutionException | TimeoutException e) {
313             LOG.warn("Failed to shut down switch connections in time {}s", 10, e);
314         }
315
316         gracefulShutdown(contextChainHolder);
317         gracefulShutdown(connectionManager);
318         gracefulShutdown(deviceManager);
319         gracefulShutdown(rpcManager);
320         gracefulShutdown(statisticsManager);
321         gracefulShutdown(roleManager);
322         gracefulShutdown(executorService);
323         gracefulShutdown(hashedWheelTimer);
324         unregisterMXBean(MESSAGE_INTELLIGENCE_AGENCY_MX_BEAN_NAME);
325         openflowDiagStatusProvider.reportStatus(ServiceState.UNREGISTERED);
326         try {
327             if (connectionManager != null) {
328                 connectionManager.close();
329                 connectionManager = null;
330             }
331         } catch (Exception e) {
332             LOG.error("Failed to close ConnectionManager", e);
333         }
334     }
335
336     @SuppressWarnings("checkstyle:IllegalCatch")
337     private static void gracefulShutdown(final AutoCloseable closeable) {
338         if (closeable != null) {
339             try {
340                 closeable.close();
341             } catch (Exception e) {
342                 LOG.warn("Failed to shutdown {} gracefully.", closeable);
343             }
344         }
345     }
346
347     private static void gracefulShutdown(final Timer timer) {
348         if (timer != null) {
349             try {
350                 timer.stop();
351             } catch (IllegalStateException e) {
352                 LOG.warn("Failed to shutdown {} gracefully.", timer);
353             }
354         }
355     }
356
357     private static void gracefulShutdown(final ExecutorService executorService) {
358         if (executorService != null) {
359             try {
360                 executorService.shutdownNow();
361             } catch (SecurityException e) {
362                 LOG.warn("Failed to shutdown {} gracefully.", executorService);
363             }
364         }
365     }
366
367     private static void registerMXBean(final Object bean, final String beanName) {
368         final MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
369
370         try {
371             mbs.registerMBean(bean, new ObjectName(beanName));
372         } catch (MalformedObjectNameException
373                 | NotCompliantMBeanException
374                 | MBeanRegistrationException
375                 | InstanceAlreadyExistsException e) {
376             LOG.warn("Error registering MBean {}", beanName, e);
377         }
378     }
379
380     private static void unregisterMXBean(final String beanName) {
381         final MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
382
383         try {
384             mbs.unregisterMBean(new ObjectName(beanName));
385         } catch (InstanceNotFoundException
386                 | MBeanRegistrationException
387                 | MalformedObjectNameException e) {
388             LOG.warn("Error unregistering MBean {}", beanName, e);
389         }
390     }
391 }