2 * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.openflowplugin.impl;
10 import static java.util.Objects.requireNonNull;
12 import com.google.common.annotations.VisibleForTesting;
13 import com.google.common.util.concurrent.FutureCallback;
14 import com.google.common.util.concurrent.Futures;
15 import com.google.common.util.concurrent.ListenableFuture;
16 import com.google.common.util.concurrent.MoreExecutors;
17 import com.google.common.util.concurrent.SettableFuture;
18 import io.netty.util.HashedWheelTimer;
19 import io.netty.util.Timer;
20 import java.util.List;
22 import java.util.concurrent.ExecutionException;
23 import java.util.concurrent.ExecutorService;
24 import java.util.concurrent.Future;
25 import java.util.concurrent.SynchronousQueue;
26 import java.util.concurrent.TimeUnit;
27 import java.util.concurrent.TimeoutException;
28 import java.util.stream.Collectors;
29 import javax.annotation.PreDestroy;
30 import javax.inject.Inject;
31 import javax.inject.Singleton;
32 import org.opendaylight.infrautils.diagstatus.ServiceState;
33 import org.opendaylight.infrautils.ready.SystemReadyListener;
34 import org.opendaylight.infrautils.ready.SystemReadyMonitor;
35 import org.opendaylight.mdsal.binding.api.DataBroker;
36 import org.opendaylight.mdsal.binding.api.NotificationPublishService;
37 import org.opendaylight.mdsal.binding.api.RpcProviderService;
38 import org.opendaylight.mdsal.eos.binding.api.EntityOwnershipService;
39 import org.opendaylight.mdsal.singleton.api.ClusterSingletonServiceProvider;
40 import org.opendaylight.openflowjava.protocol.spi.connection.SwitchConnectionProvider;
41 import org.opendaylight.openflowplugin.api.openflow.FlowGroupInfoHistories;
42 import org.opendaylight.openflowplugin.api.openflow.FlowGroupInfoHistory;
43 import org.opendaylight.openflowplugin.api.openflow.configuration.ConfigurationService;
44 import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionManager;
45 import org.opendaylight.openflowplugin.api.openflow.device.DeviceManager;
46 import org.opendaylight.openflowplugin.api.openflow.mastership.MastershipChangeServiceManager;
47 import org.opendaylight.openflowplugin.api.openflow.role.RoleManager;
48 import org.opendaylight.openflowplugin.api.openflow.rpc.RpcManager;
49 import org.opendaylight.openflowplugin.api.openflow.statistics.StatisticsManager;
50 import org.opendaylight.openflowplugin.api.openflow.statistics.ofpspecific.MessageIntelligenceAgency;
51 import org.opendaylight.openflowplugin.extension.api.ExtensionConverterRegistrator;
52 import org.opendaylight.openflowplugin.extension.api.OpenFlowPluginExtensionRegistratorProvider;
53 import org.opendaylight.openflowplugin.extension.api.core.extension.ExtensionConverterManager;
54 import org.opendaylight.openflowplugin.impl.configuration.OpenFlowProviderConfigImpl;
55 import org.opendaylight.openflowplugin.impl.connection.ConnectionManagerImpl;
56 import org.opendaylight.openflowplugin.impl.device.DeviceManagerImpl;
57 import org.opendaylight.openflowplugin.impl.device.initialization.DeviceInitializerProvider;
58 import org.opendaylight.openflowplugin.impl.device.initialization.DeviceInitializerProviderFactory;
59 import org.opendaylight.openflowplugin.impl.lifecycle.ContextChainHolderImpl;
60 import org.opendaylight.openflowplugin.impl.protocol.deserialization.DeserializerInjector;
61 import org.opendaylight.openflowplugin.impl.protocol.serialization.SerializerInjector;
62 import org.opendaylight.openflowplugin.impl.role.RoleManagerImpl;
63 import org.opendaylight.openflowplugin.impl.rpc.RpcManagerImpl;
64 import org.opendaylight.openflowplugin.impl.statistics.StatisticsManagerImpl;
65 import org.opendaylight.openflowplugin.impl.util.ThreadPoolLoggingExecutor;
66 import org.opendaylight.openflowplugin.impl.util.TranslatorLibraryUtil;
67 import org.opendaylight.openflowplugin.openflow.md.core.extension.ExtensionConverterManagerImpl;
68 import org.opendaylight.openflowplugin.openflow.md.core.sal.convertor.ConvertorManager;
69 import org.opendaylight.openflowplugin.openflow.md.core.sal.convertor.ConvertorManagerFactory;
70 import org.opendaylight.openflowplugin.openflow.md.core.session.OFSessionUtil;
71 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
72 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflow.provider.config.rev160510.OpenflowProviderConfig;
73 import org.osgi.service.component.annotations.Activate;
74 import org.osgi.service.component.annotations.Component;
75 import org.osgi.service.component.annotations.Deactivate;
76 import org.osgi.service.component.annotations.Reference;
77 import org.osgi.service.component.annotations.ReferenceCardinality;
78 import org.osgi.service.component.annotations.ReferencePolicyOption;
79 import org.slf4j.Logger;
80 import org.slf4j.LoggerFactory;
83 @Component(immediate = true, service = {
84 OpenFlowPluginExtensionRegistratorProvider.class,
85 FlowGroupInfoHistories.class
87 public final class OpenFlowPluginProviderImpl
88 implements OpenFlowPluginExtensionRegistratorProvider, FlowGroupInfoHistories, SystemReadyListener,
// Class-wide logger.
90 private static final Logger LOG = LoggerFactory.getLogger(OpenFlowPluginProviderImpl.class);
// Wheel size handed to the HashedWheelTimer constructed below.
// NOTE(review): this line used to be annotated "0.5 sec.", but 500 ticks at TICK_DURATION = 10 ms add up to
// 5 s per wheel rotation -- confirm which value was intended.
92 private static final int TICKS_PER_WHEEL = 500; // 500 ticks x 10 ms tick = 5 s per wheel rotation
// Length of one timer tick; interpreted as milliseconds where the timer is constructed below.
93 private static final long TICK_DURATION = 10;
// Name passed to the ThreadPoolLoggingExecutor created in the constructor.
94 private static final String POOL_NAME = "ofppool";
96 // TODO: Split this out into a separate component, which requires proper timer cancellation from all users. But is
97 // that worth the complications?
// Timer owned by this instance: handed to RoleManagerImpl in the constructor and stopped in close().
98 private final HashedWheelTimer hashedWheelTimer =
99 new HashedWheelTimer(TICK_DURATION, TimeUnit.MILLISECONDS, TICKS_PER_WHEEL);
// Returned by getExtensionConverterRegistrator() and also given to the device and RPC managers.
100 private final ExtensionConverterManager extensionConverterManager;
// Immutable copy of the injected providers; started in onSystemBootReady(), shut down in close().
101 private final List<SwitchConnectionProvider> switchConnectionProviders;
102 private final DeviceInitializerProvider deviceInitializerProvider;
103 private final ConvertorManager convertorManager;
// Configuration view built from the injected ConfigurationService.
104 private final OpenflowProviderConfig config;
// The four managers below are all registered with contextChainHolder in the constructor.
105 private final DeviceManager deviceManager;
106 private final RpcManager rpcManager;
107 private final StatisticsManager statisticsManager;
108 private final RoleManager roleManager;
// Thread pool created in the constructor and shut down via shutdownNow() in close().
109 private final ExecutorService executorService;
// Installed on connectionManager as both the device-connected and device-disconnected handler.
110 private final ContextChainHolderImpl contextChainHolder;
// Receives OPERATIONAL / ERROR / UNREGISTERED status reports from this class.
111 private final DiagStatusProvider diagStatusProvider;
// Completed (normally or exceptionally) by the callback registered in onSystemBootReady().
112 private final SettableFuture<Void> fullyStarted = SettableFuture.create();
// Not final: close() sets this back to null after closing it.
114 private ConnectionManager connectionManager;
/**
 * Creates all plugin managers, wires them to each other and registers this instance as a system-ready listener.
 * No switch connection provider is started here; that happens in {@code onSystemBootReady()}.
 */
118 public OpenFlowPluginProviderImpl(@Reference final ConfigurationService configurationService,
119 @Reference(cardinality = ReferenceCardinality.AT_LEAST_ONE, policyOption = ReferencePolicyOption.GREEDY)
120 final List<SwitchConnectionProvider> switchConnectionProviders,
121 @Reference final DataBroker dataBroker, @Reference final RpcProviderService rpcProviderRegistry,
122 @Reference final NotificationPublishService notificationPublishService,
123 @Reference final ClusterSingletonServiceProvider singletonServiceProvider,
124 @Reference final EntityOwnershipService entityOwnershipService,
125 @Reference final MastershipChangeServiceManager mastershipChangeServiceManager,
126 @Reference final MessageIntelligenceAgency messageIntelligenceAgency,
127 @Reference final DiagStatusProvider diagStatusProvider,
128 @Reference final SystemReadyMonitor systemReadyMonitor) {
129 config = new OpenFlowProviderConfigImpl(configurationService);
// Keep an immutable snapshot so later changes to the injected list cannot affect this instance.
130 this.switchConnectionProviders = List.copyOf(switchConnectionProviders);
// The raw DataBroker is wrapped once; the wrapper is what gets passed on (see ConnectionManagerImpl below).
131 final var ppdb = new PingPongDataBroker(dataBroker);
132 this.diagStatusProvider = requireNonNull(diagStatusProvider);
134 convertorManager = ConvertorManagerFactory.createDefaultManager();
135 extensionConverterManager = new ExtensionConverterManagerImpl();
136 deviceInitializerProvider = DeviceInitializerProviderFactory.createDefaultProvider();
138 // TODO: copied from OpenFlowPluginProvider (Helium) misusing the old way of distributing extension converters
139 // TODO: rewrite later!
140 OFSessionUtil.getSessionManager().setExtensionConverterProvider(extensionConverterManager);
142 // Creates a thread pool that creates new threads as needed, but will reuse previously
143 // constructed threads when they are available.
144 // Threads that have not been used for x seconds are terminated and removed from the cache.
145 executorService = new ThreadPoolLoggingExecutor(
146 config.getThreadPoolMinThreads().toJava(),
147 config.getThreadPoolMaxThreads().getValue().toJava(),
148 config.getThreadPoolTimeout().toJava(),
149 TimeUnit.SECONDS, new SynchronousQueue<>(), POOL_NAME);
// Held in a local of the concrete type because setExtensionConverterProvider() is invoked on it below, while the
// field is typed as the DeviceManager interface (presumably the setter is not part of that interface -- confirm).
151 final var devMgr = new DeviceManagerImpl(
154 messageIntelligenceAgency,
155 notificationPublishService,
158 deviceInitializerProvider,
160 deviceManager = devMgr;
162 TranslatorLibraryUtil.injectBasicTranslatorLibrary(deviceManager, convertorManager);
163 devMgr.setExtensionConverterProvider(extensionConverterManager);
165 rpcManager = new RpcManagerImpl(
168 extensionConverterManager,
170 notificationPublishService);
172 statisticsManager = new StatisticsManagerImpl(
178 roleManager = new RoleManagerImpl(hashedWheelTimer, config, executorService);
180 contextChainHolder = new ContextChainHolderImpl(
182 singletonServiceProvider,
183 entityOwnershipService,
184 mastershipChangeServiceManager,
// Register every manager with the context chain holder.
187 contextChainHolder.addManager(deviceManager);
188 contextChainHolder.addManager(statisticsManager);
189 contextChainHolder.addManager(rpcManager);
190 contextChainHolder.addManager(roleManager);
// The context chain holder handles both device-connected and device-disconnected events.
192 connectionManager = new ConnectionManagerImpl(config, executorService, ppdb, notificationPublishService);
193 connectionManager.setDeviceConnectedHandler(contextChainHolder);
194 connectionManager.setDeviceDisconnectedHandler(contextChainHolder);
196 deviceManager.setContextChainHolder(contextChainHolder);
197 deviceManager.initialize();
// Switch connections are started from onSystemBootReady(), which this registration arranges to be called.
198 systemReadyMonitor.registerListener(this);
199 LOG.info("registered onSystemBootReady() listener for OpenFlowPluginProvider");
203 public void onSystemBootReady() {
204 LOG.info("onSystemBootReady() received, starting the switch connections");
205 Futures.addCallback(Futures.allAsList(switchConnectionProviders.stream().map(switchConnectionProvider -> {
206 // Inject OpenFlowPlugin custom serializers and deserializers into OpenFlowJava
207 if (config.getUseSingleLayerSerialization()) {
208 SerializerInjector.injectSerializers(switchConnectionProvider,
209 switchConnectionProvider.getConfiguration().isGroupAddModEnabled());
210 DeserializerInjector.injectDeserializers(switchConnectionProvider);
212 DeserializerInjector.revertDeserializers(switchConnectionProvider);
215 // Set handler of incoming connections and start switch connection provider
216 return switchConnectionProvider.startup(connectionManager);
217 }).collect(Collectors.toSet())), new FutureCallback<List<Void>>() {
219 public void onSuccess(final List<Void> result) {
220 LOG.info("All switchConnectionProviders are up and running ({}).", result.size());
221 diagStatusProvider.reportStatus(ServiceState.OPERATIONAL);
222 fullyStarted.set(null);
226 public void onFailure(final Throwable throwable) {
227 LOG.warn("Some switchConnectionProviders failed to start.", throwable);
228 diagStatusProvider.reportStatus(ServiceState.ERROR, throwable);
229 fullyStarted.setException(throwable);
231 }, MoreExecutors.directExecutor());
// Accessor for the start-up completion future.
// NOTE(review): the body is not visible in this view; presumably it returns the fullyStarted field that
// onSystemBootReady() completes -- confirm against the full file.
235 public Future<Void> getFullyStarted() {
239 private ListenableFuture<List<Void>> shutdownSwitchConnections() {
240 final var listListenableFuture = Futures.allAsList(switchConnectionProviders.stream()
241 .map(switchConnectionProvider -> {
242 // Revert deserializers to their original state
243 if (config.getUseSingleLayerSerialization()) {
244 DeserializerInjector.revertDeserializers(switchConnectionProvider);
247 // Shutdown switch connection provider
248 return switchConnectionProvider.shutdown();
249 }).collect(Collectors.toList()));
251 Futures.addCallback(listListenableFuture, new FutureCallback<>() {
253 public void onSuccess(final List<Void> result) {
254 LOG.info("All switchConnectionProviders were successfully shut down ({}).", result.size());
258 public void onFailure(final Throwable throwable) {
259 LOG.warn("Some switchConnectionProviders failed to shutdown.", throwable);
261 }, MoreExecutors.directExecutor());
263 return listListenableFuture;
267 public ExtensionConverterRegistrator getExtensionConverterRegistrator() {
268 return extensionConverterManager;
272 public Map<NodeId, FlowGroupInfoHistory> getAllFlowGroupHistories() {
273 return deviceManager.getAllFlowGroupHistories();
277 public FlowGroupInfoHistory getFlowGroupHistory(final NodeId nodeId) {
278 return deviceManager.getFlowGroupHistory(nodeId);
284 @SuppressWarnings("checkstyle:IllegalCatch")
285 public void close() {
287 shutdownSwitchConnections().get(10, TimeUnit.SECONDS);
288 } catch (InterruptedException | ExecutionException | TimeoutException e) {
289 LOG.warn("Failed to shut down switch connections in time {}s", 10, e);
292 gracefulShutdown(contextChainHolder);
293 gracefulShutdown(connectionManager);
294 gracefulShutdown(deviceManager);
295 gracefulShutdown(rpcManager);
296 gracefulShutdown(statisticsManager);
297 gracefulShutdown(roleManager);
298 gracefulShutdown(executorService);
299 gracefulShutdown(hashedWheelTimer);
300 diagStatusProvider.reportStatus(ServiceState.UNREGISTERED);
302 if (connectionManager != null) {
303 connectionManager.close();
304 connectionManager = null;
306 } catch (Exception e) {
307 LOG.error("Failed to close ConnectionManager", e);
311 @SuppressWarnings("checkstyle:IllegalCatch")
312 private static void gracefulShutdown(final AutoCloseable closeable) {
313 if (closeable != null) {
316 } catch (Exception e) {
317 LOG.warn("Failed to shutdown {} gracefully.", closeable);
322 private static void gracefulShutdown(final Timer timer) {
326 } catch (IllegalStateException e) {
327 LOG.warn("Failed to shutdown {} gracefully.", timer);
332 private static void gracefulShutdown(final ExecutorService executorService) {
333 if (executorService != null) {
335 executorService.shutdownNow();
336 } catch (SecurityException e) {
337 LOG.warn("Failed to shutdown {} gracefully.", executorService);