Use ByteBuf.readRetainedSlice()
[openflowplugin.git] / openflowplugin-impl / src / main / java / org / opendaylight / openflowplugin / impl / OpenFlowPluginProviderImpl.java
1 /*
2  * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.openflowplugin.impl;
9
10 import static java.util.Objects.requireNonNull;
11
12 import com.google.common.util.concurrent.FutureCallback;
13 import com.google.common.util.concurrent.Futures;
14 import com.google.common.util.concurrent.ListenableFuture;
15 import com.google.common.util.concurrent.MoreExecutors;
16 import io.netty.util.HashedWheelTimer;
17 import io.netty.util.Timer;
18 import java.util.ArrayList;
19 import java.util.List;
20 import java.util.Map;
21 import java.util.concurrent.ExecutionException;
22 import java.util.concurrent.ExecutorService;
23 import java.util.concurrent.SynchronousQueue;
24 import java.util.concurrent.TimeUnit;
25 import java.util.concurrent.TimeoutException;
26 import java.util.stream.Collectors;
27 import javax.annotation.PreDestroy;
28 import javax.inject.Inject;
29 import javax.inject.Singleton;
30 import org.opendaylight.infrautils.diagstatus.ServiceState;
31 import org.opendaylight.infrautils.ready.SystemReadyListener;
32 import org.opendaylight.infrautils.ready.SystemReadyMonitor;
33 import org.opendaylight.mdsal.binding.api.DataBroker;
34 import org.opendaylight.mdsal.binding.api.NotificationPublishService;
35 import org.opendaylight.mdsal.binding.api.RpcProviderService;
36 import org.opendaylight.mdsal.eos.binding.api.EntityOwnershipService;
37 import org.opendaylight.mdsal.singleton.api.ClusterSingletonServiceProvider;
38 import org.opendaylight.openflowjava.protocol.spi.connection.SwitchConnectionProvider;
39 import org.opendaylight.openflowplugin.api.openflow.FlowGroupInfoHistories;
40 import org.opendaylight.openflowplugin.api.openflow.FlowGroupInfoHistory;
41 import org.opendaylight.openflowplugin.api.openflow.configuration.ConfigurationService;
42 import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionManager;
43 import org.opendaylight.openflowplugin.api.openflow.device.DeviceManager;
44 import org.opendaylight.openflowplugin.api.openflow.mastership.MastershipChangeServiceManager;
45 import org.opendaylight.openflowplugin.api.openflow.role.RoleManager;
46 import org.opendaylight.openflowplugin.api.openflow.rpc.RpcManager;
47 import org.opendaylight.openflowplugin.api.openflow.statistics.StatisticsManager;
48 import org.opendaylight.openflowplugin.api.openflow.statistics.ofpspecific.MessageIntelligenceAgency;
49 import org.opendaylight.openflowplugin.extension.api.ExtensionConverterRegistrator;
50 import org.opendaylight.openflowplugin.extension.api.OpenFlowPluginExtensionRegistratorProvider;
51 import org.opendaylight.openflowplugin.extension.api.core.extension.ExtensionConverterManager;
52 import org.opendaylight.openflowplugin.impl.configuration.OpenFlowProviderConfigImpl;
53 import org.opendaylight.openflowplugin.impl.connection.ConnectionManagerImpl;
54 import org.opendaylight.openflowplugin.impl.device.DeviceManagerImpl;
55 import org.opendaylight.openflowplugin.impl.device.initialization.DeviceInitializerProvider;
56 import org.opendaylight.openflowplugin.impl.device.initialization.DeviceInitializerProviderFactory;
57 import org.opendaylight.openflowplugin.impl.lifecycle.ContextChainHolderImpl;
58 import org.opendaylight.openflowplugin.impl.protocol.deserialization.DeserializerInjector;
59 import org.opendaylight.openflowplugin.impl.protocol.serialization.SerializerInjector;
60 import org.opendaylight.openflowplugin.impl.role.RoleManagerImpl;
61 import org.opendaylight.openflowplugin.impl.rpc.RpcManagerImpl;
62 import org.opendaylight.openflowplugin.impl.statistics.StatisticsManagerImpl;
63 import org.opendaylight.openflowplugin.impl.util.ThreadPoolLoggingExecutor;
64 import org.opendaylight.openflowplugin.impl.util.TranslatorLibraryUtil;
65 import org.opendaylight.openflowplugin.openflow.md.core.extension.ExtensionConverterManagerImpl;
66 import org.opendaylight.openflowplugin.openflow.md.core.sal.convertor.ConvertorManager;
67 import org.opendaylight.openflowplugin.openflow.md.core.sal.convertor.ConvertorManagerFactory;
68 import org.opendaylight.openflowplugin.openflow.md.core.session.OFSessionUtil;
69 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
70 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflow.provider.config.rev160510.OpenflowProviderConfig;
71 import org.osgi.service.component.annotations.Activate;
72 import org.osgi.service.component.annotations.Component;
73 import org.osgi.service.component.annotations.Deactivate;
74 import org.osgi.service.component.annotations.Reference;
75 import org.osgi.service.component.annotations.ReferenceCardinality;
76 import org.osgi.service.component.annotations.ReferencePolicy;
77 import org.osgi.service.component.annotations.ReferencePolicyOption;
78 import org.slf4j.Logger;
79 import org.slf4j.LoggerFactory;
80
81 @Singleton
82 @Component(immediate = true, service = {
83     OpenFlowPluginExtensionRegistratorProvider.class,
84     FlowGroupInfoHistories.class
85 })
86 public final class OpenFlowPluginProviderImpl
87         implements OpenFlowPluginExtensionRegistratorProvider, FlowGroupInfoHistories, SystemReadyListener,
88                    AutoCloseable {
89     private static final Logger LOG = LoggerFactory.getLogger(OpenFlowPluginProviderImpl.class);
90
91     private static final int TICKS_PER_WHEEL = 500; // 0.5 sec.
92     private static final long TICK_DURATION = 10;
93     private static final String POOL_NAME = "ofppool";
94
95     // TODO: Split this out into a separate component, which requires proper timer cancellation from all users. But is
96     //       that worth the complications?
97     private final HashedWheelTimer hashedWheelTimer =
98             new HashedWheelTimer(TICK_DURATION, TimeUnit.MILLISECONDS, TICKS_PER_WHEEL);
99     private final ExtensionConverterManager extensionConverterManager;
100     private final DeviceInitializerProvider deviceInitializerProvider;
101     private final ConvertorManager convertorManager;
102     private final OpenflowProviderConfig config;
103     private final DeviceManager deviceManager;
104     private final RpcManager rpcManager;
105     private final StatisticsManager statisticsManager;
106     private final RoleManager roleManager;
107     private final ExecutorService executorService;
108     private final ContextChainHolderImpl contextChainHolder;
109     private final DiagStatusProvider diagStatusProvider;
110
111     private final List<SwitchConnectionProvider> connectionProviders = new ArrayList<>();
112
113     private List<SwitchConnectionProvider> startedProviders;
114     private ConnectionManager connectionManager;
115     private int startingProviders;
116
117     @Inject
118     @Activate
119     public OpenFlowPluginProviderImpl(@Reference final ConfigurationService configurationService,
120             @Reference final DataBroker dataBroker, @Reference final RpcProviderService rpcProviderRegistry,
121             @Reference final NotificationPublishService notificationPublishService,
122             @Reference final ClusterSingletonServiceProvider singletonServiceProvider,
123             @Reference final EntityOwnershipService entityOwnershipService,
124             @Reference final MastershipChangeServiceManager mastershipChangeServiceManager,
125             @Reference final MessageIntelligenceAgency messageIntelligenceAgency,
126             @Reference final DiagStatusProvider diagStatusProvider,
127             @Reference final SystemReadyMonitor systemReadyMonitor) {
128         config = new OpenFlowProviderConfigImpl(configurationService);
129         final var ppdb = new PingPongDataBroker(dataBroker);
130         this.diagStatusProvider = requireNonNull(diagStatusProvider);
131
132         convertorManager = ConvertorManagerFactory.createDefaultManager();
133         extensionConverterManager = new ExtensionConverterManagerImpl();
134         deviceInitializerProvider = DeviceInitializerProviderFactory.createDefaultProvider();
135
136         // TODO: copied from OpenFlowPluginProvider (Helium) misusesing the old way of distributing extension converters
137         // TODO: rewrite later!
138         OFSessionUtil.getSessionManager().setExtensionConverterProvider(extensionConverterManager);
139
140         // Creates a thread pool that creates new threads as needed, but will reuse previously
141         // constructed threads when they are available.
142         // Threads that have not been used for x seconds are terminated and removed from the cache.
143         executorService = new ThreadPoolLoggingExecutor(
144                 config.getThreadPoolMinThreads().toJava(),
145                 config.getThreadPoolMaxThreads().getValue().toJava(),
146                 config.getThreadPoolTimeout().toJava(),
147                 TimeUnit.SECONDS, new SynchronousQueue<>(), POOL_NAME);
148
149         final var devMgr = new DeviceManagerImpl(
150                 config,
151                 ppdb,
152                 messageIntelligenceAgency,
153                 notificationPublishService,
154                 hashedWheelTimer,
155                 convertorManager,
156                 deviceInitializerProvider,
157                 executorService);
158         deviceManager = devMgr;
159
160         TranslatorLibraryUtil.injectBasicTranslatorLibrary(deviceManager, convertorManager);
161         devMgr.setExtensionConverterProvider(extensionConverterManager);
162
163         rpcManager = new RpcManagerImpl(
164                 config,
165                 rpcProviderRegistry,
166                 extensionConverterManager,
167                 convertorManager,
168                 notificationPublishService);
169
170         statisticsManager = new StatisticsManagerImpl(
171                 config,
172                 rpcProviderRegistry,
173                 convertorManager,
174                 executorService);
175
176         roleManager = new RoleManagerImpl(hashedWheelTimer, config, executorService);
177
178         contextChainHolder = new ContextChainHolderImpl(
179                 executorService,
180                 singletonServiceProvider,
181                 entityOwnershipService,
182                 mastershipChangeServiceManager,
183                 config);
184
185         contextChainHolder.addManager(deviceManager);
186         contextChainHolder.addManager(statisticsManager);
187         contextChainHolder.addManager(rpcManager);
188         contextChainHolder.addManager(roleManager);
189
190         connectionManager = new ConnectionManagerImpl(config, executorService, ppdb, notificationPublishService);
191         connectionManager.setDeviceConnectedHandler(contextChainHolder);
192         connectionManager.setDeviceDisconnectedHandler(contextChainHolder);
193
194         deviceManager.setContextChainHolder(contextChainHolder);
195         deviceManager.initialize();
196         systemReadyMonitor.registerListener(this);
197         LOG.info("OpenFlowPluginProvider started, waiting for onSystemBootReady()");
198     }
199
200     @Reference(cardinality = ReferenceCardinality.AT_LEAST_ONE,
201                policy = ReferencePolicy.DYNAMIC, policyOption = ReferencePolicyOption.GREEDY)
202     public synchronized void bindConnectionProvider(final SwitchConnectionProvider switchConnectionProvider) {
203         connectionProviders.add(switchConnectionProvider);
204         LOG.info("Added connection provider {}", switchConnectionProvider);
205
206         if (startedProviders != null) {
207             LOG.info("Starting latecomer connection provider {}", switchConnectionProvider);
208             startingProviders += 1;
209             startProvider(switchConnectionProvider);
210         }
211     }
212
213     public synchronized void unbindConnectionProvider(final SwitchConnectionProvider switchConnectionProvider) {
214         connectionProviders.remove(switchConnectionProvider);
215         if (startedProviders != null && startedProviders.remove(switchConnectionProvider)) {
216             switchConnectionProvider.shutdown();
217         }
218         LOG.info("Removed connection provider {}", switchConnectionProvider);
219     }
220
221     private ListenableFuture<Void> startProvider(final SwitchConnectionProvider provider) {
222         // Inject OpenFlowPlugin custom serializers and deserializers into OpenFlowJava
223         if (config.getUseSingleLayerSerialization()) {
224             SerializerInjector.injectSerializers(provider,  provider.getConfiguration().isGroupAddModEnabled());
225             DeserializerInjector.injectDeserializers(provider);
226         } else {
227             DeserializerInjector.revertDeserializers(provider);
228         }
229
230         // Set handler of incoming connections and start switch connection provider
231         final var future = provider.startup(connectionManager);
232         startedProviders.add(provider);
233         Futures.addCallback(future, new FutureCallback<>() {
234             @Override
235             public void onSuccess(final Void result) {
236                 LOG.info("Connection provider {} started", provider);
237                 connectionStarted();
238             }
239
240             @Override
241             public void onFailure(final Throwable cause) {
242                 LOG.warn("Connection provider {} failed to start", provider, cause);
243                 connectionFailed(cause);
244             }
245         }, MoreExecutors.directExecutor());
246         return future;
247     }
248
249     @Override
250     public synchronized void onSystemBootReady() {
251         LOG.info("onSystemBootReady() received, starting the switch connections");
252
253         final var size = connectionProviders.size();
254         startedProviders = new ArrayList<>(size);
255         startingProviders = size;
256         connectionProviders.forEach(this::startProvider);
257     }
258
259     private synchronized void connectionFailed(final Throwable cause) {
260         // Decrement below zero, so we do not arrive to zero
261         startingProviders = -1;
262         diagStatusProvider.reportStatus(ServiceState.ERROR, cause);
263     }
264
265     private synchronized void connectionStarted() {
266         if (--startingProviders == 0 && startedProviders.equals(connectionProviders)) {
267             LOG.info("All switchConnectionProviders are up and running ({}).", startedProviders.size());
268             diagStatusProvider.reportStatus(ServiceState.OPERATIONAL);
269         }
270     }
271
272     private ListenableFuture<List<Void>> shutdownSwitchConnections() {
273         final var future = Futures.allAsList(startedProviders.stream()
274             .map(switchConnectionProvider -> {
275                 // Revert deserializers to their original state
276                 if (config.getUseSingleLayerSerialization()) {
277                     DeserializerInjector.revertDeserializers(switchConnectionProvider);
278                 }
279
280                 // Shutdown switch connection provider
281                 return switchConnectionProvider.shutdown();
282             }).collect(Collectors.toList()));
283         startedProviders.clear();
284
285         Futures.addCallback(future, new FutureCallback<>() {
286             @Override
287             public void onSuccess(final List<Void> result) {
288                 LOG.info("All switchConnectionProviders were successfully shut down ({}).", result.size());
289             }
290
291             @Override
292             public void onFailure(final Throwable throwable) {
293                 LOG.warn("Some switchConnectionProviders failed to shutdown.", throwable);
294             }
295         }, MoreExecutors.directExecutor());
296
297         return future;
298     }
299
300     @Override
301     public ExtensionConverterRegistrator getExtensionConverterRegistrator() {
302         return extensionConverterManager;
303     }
304
305     @Override
306     public Map<NodeId, FlowGroupInfoHistory> getAllFlowGroupHistories() {
307         return deviceManager.getAllFlowGroupHistories();
308     }
309
310     @Override
311     public FlowGroupInfoHistory getFlowGroupHistory(final NodeId nodeId) {
312         return deviceManager.getFlowGroupHistory(nodeId);
313     }
314
315     @Override
316     @PreDestroy
317     @Deactivate
318     @SuppressWarnings("checkstyle:IllegalCatch")
319     public synchronized void close() {
320         LOG.info("OpenFlowPluginProvider stopping");
321         try {
322             shutdownSwitchConnections().get(10, TimeUnit.SECONDS);
323         } catch (InterruptedException | ExecutionException | TimeoutException e) {
324             LOG.warn("Failed to shut down switch connections in time {}s", 10, e);
325         }
326
327         gracefulShutdown(contextChainHolder);
328         gracefulShutdown(connectionManager);
329         gracefulShutdown(deviceManager);
330         gracefulShutdown(rpcManager);
331         gracefulShutdown(statisticsManager);
332         gracefulShutdown(roleManager);
333         gracefulShutdown(executorService);
334         gracefulShutdown(hashedWheelTimer);
335         diagStatusProvider.reportStatus(ServiceState.UNREGISTERED);
336         try {
337             if (connectionManager != null) {
338                 connectionManager.close();
339                 connectionManager = null;
340             }
341         } catch (Exception e) {
342             LOG.error("Failed to close ConnectionManager", e);
343         }
344         LOG.info("OpenFlowPluginProvider stopped");
345     }
346
347     @SuppressWarnings("checkstyle:IllegalCatch")
348     private static void gracefulShutdown(final AutoCloseable closeable) {
349         if (closeable != null) {
350             try {
351                 closeable.close();
352             } catch (Exception e) {
353                 LOG.warn("Failed to shutdown {} gracefully.", closeable);
354             }
355         }
356     }
357
358     private static void gracefulShutdown(final Timer timer) {
359         if (timer != null) {
360             try {
361                 timer.stop();
362             } catch (IllegalStateException e) {
363                 LOG.warn("Failed to shutdown {} gracefully.", timer);
364             }
365         }
366     }
367
368     private static void gracefulShutdown(final ExecutorService executorService) {
369         if (executorService != null) {
370             try {
371                 executorService.shutdownNow();
372             } catch (SecurityException e) {
373                 LOG.warn("Failed to shutdown {} gracefully.", executorService);
374             }
375         }
376     }
377 }