Merge "Fixup Augmentable and Identifiable methods changing"
[openflowplugin.git] / openflowplugin-impl / src / main / java / org / opendaylight / openflowplugin / impl / OpenFlowPluginProviderImpl.java
1 /*
2  * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.openflowplugin.impl;
10
11 import com.google.common.util.concurrent.FutureCallback;
12 import com.google.common.util.concurrent.Futures;
13 import com.google.common.util.concurrent.ListenableFuture;
14 import com.google.common.util.concurrent.ListeningExecutorService;
15 import com.google.common.util.concurrent.MoreExecutors;
16 import io.netty.util.HashedWheelTimer;
17 import io.netty.util.Timer;
18 import java.lang.management.ManagementFactory;
19 import java.util.Collection;
20 import java.util.List;
21 import java.util.Objects;
22 import java.util.concurrent.ExecutionException;
23 import java.util.concurrent.ExecutorService;
24 import java.util.concurrent.SynchronousQueue;
25 import java.util.concurrent.TimeUnit;
26 import java.util.concurrent.TimeoutException;
27 import java.util.stream.Collectors;
28 import javax.annotation.Nonnull;
29 import javax.management.InstanceAlreadyExistsException;
30 import javax.management.InstanceNotFoundException;
31 import javax.management.MBeanRegistrationException;
32 import javax.management.MBeanServer;
33 import javax.management.MalformedObjectNameException;
34 import javax.management.NotCompliantMBeanException;
35 import javax.management.ObjectName;
36 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
37 import org.opendaylight.controller.md.sal.binding.api.NotificationPublishService;
38 import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry;
39 import org.opendaylight.infrautils.diagstatus.ServiceState;
40 import org.opendaylight.infrautils.ready.SystemReadyListener;
41 import org.opendaylight.infrautils.ready.SystemReadyMonitor;
42 import org.opendaylight.mdsal.eos.binding.api.EntityOwnershipService;
43 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceProvider;
44 import org.opendaylight.openflowjava.protocol.spi.connection.SwitchConnectionProvider;
45 import org.opendaylight.openflowplugin.api.diagstatus.OpenflowPluginDiagStatusProvider;
46 import org.opendaylight.openflowplugin.api.openflow.OpenFlowPluginProvider;
47 import org.opendaylight.openflowplugin.api.openflow.configuration.ConfigurationService;
48 import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionManager;
49 import org.opendaylight.openflowplugin.api.openflow.device.DeviceManager;
50 import org.opendaylight.openflowplugin.api.openflow.mastership.MastershipChangeServiceManager;
51 import org.opendaylight.openflowplugin.api.openflow.role.RoleManager;
52 import org.opendaylight.openflowplugin.api.openflow.rpc.RpcManager;
53 import org.opendaylight.openflowplugin.api.openflow.statistics.StatisticsManager;
54 import org.opendaylight.openflowplugin.api.openflow.statistics.ofpspecific.MessageIntelligenceAgency;
55 import org.opendaylight.openflowplugin.extension.api.ExtensionConverterProviderKeeper;
56 import org.opendaylight.openflowplugin.extension.api.ExtensionConverterRegistrator;
57 import org.opendaylight.openflowplugin.extension.api.OpenFlowPluginExtensionRegistratorProvider;
58 import org.opendaylight.openflowplugin.extension.api.core.extension.ExtensionConverterManager;
59 import org.opendaylight.openflowplugin.impl.configuration.OpenFlowProviderConfigImpl;
60 import org.opendaylight.openflowplugin.impl.connection.ConnectionManagerImpl;
61 import org.opendaylight.openflowplugin.impl.device.DeviceManagerImpl;
62 import org.opendaylight.openflowplugin.impl.device.initialization.DeviceInitializerProvider;
63 import org.opendaylight.openflowplugin.impl.device.initialization.DeviceInitializerProviderFactory;
64 import org.opendaylight.openflowplugin.impl.lifecycle.ContextChainHolderImpl;
65 import org.opendaylight.openflowplugin.impl.protocol.deserialization.DeserializerInjector;
66 import org.opendaylight.openflowplugin.impl.protocol.serialization.SerializerInjector;
67 import org.opendaylight.openflowplugin.impl.role.RoleManagerImpl;
68 import org.opendaylight.openflowplugin.impl.rpc.RpcManagerImpl;
69 import org.opendaylight.openflowplugin.impl.statistics.StatisticsManagerImpl;
70 import org.opendaylight.openflowplugin.impl.statistics.ofpspecific.MessageIntelligenceAgencyImpl;
71 import org.opendaylight.openflowplugin.impl.statistics.ofpspecific.MessageIntelligenceAgencyMXBean;
72 import org.opendaylight.openflowplugin.impl.util.ThreadPoolLoggingExecutor;
73 import org.opendaylight.openflowplugin.impl.util.TranslatorLibraryUtil;
74 import org.opendaylight.openflowplugin.openflow.md.core.extension.ExtensionConverterManagerImpl;
75 import org.opendaylight.openflowplugin.openflow.md.core.sal.convertor.ConvertorManager;
76 import org.opendaylight.openflowplugin.openflow.md.core.sal.convertor.ConvertorManagerFactory;
77 import org.opendaylight.openflowplugin.openflow.md.core.session.OFSessionUtil;
78 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflow.provider.config.rev160510.OpenflowProviderConfig;
79 import org.slf4j.Logger;
80 import org.slf4j.LoggerFactory;
81
82 public class OpenFlowPluginProviderImpl implements
83         OpenFlowPluginProvider,
84         OpenFlowPluginExtensionRegistratorProvider,
85         SystemReadyListener {
86
87     private static final Logger LOG = LoggerFactory.getLogger(OpenFlowPluginProviderImpl.class);
88
89     private static final int TICKS_PER_WHEEL = 500; // 0.5 sec.
90     private static final long TICK_DURATION = 10;
91     private static final String POOL_NAME = "ofppool";
92
93     private static final MessageIntelligenceAgency MESSAGE_INTELLIGENCE_AGENCY = new MessageIntelligenceAgencyImpl();
94     private static final String MESSAGE_INTELLIGENCE_AGENCY_MX_BEAN_NAME = String
95             .format("%s:type=%s",
96                     MessageIntelligenceAgencyMXBean.class.getPackage().getName(),
97                     MessageIntelligenceAgencyMXBean.class.getSimpleName());
98
99     private final HashedWheelTimer hashedWheelTimer =
100             new HashedWheelTimer(TICK_DURATION, TimeUnit.MILLISECONDS, TICKS_PER_WHEEL);
101     private final NotificationPublishService notificationPublishService;
102     private final ExtensionConverterManager extensionConverterManager;
103     private final DataBroker dataBroker;
104     private final Collection<SwitchConnectionProvider> switchConnectionProviders;
105     private final DeviceInitializerProvider deviceInitializerProvider;
106     private final ConvertorManager convertorManager;
107     private final RpcProviderRegistry rpcProviderRegistry;
108     private final ClusterSingletonServiceProvider singletonServicesProvider;
109     private final OpenflowProviderConfig config;
110     private final EntityOwnershipService entityOwnershipService;
111     private final MastershipChangeServiceManager mastershipChangeServiceManager;
112     private DeviceManager deviceManager;
113     private RpcManager rpcManager;
114     private StatisticsManager statisticsManager;
115     private RoleManager roleManager;
116     private ConnectionManager connectionManager;
117     private ListeningExecutorService executorService;
118     private ContextChainHolderImpl contextChainHolder;
119     private final OpenflowPluginDiagStatusProvider openflowPluginStatusMonitor;
120
121     public static MessageIntelligenceAgency getMessageIntelligenceAgency() {
122         return MESSAGE_INTELLIGENCE_AGENCY;
123     }
124
125     OpenFlowPluginProviderImpl(final ConfigurationService configurationService,
126                                final List<SwitchConnectionProvider> switchConnectionProviders,
127                                final DataBroker dataBroker,
128                                final RpcProviderRegistry rpcProviderRegistry,
129                                final NotificationPublishService notificationPublishService,
130                                final ClusterSingletonServiceProvider singletonServiceProvider,
131                                final EntityOwnershipService entityOwnershipService,
132                                final MastershipChangeServiceManager mastershipChangeServiceManager,
133                                final OpenflowPluginDiagStatusProvider openflowPluginStatusMonitor,
134                                final SystemReadyMonitor systemReadyMonitor) {
135         this.switchConnectionProviders = switchConnectionProviders;
136         this.dataBroker = dataBroker;
137         this.rpcProviderRegistry = rpcProviderRegistry;
138         this.notificationPublishService = notificationPublishService;
139         this.singletonServicesProvider = singletonServiceProvider;
140         this.entityOwnershipService = entityOwnershipService;
141         convertorManager = ConvertorManagerFactory.createDefaultManager();
142         extensionConverterManager = new ExtensionConverterManagerImpl();
143         deviceInitializerProvider = DeviceInitializerProviderFactory.createDefaultProvider();
144         config = new OpenFlowProviderConfigImpl(configurationService);
145         this.mastershipChangeServiceManager = mastershipChangeServiceManager;
146         this.openflowPluginStatusMonitor = openflowPluginStatusMonitor;
147         systemReadyMonitor.registerListener(this);
148         LOG.debug("registered onSystemBootReady() listener for deferred startSwitchConnections()");
149     }
150
151     @Override
152     public void onSystemBootReady() {
153         LOG.debug("onSystemBootReady() received, starting the switch connections");
154         startSwitchConnections();
155     }
156
157     private void startSwitchConnections() {
158         Futures.addCallback(Futures.allAsList(switchConnectionProviders.stream().map(switchConnectionProvider -> {
159             // Inject OpenFlowPlugin custom serializers and deserializers into OpenFlowJava
160             if (config.isUseSingleLayerSerialization()) {
161                 SerializerInjector.injectSerializers(switchConnectionProvider,
162                         switchConnectionProvider.getConfiguration().isGroupAddModEnabled());
163                 DeserializerInjector.injectDeserializers(switchConnectionProvider);
164             } else {
165                 DeserializerInjector.revertDeserializers(switchConnectionProvider);
166             }
167
168             // Set handler of incoming connections and start switch connection provider
169             switchConnectionProvider.setSwitchConnectionHandler(connectionManager);
170             return switchConnectionProvider.startup();
171         }).collect(Collectors.toSet())), new FutureCallback<List<Boolean>>() {
172             @Override
173             public void onSuccess(@Nonnull final List<Boolean> result) {
174                 LOG.info("All switchConnectionProviders are up and running ({}).", result.size());
175                 openflowPluginStatusMonitor.reportStatus(ServiceState.OPERATIONAL, "switch connections started");
176             }
177
178             @Override
179             public void onFailure(@Nonnull final Throwable throwable) {
180                 LOG.warn("Some switchConnectionProviders failed to start.", throwable);
181                 openflowPluginStatusMonitor.reportStatus(ServiceState.ERROR, throwable);
182             }
183         }, MoreExecutors.directExecutor());
184     }
185
186     private ListenableFuture<List<Boolean>> shutdownSwitchConnections() {
187         final ListenableFuture<List<Boolean>> listListenableFuture =
188                 Futures.allAsList(switchConnectionProviders.stream().map(switchConnectionProvider -> {
189                     // Revert deserializers to their original state
190                     if (config.isUseSingleLayerSerialization()) {
191                         DeserializerInjector.revertDeserializers(switchConnectionProvider);
192                     }
193
194                     // Shutdown switch connection provider
195                     return switchConnectionProvider.shutdown();
196                 }).collect(Collectors.toSet()));
197
198         Futures.addCallback(listListenableFuture, new FutureCallback<List<Boolean>>() {
199             @Override
200             public void onSuccess(@Nonnull final List<Boolean> result) {
201                 LOG.info("All switchConnectionProviders were successfully shut down ({}).", result.size());
202             }
203
204             @Override
205             public void onFailure(@Nonnull final Throwable throwable) {
206                 LOG.warn("Some switchConnectionProviders failed to shutdown.", throwable);
207             }
208         }, MoreExecutors.directExecutor());
209
210         return listListenableFuture;
211     }
212
213     @Override
214     public void initialize() {
215         registerMXBean(MESSAGE_INTELLIGENCE_AGENCY, MESSAGE_INTELLIGENCE_AGENCY_MX_BEAN_NAME);
216
217         // TODO: copied from OpenFlowPluginProvider (Helium) misusesing the old way of distributing extension converters
218         // TODO: rewrite later!
219         OFSessionUtil.getSessionManager().setExtensionConverterProvider(extensionConverterManager);
220
221         // Creates a thread pool that creates new threads as needed, but will reuse previously
222         // constructed threads when they are available.
223         // Threads that have not been used for x seconds are terminated and removed from the cache.
224         executorService = MoreExecutors.listeningDecorator(new ThreadPoolLoggingExecutor(
225                 config.getThreadPoolMinThreads(),
226                 config.getThreadPoolMaxThreads().getValue(),
227                 config.getThreadPoolTimeout(),
228                 TimeUnit.SECONDS, new SynchronousQueue<>(), POOL_NAME));
229
230         deviceManager = new DeviceManagerImpl(
231                 config,
232                 dataBroker,
233                 getMessageIntelligenceAgency(),
234                 notificationPublishService,
235                 hashedWheelTimer,
236                 convertorManager,
237                 deviceInitializerProvider);
238
239         TranslatorLibraryUtil.injectBasicTranslatorLibrary(deviceManager, convertorManager);
240         ((ExtensionConverterProviderKeeper) deviceManager).setExtensionConverterProvider(extensionConverterManager);
241
242         rpcManager = new RpcManagerImpl(
243                 config,
244                 rpcProviderRegistry,
245                 extensionConverterManager,
246                 convertorManager,
247                 notificationPublishService);
248
249         statisticsManager = new StatisticsManagerImpl(
250                 config,
251                 rpcProviderRegistry,
252                 convertorManager,
253                 executorService);
254
255         roleManager = new RoleManagerImpl(hashedWheelTimer, config);
256
257         contextChainHolder = new ContextChainHolderImpl(
258                 executorService,
259                 singletonServicesProvider,
260                 entityOwnershipService,
261                 mastershipChangeServiceManager);
262
263         contextChainHolder.addManager(deviceManager);
264         contextChainHolder.addManager(statisticsManager);
265         contextChainHolder.addManager(rpcManager);
266         contextChainHolder.addManager(roleManager);
267
268         connectionManager = new ConnectionManagerImpl(config, executorService);
269         connectionManager.setDeviceConnectedHandler(contextChainHolder);
270         connectionManager.setDeviceDisconnectedHandler(contextChainHolder);
271
272         deviceManager.setContextChainHolder(contextChainHolder);
273         deviceManager.initialize();
274     }
275
276     @Override
277     public ExtensionConverterRegistrator getExtensionConverterRegistrator() {
278         return extensionConverterManager;
279     }
280
281     @Override
282     public void close() {
283         try {
284             shutdownSwitchConnections().get(10, TimeUnit.SECONDS);
285         } catch (InterruptedException | ExecutionException | TimeoutException e) {
286             LOG.warn("Failed to shut down switch connections in time {}s, error: {}", 10, e);
287         }
288
289         gracefulShutdown(contextChainHolder);
290         gracefulShutdown(deviceManager);
291         gracefulShutdown(rpcManager);
292         gracefulShutdown(statisticsManager);
293         gracefulShutdown(roleManager);
294         gracefulShutdown(executorService);
295         gracefulShutdown(hashedWheelTimer);
296         unregisterMXBean(MESSAGE_INTELLIGENCE_AGENCY_MX_BEAN_NAME);
297         openflowPluginStatusMonitor.reportStatus(ServiceState.UNREGISTERED, "service shutting down");
298     }
299
300     @SuppressWarnings("checkstyle:IllegalCatch")
301     private static void gracefulShutdown(final AutoCloseable closeable) {
302         if (Objects.isNull(closeable)) {
303             return;
304         }
305
306         try {
307             closeable.close();
308         } catch (Exception e) {
309             LOG.warn("Failed to shutdown {} gracefully.", closeable);
310         }
311     }
312
313     private static void gracefulShutdown(final Timer timer) {
314         if (Objects.isNull(timer)) {
315             return;
316         }
317
318         try {
319             timer.stop();
320         } catch (IllegalStateException e) {
321             LOG.warn("Failed to shutdown {} gracefully.", timer);
322         }
323     }
324
325     private static void gracefulShutdown(final ExecutorService executorService) {
326         if (Objects.isNull(executorService)) {
327             return;
328         }
329
330         try {
331             executorService.shutdownNow();
332         } catch (SecurityException e) {
333             LOG.warn("Failed to shutdown {} gracefully.", executorService);
334         }
335     }
336
337     private static void registerMXBean(final Object bean, final String beanName) {
338         final MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
339
340         try {
341             mbs.registerMBean(bean, new ObjectName(beanName));
342         } catch (MalformedObjectNameException
343                 | NotCompliantMBeanException
344                 | MBeanRegistrationException
345                 | InstanceAlreadyExistsException e) {
346             LOG.warn("Error registering MBean {}", e);
347         }
348     }
349
350     private static void unregisterMXBean(final String beanName) {
351         final MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
352
353         try {
354             mbs.unregisterMBean(new ObjectName(beanName));
355         } catch (InstanceNotFoundException
356                 | MBeanRegistrationException
357                 | MalformedObjectNameException e) {
358             LOG.warn("Error unregistering MBean {}", e);
359         }
360     }
361 }