Refactor ShutdownProvider.shutdown()
[openflowplugin.git] / openflowplugin-impl / src / main / java / org / opendaylight / openflowplugin / impl / OpenFlowPluginProviderImpl.java
1 /*
2  * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.openflowplugin.impl;
9
10 import static java.util.Objects.requireNonNull;
11
12 import com.google.common.annotations.VisibleForTesting;
13 import com.google.common.util.concurrent.FutureCallback;
14 import com.google.common.util.concurrent.Futures;
15 import com.google.common.util.concurrent.ListenableFuture;
16 import com.google.common.util.concurrent.MoreExecutors;
17 import com.google.common.util.concurrent.SettableFuture;
18 import io.netty.util.HashedWheelTimer;
19 import io.netty.util.Timer;
20 import java.util.List;
21 import java.util.Map;
22 import java.util.concurrent.ExecutionException;
23 import java.util.concurrent.ExecutorService;
24 import java.util.concurrent.Future;
25 import java.util.concurrent.SynchronousQueue;
26 import java.util.concurrent.TimeUnit;
27 import java.util.concurrent.TimeoutException;
28 import java.util.stream.Collectors;
29 import javax.annotation.PreDestroy;
30 import javax.inject.Inject;
31 import javax.inject.Singleton;
32 import org.opendaylight.infrautils.diagstatus.ServiceState;
33 import org.opendaylight.infrautils.ready.SystemReadyListener;
34 import org.opendaylight.infrautils.ready.SystemReadyMonitor;
35 import org.opendaylight.mdsal.binding.api.DataBroker;
36 import org.opendaylight.mdsal.binding.api.NotificationPublishService;
37 import org.opendaylight.mdsal.binding.api.RpcProviderService;
38 import org.opendaylight.mdsal.eos.binding.api.EntityOwnershipService;
39 import org.opendaylight.mdsal.singleton.api.ClusterSingletonServiceProvider;
40 import org.opendaylight.openflowjava.protocol.spi.connection.SwitchConnectionProvider;
41 import org.opendaylight.openflowplugin.api.openflow.FlowGroupInfoHistories;
42 import org.opendaylight.openflowplugin.api.openflow.FlowGroupInfoHistory;
43 import org.opendaylight.openflowplugin.api.openflow.configuration.ConfigurationService;
44 import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionManager;
45 import org.opendaylight.openflowplugin.api.openflow.device.DeviceManager;
46 import org.opendaylight.openflowplugin.api.openflow.mastership.MastershipChangeServiceManager;
47 import org.opendaylight.openflowplugin.api.openflow.role.RoleManager;
48 import org.opendaylight.openflowplugin.api.openflow.rpc.RpcManager;
49 import org.opendaylight.openflowplugin.api.openflow.statistics.StatisticsManager;
50 import org.opendaylight.openflowplugin.api.openflow.statistics.ofpspecific.MessageIntelligenceAgency;
51 import org.opendaylight.openflowplugin.extension.api.ExtensionConverterRegistrator;
52 import org.opendaylight.openflowplugin.extension.api.OpenFlowPluginExtensionRegistratorProvider;
53 import org.opendaylight.openflowplugin.extension.api.core.extension.ExtensionConverterManager;
54 import org.opendaylight.openflowplugin.impl.configuration.OpenFlowProviderConfigImpl;
55 import org.opendaylight.openflowplugin.impl.connection.ConnectionManagerImpl;
56 import org.opendaylight.openflowplugin.impl.device.DeviceManagerImpl;
57 import org.opendaylight.openflowplugin.impl.device.initialization.DeviceInitializerProvider;
58 import org.opendaylight.openflowplugin.impl.device.initialization.DeviceInitializerProviderFactory;
59 import org.opendaylight.openflowplugin.impl.lifecycle.ContextChainHolderImpl;
60 import org.opendaylight.openflowplugin.impl.protocol.deserialization.DeserializerInjector;
61 import org.opendaylight.openflowplugin.impl.protocol.serialization.SerializerInjector;
62 import org.opendaylight.openflowplugin.impl.role.RoleManagerImpl;
63 import org.opendaylight.openflowplugin.impl.rpc.RpcManagerImpl;
64 import org.opendaylight.openflowplugin.impl.statistics.StatisticsManagerImpl;
65 import org.opendaylight.openflowplugin.impl.util.ThreadPoolLoggingExecutor;
66 import org.opendaylight.openflowplugin.impl.util.TranslatorLibraryUtil;
67 import org.opendaylight.openflowplugin.openflow.md.core.extension.ExtensionConverterManagerImpl;
68 import org.opendaylight.openflowplugin.openflow.md.core.sal.convertor.ConvertorManager;
69 import org.opendaylight.openflowplugin.openflow.md.core.sal.convertor.ConvertorManagerFactory;
70 import org.opendaylight.openflowplugin.openflow.md.core.session.OFSessionUtil;
71 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
72 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflow.provider.config.rev160510.OpenflowProviderConfig;
73 import org.osgi.service.component.annotations.Activate;
74 import org.osgi.service.component.annotations.Component;
75 import org.osgi.service.component.annotations.Deactivate;
76 import org.osgi.service.component.annotations.Reference;
77 import org.osgi.service.component.annotations.ReferenceCardinality;
78 import org.osgi.service.component.annotations.ReferencePolicyOption;
79 import org.slf4j.Logger;
80 import org.slf4j.LoggerFactory;
81
82 @Singleton
83 @Component(immediate = true, service = {
84     OpenFlowPluginExtensionRegistratorProvider.class,
85     FlowGroupInfoHistories.class
86 })
87 public final class OpenFlowPluginProviderImpl
88         implements OpenFlowPluginExtensionRegistratorProvider, FlowGroupInfoHistories, SystemReadyListener,
89                    AutoCloseable {
90     private static final Logger LOG = LoggerFactory.getLogger(OpenFlowPluginProviderImpl.class);
91
92     private static final int TICKS_PER_WHEEL = 500; // 0.5 sec.
93     private static final long TICK_DURATION = 10;
94     private static final String POOL_NAME = "ofppool";
95
96     // TODO: Split this out into a separate component, which requires proper timer cancellation from all users. But is
97     //       that worth the complications?
98     private final HashedWheelTimer hashedWheelTimer =
99             new HashedWheelTimer(TICK_DURATION, TimeUnit.MILLISECONDS, TICKS_PER_WHEEL);
100     private final ExtensionConverterManager extensionConverterManager;
101     private final List<SwitchConnectionProvider> switchConnectionProviders;
102     private final DeviceInitializerProvider deviceInitializerProvider;
103     private final ConvertorManager convertorManager;
104     private final OpenflowProviderConfig config;
105     private final DeviceManager deviceManager;
106     private final RpcManager rpcManager;
107     private final StatisticsManager statisticsManager;
108     private final RoleManager roleManager;
109     private final ExecutorService executorService;
110     private final ContextChainHolderImpl contextChainHolder;
111     private final DiagStatusProvider diagStatusProvider;
112     private final SettableFuture<Void> fullyStarted = SettableFuture.create();
113
114     private ConnectionManager connectionManager;
115
116     @Inject
117     @Activate
118     public OpenFlowPluginProviderImpl(@Reference final ConfigurationService configurationService,
119             @Reference(cardinality = ReferenceCardinality.AT_LEAST_ONE, policyOption = ReferencePolicyOption.GREEDY)
120             final List<SwitchConnectionProvider> switchConnectionProviders,
121             @Reference final DataBroker dataBroker, @Reference final RpcProviderService rpcProviderRegistry,
122             @Reference final NotificationPublishService notificationPublishService,
123             @Reference final ClusterSingletonServiceProvider singletonServiceProvider,
124             @Reference final EntityOwnershipService entityOwnershipService,
125             @Reference final MastershipChangeServiceManager mastershipChangeServiceManager,
126             @Reference final MessageIntelligenceAgency messageIntelligenceAgency,
127             @Reference final DiagStatusProvider diagStatusProvider,
128             @Reference final SystemReadyMonitor systemReadyMonitor) {
129         config = new OpenFlowProviderConfigImpl(configurationService);
130         this.switchConnectionProviders = List.copyOf(switchConnectionProviders);
131         final var ppdb = new PingPongDataBroker(dataBroker);
132         this.diagStatusProvider = requireNonNull(diagStatusProvider);
133
134         convertorManager = ConvertorManagerFactory.createDefaultManager();
135         extensionConverterManager = new ExtensionConverterManagerImpl();
136         deviceInitializerProvider = DeviceInitializerProviderFactory.createDefaultProvider();
137
138         // TODO: copied from OpenFlowPluginProvider (Helium) misusesing the old way of distributing extension converters
139         // TODO: rewrite later!
140         OFSessionUtil.getSessionManager().setExtensionConverterProvider(extensionConverterManager);
141
142         // Creates a thread pool that creates new threads as needed, but will reuse previously
143         // constructed threads when they are available.
144         // Threads that have not been used for x seconds are terminated and removed from the cache.
145         executorService = new ThreadPoolLoggingExecutor(
146                 config.getThreadPoolMinThreads().toJava(),
147                 config.getThreadPoolMaxThreads().getValue().toJava(),
148                 config.getThreadPoolTimeout().toJava(),
149                 TimeUnit.SECONDS, new SynchronousQueue<>(), POOL_NAME);
150
151         final var devMgr = new DeviceManagerImpl(
152                 config,
153                 ppdb,
154                 messageIntelligenceAgency,
155                 notificationPublishService,
156                 hashedWheelTimer,
157                 convertorManager,
158                 deviceInitializerProvider,
159                 executorService);
160         deviceManager = devMgr;
161
162         TranslatorLibraryUtil.injectBasicTranslatorLibrary(deviceManager, convertorManager);
163         devMgr.setExtensionConverterProvider(extensionConverterManager);
164
165         rpcManager = new RpcManagerImpl(
166                 config,
167                 rpcProviderRegistry,
168                 extensionConverterManager,
169                 convertorManager,
170                 notificationPublishService);
171
172         statisticsManager = new StatisticsManagerImpl(
173                 config,
174                 rpcProviderRegistry,
175                 convertorManager,
176                 executorService);
177
178         roleManager = new RoleManagerImpl(hashedWheelTimer, config, executorService);
179
180         contextChainHolder = new ContextChainHolderImpl(
181                 executorService,
182                 singletonServiceProvider,
183                 entityOwnershipService,
184                 mastershipChangeServiceManager,
185                 config);
186
187         contextChainHolder.addManager(deviceManager);
188         contextChainHolder.addManager(statisticsManager);
189         contextChainHolder.addManager(rpcManager);
190         contextChainHolder.addManager(roleManager);
191
192         connectionManager = new ConnectionManagerImpl(config, executorService, ppdb, notificationPublishService);
193         connectionManager.setDeviceConnectedHandler(contextChainHolder);
194         connectionManager.setDeviceDisconnectedHandler(contextChainHolder);
195
196         deviceManager.setContextChainHolder(contextChainHolder);
197         deviceManager.initialize();
198         systemReadyMonitor.registerListener(this);
199         LOG.info("registered onSystemBootReady() listener for OpenFlowPluginProvider");
200     }
201
202     @Override
203     public void onSystemBootReady() {
204         LOG.info("onSystemBootReady() received, starting the switch connections");
205         Futures.addCallback(Futures.allAsList(switchConnectionProviders.stream().map(switchConnectionProvider -> {
206             // Inject OpenFlowPlugin custom serializers and deserializers into OpenFlowJava
207             if (config.getUseSingleLayerSerialization()) {
208                 SerializerInjector.injectSerializers(switchConnectionProvider,
209                         switchConnectionProvider.getConfiguration().isGroupAddModEnabled());
210                 DeserializerInjector.injectDeserializers(switchConnectionProvider);
211             } else {
212                 DeserializerInjector.revertDeserializers(switchConnectionProvider);
213             }
214
215             // Set handler of incoming connections and start switch connection provider
216             return switchConnectionProvider.startup(connectionManager);
217         }).collect(Collectors.toSet())), new FutureCallback<List<Void>>() {
218             @Override
219             public void onSuccess(final List<Void> result) {
220                 LOG.info("All switchConnectionProviders are up and running ({}).", result.size());
221                 diagStatusProvider.reportStatus(ServiceState.OPERATIONAL);
222                 fullyStarted.set(null);
223             }
224
225             @Override
226             public void onFailure(final Throwable throwable) {
227                 LOG.warn("Some switchConnectionProviders failed to start.", throwable);
228                 diagStatusProvider.reportStatus(ServiceState.ERROR, throwable);
229                 fullyStarted.setException(throwable);
230             }
231         }, MoreExecutors.directExecutor());
232     }
233
234     @VisibleForTesting
235     public Future<Void> getFullyStarted() {
236         return fullyStarted;
237     }
238
239     private ListenableFuture<List<Void>> shutdownSwitchConnections() {
240         final var listListenableFuture = Futures.allAsList(switchConnectionProviders.stream()
241             .map(switchConnectionProvider -> {
242                 // Revert deserializers to their original state
243                 if (config.getUseSingleLayerSerialization()) {
244                     DeserializerInjector.revertDeserializers(switchConnectionProvider);
245                 }
246
247                 // Shutdown switch connection provider
248                 return switchConnectionProvider.shutdown();
249             }).collect(Collectors.toList()));
250
251         Futures.addCallback(listListenableFuture, new FutureCallback<>() {
252             @Override
253             public void onSuccess(final List<Void> result) {
254                 LOG.info("All switchConnectionProviders were successfully shut down ({}).", result.size());
255             }
256
257             @Override
258             public void onFailure(final Throwable throwable) {
259                 LOG.warn("Some switchConnectionProviders failed to shutdown.", throwable);
260             }
261         }, MoreExecutors.directExecutor());
262
263         return listListenableFuture;
264     }
265
266     @Override
267     public ExtensionConverterRegistrator getExtensionConverterRegistrator() {
268         return extensionConverterManager;
269     }
270
271     @Override
272     public Map<NodeId, FlowGroupInfoHistory> getAllFlowGroupHistories() {
273         return deviceManager.getAllFlowGroupHistories();
274     }
275
276     @Override
277     public FlowGroupInfoHistory getFlowGroupHistory(final NodeId nodeId) {
278         return deviceManager.getFlowGroupHistory(nodeId);
279     }
280
281     @Override
282     @PreDestroy
283     @Deactivate
284     @SuppressWarnings("checkstyle:IllegalCatch")
285     public void close() {
286         try {
287             shutdownSwitchConnections().get(10, TimeUnit.SECONDS);
288         } catch (InterruptedException | ExecutionException | TimeoutException e) {
289             LOG.warn("Failed to shut down switch connections in time {}s", 10, e);
290         }
291
292         gracefulShutdown(contextChainHolder);
293         gracefulShutdown(connectionManager);
294         gracefulShutdown(deviceManager);
295         gracefulShutdown(rpcManager);
296         gracefulShutdown(statisticsManager);
297         gracefulShutdown(roleManager);
298         gracefulShutdown(executorService);
299         gracefulShutdown(hashedWheelTimer);
300         diagStatusProvider.reportStatus(ServiceState.UNREGISTERED);
301         try {
302             if (connectionManager != null) {
303                 connectionManager.close();
304                 connectionManager = null;
305             }
306         } catch (Exception e) {
307             LOG.error("Failed to close ConnectionManager", e);
308         }
309     }
310
311     @SuppressWarnings("checkstyle:IllegalCatch")
312     private static void gracefulShutdown(final AutoCloseable closeable) {
313         if (closeable != null) {
314             try {
315                 closeable.close();
316             } catch (Exception e) {
317                 LOG.warn("Failed to shutdown {} gracefully.", closeable);
318             }
319         }
320     }
321
322     private static void gracefulShutdown(final Timer timer) {
323         if (timer != null) {
324             try {
325                 timer.stop();
326             } catch (IllegalStateException e) {
327                 LOG.warn("Failed to shutdown {} gracefully.", timer);
328             }
329         }
330     }
331
332     private static void gracefulShutdown(final ExecutorService executorService) {
333         if (executorService != null) {
334             try {
335                 executorService.shutdownNow();
336             } catch (SecurityException e) {
337                 LOG.warn("Failed to shutdown {} gracefully.", executorService);
338             }
339         }
340     }
341
342
343 }