Hide PingPongDataBroker
[openflowplugin.git] / openflowplugin-impl / src / main / java / org / opendaylight / openflowplugin / impl / OpenFlowPluginProviderImpl.java
1 /*
2  * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.openflowplugin.impl;
9
10 import com.google.common.annotations.VisibleForTesting;
11 import com.google.common.util.concurrent.FutureCallback;
12 import com.google.common.util.concurrent.Futures;
13 import com.google.common.util.concurrent.ListenableFuture;
14 import com.google.common.util.concurrent.MoreExecutors;
15 import com.google.common.util.concurrent.SettableFuture;
16 import io.netty.util.HashedWheelTimer;
17 import io.netty.util.Timer;
18 import java.lang.management.ManagementFactory;
19 import java.util.Collection;
20 import java.util.List;
21 import java.util.Map;
22 import java.util.concurrent.ExecutionException;
23 import java.util.concurrent.ExecutorService;
24 import java.util.concurrent.Future;
25 import java.util.concurrent.SynchronousQueue;
26 import java.util.concurrent.TimeUnit;
27 import java.util.concurrent.TimeoutException;
28 import java.util.stream.Collectors;
29 import javax.annotation.PostConstruct;
30 import javax.annotation.PreDestroy;
31 import javax.inject.Inject;
32 import javax.inject.Singleton;
33 import javax.management.InstanceAlreadyExistsException;
34 import javax.management.InstanceNotFoundException;
35 import javax.management.MBeanRegistrationException;
36 import javax.management.MBeanServer;
37 import javax.management.MalformedObjectNameException;
38 import javax.management.NotCompliantMBeanException;
39 import javax.management.ObjectName;
40 import org.opendaylight.infrautils.diagstatus.ServiceState;
41 import org.opendaylight.infrautils.ready.SystemReadyListener;
42 import org.opendaylight.infrautils.ready.SystemReadyMonitor;
43 import org.opendaylight.mdsal.binding.api.DataBroker;
44 import org.opendaylight.mdsal.binding.api.NotificationPublishService;
45 import org.opendaylight.mdsal.binding.api.RpcProviderService;
46 import org.opendaylight.mdsal.eos.binding.api.EntityOwnershipService;
47 import org.opendaylight.mdsal.singleton.api.ClusterSingletonServiceProvider;
48 import org.opendaylight.openflowjava.protocol.spi.connection.SwitchConnectionProvider;
49 import org.opendaylight.openflowplugin.api.openflow.FlowGroupInfoHistories;
50 import org.opendaylight.openflowplugin.api.openflow.FlowGroupInfoHistory;
51 import org.opendaylight.openflowplugin.api.openflow.OpenFlowPluginProvider;
52 import org.opendaylight.openflowplugin.api.openflow.configuration.ConfigurationService;
53 import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionManager;
54 import org.opendaylight.openflowplugin.api.openflow.device.DeviceManager;
55 import org.opendaylight.openflowplugin.api.openflow.mastership.MastershipChangeServiceManager;
56 import org.opendaylight.openflowplugin.api.openflow.role.RoleManager;
57 import org.opendaylight.openflowplugin.api.openflow.rpc.RpcManager;
58 import org.opendaylight.openflowplugin.api.openflow.statistics.StatisticsManager;
59 import org.opendaylight.openflowplugin.api.openflow.statistics.ofpspecific.MessageIntelligenceAgency;
60 import org.opendaylight.openflowplugin.extension.api.ExtensionConverterProviderKeeper;
61 import org.opendaylight.openflowplugin.extension.api.ExtensionConverterRegistrator;
62 import org.opendaylight.openflowplugin.extension.api.OpenFlowPluginExtensionRegistratorProvider;
63 import org.opendaylight.openflowplugin.extension.api.core.extension.ExtensionConverterManager;
64 import org.opendaylight.openflowplugin.impl.configuration.OpenFlowProviderConfigImpl;
65 import org.opendaylight.openflowplugin.impl.connection.ConnectionManagerImpl;
66 import org.opendaylight.openflowplugin.impl.device.DeviceManagerImpl;
67 import org.opendaylight.openflowplugin.impl.device.initialization.DeviceInitializerProvider;
68 import org.opendaylight.openflowplugin.impl.device.initialization.DeviceInitializerProviderFactory;
69 import org.opendaylight.openflowplugin.impl.lifecycle.ContextChainHolderImpl;
70 import org.opendaylight.openflowplugin.impl.protocol.deserialization.DeserializerInjector;
71 import org.opendaylight.openflowplugin.impl.protocol.serialization.SerializerInjector;
72 import org.opendaylight.openflowplugin.impl.role.RoleManagerImpl;
73 import org.opendaylight.openflowplugin.impl.rpc.RpcManagerImpl;
74 import org.opendaylight.openflowplugin.impl.statistics.StatisticsManagerImpl;
75 import org.opendaylight.openflowplugin.impl.statistics.ofpspecific.MessageIntelligenceAgencyImpl;
76 import org.opendaylight.openflowplugin.impl.statistics.ofpspecific.MessageIntelligenceAgencyMXBean;
77 import org.opendaylight.openflowplugin.impl.util.ThreadPoolLoggingExecutor;
78 import org.opendaylight.openflowplugin.impl.util.TranslatorLibraryUtil;
79 import org.opendaylight.openflowplugin.openflow.md.core.extension.ExtensionConverterManagerImpl;
80 import org.opendaylight.openflowplugin.openflow.md.core.sal.convertor.ConvertorManager;
81 import org.opendaylight.openflowplugin.openflow.md.core.sal.convertor.ConvertorManagerFactory;
82 import org.opendaylight.openflowplugin.openflow.md.core.session.OFSessionUtil;
83 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
84 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflow.provider.config.rev160510.OpenflowProviderConfig;
85 import org.slf4j.Logger;
86 import org.slf4j.LoggerFactory;
87
88 @Singleton
89 public class OpenFlowPluginProviderImpl implements
90         OpenFlowPluginProvider,
91         OpenFlowPluginExtensionRegistratorProvider,
92         FlowGroupInfoHistories,
93         SystemReadyListener {
94
95     private static final Logger LOG = LoggerFactory.getLogger(OpenFlowPluginProviderImpl.class);
96
97     private static final int TICKS_PER_WHEEL = 500; // 0.5 sec.
98     private static final long TICK_DURATION = 10;
99     private static final String POOL_NAME = "ofppool";
100
101     private static final MessageIntelligenceAgency MESSAGE_INTELLIGENCE_AGENCY = new MessageIntelligenceAgencyImpl();
102     private static final String MESSAGE_INTELLIGENCE_AGENCY_MX_BEAN_NAME = String
103             .format("%s:type=%s",
104                     MessageIntelligenceAgencyMXBean.class.getPackage().getName(),
105                     MessageIntelligenceAgencyMXBean.class.getSimpleName());
106
107     private final HashedWheelTimer hashedWheelTimer =
108             new HashedWheelTimer(TICK_DURATION, TimeUnit.MILLISECONDS, TICKS_PER_WHEEL);
109     private final NotificationPublishService notificationPublishService;
110     private final ExtensionConverterManager extensionConverterManager;
111     private final DataBroker dataBroker;
112     private final Collection<SwitchConnectionProvider> switchConnectionProviders;
113     private final DeviceInitializerProvider deviceInitializerProvider;
114     private final ConvertorManager convertorManager;
115     private final RpcProviderService rpcProviderRegistry;
116     private final ClusterSingletonServiceProvider singletonServicesProvider;
117     private final OpenflowProviderConfig config;
118     private final EntityOwnershipService entityOwnershipService;
119     private final MastershipChangeServiceManager mastershipChangeServiceManager;
120     private DeviceManager deviceManager;
121     private RpcManager rpcManager;
122     private StatisticsManager statisticsManager;
123     private RoleManager roleManager;
124     private ConnectionManager connectionManager;
125     private ExecutorService executorService;
126     private ContextChainHolderImpl contextChainHolder;
127     private final DiagStatusProvider diagStatusProvider;
128     private final SystemReadyMonitor systemReadyMonitor;
129     private final SettableFuture<Void> fullyStarted = SettableFuture.create();
130     private static final String OPENFLOW_SERVICE_NAME = "OPENFLOW";
131
132     public static MessageIntelligenceAgency getMessageIntelligenceAgency() {
133         return MESSAGE_INTELLIGENCE_AGENCY;
134     }
135
136     @Inject
137     public OpenFlowPluginProviderImpl(final ConfigurationService configurationService,
138                                final List<SwitchConnectionProvider> switchConnectionProviders,
139                                final DataBroker dataBroker,
140                                final RpcProviderService rpcProviderRegistry,
141                                final NotificationPublishService notificationPublishService,
142                                final ClusterSingletonServiceProvider singletonServiceProvider,
143                                final EntityOwnershipService entityOwnershipService,
144                                final MastershipChangeServiceManager mastershipChangeServiceManager,
145                                final DiagStatusProvider diagStatusProvider,
146                                final SystemReadyMonitor systemReadyMonitor) {
147         this.switchConnectionProviders = switchConnectionProviders;
148         this.dataBroker = new PingPongDataBroker(dataBroker);
149         this.rpcProviderRegistry = rpcProviderRegistry;
150         this.notificationPublishService = notificationPublishService;
151         singletonServicesProvider = singletonServiceProvider;
152         this.entityOwnershipService = entityOwnershipService;
153         convertorManager = ConvertorManagerFactory.createDefaultManager();
154         extensionConverterManager = new ExtensionConverterManagerImpl();
155         deviceInitializerProvider = DeviceInitializerProviderFactory.createDefaultProvider();
156         config = new OpenFlowProviderConfigImpl(configurationService);
157         this.mastershipChangeServiceManager = mastershipChangeServiceManager;
158         this.diagStatusProvider = diagStatusProvider;
159         this.systemReadyMonitor = systemReadyMonitor;
160     }
161
162     @Override
163     public void onSystemBootReady() {
164         LOG.info("onSystemBootReady() received, starting the switch connections");
165         Futures.addCallback(Futures.allAsList(switchConnectionProviders.stream().map(switchConnectionProvider -> {
166             // Inject OpenFlowPlugin custom serializers and deserializers into OpenFlowJava
167             if (config.getUseSingleLayerSerialization()) {
168                 SerializerInjector.injectSerializers(switchConnectionProvider,
169                         switchConnectionProvider.getConfiguration().isGroupAddModEnabled());
170                 DeserializerInjector.injectDeserializers(switchConnectionProvider);
171             } else {
172                 DeserializerInjector.revertDeserializers(switchConnectionProvider);
173             }
174
175             // Set handler of incoming connections and start switch connection provider
176             switchConnectionProvider.setSwitchConnectionHandler(connectionManager);
177             return switchConnectionProvider.startup();
178         }).collect(Collectors.toSet())), new FutureCallback<List<Boolean>>() {
179             @Override
180             public void onSuccess(final List<Boolean> result) {
181                 LOG.info("All switchConnectionProviders are up and running ({}).", result.size());
182                 diagStatusProvider.reportStatus(ServiceState.OPERATIONAL);
183                 fullyStarted.set(null);
184             }
185
186             @Override
187             public void onFailure(final Throwable throwable) {
188                 LOG.warn("Some switchConnectionProviders failed to start.", throwable);
189                 diagStatusProvider.reportStatus(ServiceState.ERROR, throwable);
190                 fullyStarted.setException(throwable);
191             }
192         }, MoreExecutors.directExecutor());
193     }
194
195     @VisibleForTesting
196     public Future<Void> getFullyStarted() {
197         return fullyStarted;
198     }
199
200     private ListenableFuture<List<Boolean>> shutdownSwitchConnections() {
201         final ListenableFuture<List<Boolean>> listListenableFuture =
202                 Futures.allAsList(switchConnectionProviders.stream().map(switchConnectionProvider -> {
203                     // Revert deserializers to their original state
204                     if (config.getUseSingleLayerSerialization()) {
205                         DeserializerInjector.revertDeserializers(switchConnectionProvider);
206                     }
207
208                     // Shutdown switch connection provider
209                     return switchConnectionProvider.shutdown();
210                 }).collect(Collectors.toSet()));
211
212         Futures.addCallback(listListenableFuture, new FutureCallback<List<Boolean>>() {
213             @Override
214             public void onSuccess(final List<Boolean> result) {
215                 LOG.info("All switchConnectionProviders were successfully shut down ({}).", result.size());
216             }
217
218             @Override
219             public void onFailure(final Throwable throwable) {
220                 LOG.warn("Some switchConnectionProviders failed to shutdown.", throwable);
221             }
222         }, MoreExecutors.directExecutor());
223
224         return listListenableFuture;
225     }
226
227     @Override
228     @PostConstruct
229     public void initialize() {
230         registerMXBean(MESSAGE_INTELLIGENCE_AGENCY, MESSAGE_INTELLIGENCE_AGENCY_MX_BEAN_NAME);
231
232         // TODO: copied from OpenFlowPluginProvider (Helium) misusesing the old way of distributing extension converters
233         // TODO: rewrite later!
234         OFSessionUtil.getSessionManager().setExtensionConverterProvider(extensionConverterManager);
235
236         // Creates a thread pool that creates new threads as needed, but will reuse previously
237         // constructed threads when they are available.
238         // Threads that have not been used for x seconds are terminated and removed from the cache.
239         executorService = new ThreadPoolLoggingExecutor(
240                 config.getThreadPoolMinThreads().toJava(),
241                 config.getThreadPoolMaxThreads().getValue().toJava(),
242                 config.getThreadPoolTimeout().toJava(),
243                 TimeUnit.SECONDS, new SynchronousQueue<>(), POOL_NAME);
244
245         deviceManager = new DeviceManagerImpl(
246                 config,
247                 dataBroker,
248                 getMessageIntelligenceAgency(),
249                 notificationPublishService,
250                 hashedWheelTimer,
251                 convertorManager,
252                 deviceInitializerProvider,
253                 executorService);
254
255         TranslatorLibraryUtil.injectBasicTranslatorLibrary(deviceManager, convertorManager);
256         ((ExtensionConverterProviderKeeper) deviceManager).setExtensionConverterProvider(extensionConverterManager);
257
258         rpcManager = new RpcManagerImpl(
259                 config,
260                 rpcProviderRegistry,
261                 extensionConverterManager,
262                 convertorManager,
263                 notificationPublishService);
264
265         statisticsManager = new StatisticsManagerImpl(
266                 config,
267                 rpcProviderRegistry,
268                 convertorManager,
269                 executorService);
270
271         roleManager = new RoleManagerImpl(hashedWheelTimer, config, executorService);
272
273         contextChainHolder = new ContextChainHolderImpl(
274                 executorService,
275                 singletonServicesProvider,
276                 entityOwnershipService,
277                 mastershipChangeServiceManager,
278                 config);
279
280         contextChainHolder.addManager(deviceManager);
281         contextChainHolder.addManager(statisticsManager);
282         contextChainHolder.addManager(rpcManager);
283         contextChainHolder.addManager(roleManager);
284
285         connectionManager = new ConnectionManagerImpl(config, executorService, dataBroker, notificationPublishService);
286         connectionManager.setDeviceConnectedHandler(contextChainHolder);
287         connectionManager.setDeviceDisconnectedHandler(contextChainHolder);
288
289         deviceManager.setContextChainHolder(contextChainHolder);
290         deviceManager.initialize();
291         systemReadyMonitor.registerListener(this);
292         LOG.info("registered onSystemBootReady() listener for OpenFlowPluginProvider");
293     }
294
295     @Override
296     public ExtensionConverterRegistrator getExtensionConverterRegistrator() {
297         return extensionConverterManager;
298     }
299
300     @Override
301     public Map<NodeId, FlowGroupInfoHistory> getAllFlowGroupHistories() {
302         return deviceManager.getAllFlowGroupHistories();
303     }
304
305     @Override
306     public FlowGroupInfoHistory getFlowGroupHistory(final NodeId nodeId) {
307         return deviceManager.getFlowGroupHistory(nodeId);
308     }
309
310     @Override
311     @PreDestroy
312     @SuppressWarnings("checkstyle:IllegalCatch")
313     public void close() {
314         try {
315             shutdownSwitchConnections().get(10, TimeUnit.SECONDS);
316         } catch (InterruptedException | ExecutionException | TimeoutException e) {
317             LOG.warn("Failed to shut down switch connections in time {}s", 10, e);
318         }
319
320         gracefulShutdown(contextChainHolder);
321         gracefulShutdown(connectionManager);
322         gracefulShutdown(deviceManager);
323         gracefulShutdown(rpcManager);
324         gracefulShutdown(statisticsManager);
325         gracefulShutdown(roleManager);
326         gracefulShutdown(executorService);
327         gracefulShutdown(hashedWheelTimer);
328         unregisterMXBean(MESSAGE_INTELLIGENCE_AGENCY_MX_BEAN_NAME);
329         diagStatusProvider.reportStatus(ServiceState.UNREGISTERED);
330         try {
331             if (connectionManager != null) {
332                 connectionManager.close();
333                 connectionManager = null;
334             }
335         } catch (Exception e) {
336             LOG.error("Failed to close ConnectionManager", e);
337         }
338     }
339
340     @SuppressWarnings("checkstyle:IllegalCatch")
341     private static void gracefulShutdown(final AutoCloseable closeable) {
342         if (closeable != null) {
343             try {
344                 closeable.close();
345             } catch (Exception e) {
346                 LOG.warn("Failed to shutdown {} gracefully.", closeable);
347             }
348         }
349     }
350
351     private static void gracefulShutdown(final Timer timer) {
352         if (timer != null) {
353             try {
354                 timer.stop();
355             } catch (IllegalStateException e) {
356                 LOG.warn("Failed to shutdown {} gracefully.", timer);
357             }
358         }
359     }
360
361     private static void gracefulShutdown(final ExecutorService executorService) {
362         if (executorService != null) {
363             try {
364                 executorService.shutdownNow();
365             } catch (SecurityException e) {
366                 LOG.warn("Failed to shutdown {} gracefully.", executorService);
367             }
368         }
369     }
370
371     private static void registerMXBean(final Object bean, final String beanName) {
372         final MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
373
374         try {
375             mbs.registerMBean(bean, new ObjectName(beanName));
376         } catch (MalformedObjectNameException
377                 | NotCompliantMBeanException
378                 | MBeanRegistrationException
379                 | InstanceAlreadyExistsException e) {
380             LOG.warn("Error registering MBean {}", beanName, e);
381         }
382     }
383
384     private static void unregisterMXBean(final String beanName) {
385         final MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
386
387         try {
388             mbs.unregisterMBean(new ObjectName(beanName));
389         } catch (InstanceNotFoundException
390                 | MBeanRegistrationException
391                 | MalformedObjectNameException e) {
392             LOG.warn("Error unregistering MBean {}", beanName, e);
393         }
394     }
395 }