2 * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.openflowplugin.impl;
10 import static java.util.Objects.requireNonNull;
12 import com.google.common.util.concurrent.FutureCallback;
13 import com.google.common.util.concurrent.Futures;
14 import com.google.common.util.concurrent.ListenableFuture;
15 import com.google.common.util.concurrent.MoreExecutors;
16 import io.netty.util.HashedWheelTimer;
17 import io.netty.util.Timer;
18 import java.util.ArrayList;
19 import java.util.List;
21 import java.util.concurrent.ExecutionException;
22 import java.util.concurrent.ExecutorService;
23 import java.util.concurrent.SynchronousQueue;
24 import java.util.concurrent.TimeUnit;
25 import java.util.concurrent.TimeoutException;
26 import java.util.stream.Collectors;
27 import javax.annotation.PreDestroy;
28 import javax.inject.Inject;
29 import javax.inject.Singleton;
30 import org.opendaylight.infrautils.diagstatus.ServiceState;
31 import org.opendaylight.infrautils.ready.SystemReadyListener;
32 import org.opendaylight.infrautils.ready.SystemReadyMonitor;
33 import org.opendaylight.mdsal.binding.api.DataBroker;
34 import org.opendaylight.mdsal.binding.api.NotificationPublishService;
35 import org.opendaylight.mdsal.binding.api.RpcProviderService;
36 import org.opendaylight.mdsal.eos.binding.api.EntityOwnershipService;
37 import org.opendaylight.mdsal.singleton.api.ClusterSingletonServiceProvider;
38 import org.opendaylight.openflowjava.protocol.spi.connection.SwitchConnectionProvider;
39 import org.opendaylight.openflowplugin.api.openflow.FlowGroupInfoHistories;
40 import org.opendaylight.openflowplugin.api.openflow.FlowGroupInfoHistory;
41 import org.opendaylight.openflowplugin.api.openflow.configuration.ConfigurationService;
42 import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionManager;
43 import org.opendaylight.openflowplugin.api.openflow.device.DeviceManager;
44 import org.opendaylight.openflowplugin.api.openflow.mastership.MastershipChangeServiceManager;
45 import org.opendaylight.openflowplugin.api.openflow.role.RoleManager;
46 import org.opendaylight.openflowplugin.api.openflow.rpc.RpcManager;
47 import org.opendaylight.openflowplugin.api.openflow.statistics.StatisticsManager;
48 import org.opendaylight.openflowplugin.api.openflow.statistics.ofpspecific.MessageIntelligenceAgency;
49 import org.opendaylight.openflowplugin.extension.api.ExtensionConverterRegistrator;
50 import org.opendaylight.openflowplugin.extension.api.OpenFlowPluginExtensionRegistratorProvider;
51 import org.opendaylight.openflowplugin.extension.api.core.extension.ExtensionConverterManager;
52 import org.opendaylight.openflowplugin.impl.configuration.OpenFlowProviderConfigImpl;
53 import org.opendaylight.openflowplugin.impl.connection.ConnectionManagerImpl;
54 import org.opendaylight.openflowplugin.impl.device.DeviceManagerImpl;
55 import org.opendaylight.openflowplugin.impl.device.initialization.DeviceInitializerProvider;
56 import org.opendaylight.openflowplugin.impl.device.initialization.DeviceInitializerProviderFactory;
57 import org.opendaylight.openflowplugin.impl.lifecycle.ContextChainHolderImpl;
58 import org.opendaylight.openflowplugin.impl.protocol.deserialization.DeserializerInjector;
59 import org.opendaylight.openflowplugin.impl.protocol.serialization.SerializerInjector;
60 import org.opendaylight.openflowplugin.impl.role.RoleManagerImpl;
61 import org.opendaylight.openflowplugin.impl.rpc.RpcManagerImpl;
62 import org.opendaylight.openflowplugin.impl.statistics.StatisticsManagerImpl;
63 import org.opendaylight.openflowplugin.impl.util.ThreadPoolLoggingExecutor;
64 import org.opendaylight.openflowplugin.impl.util.TranslatorLibraryUtil;
65 import org.opendaylight.openflowplugin.openflow.md.core.extension.ExtensionConverterManagerImpl;
66 import org.opendaylight.openflowplugin.openflow.md.core.sal.convertor.ConvertorManager;
67 import org.opendaylight.openflowplugin.openflow.md.core.sal.convertor.ConvertorManagerFactory;
68 import org.opendaylight.openflowplugin.openflow.md.core.session.OFSessionUtil;
69 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
70 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflow.provider.config.rev160510.OpenflowProviderConfig;
71 import org.osgi.service.component.annotations.Activate;
72 import org.osgi.service.component.annotations.Component;
73 import org.osgi.service.component.annotations.Deactivate;
74 import org.osgi.service.component.annotations.Reference;
75 import org.osgi.service.component.annotations.ReferenceCardinality;
76 import org.osgi.service.component.annotations.ReferencePolicy;
77 import org.osgi.service.component.annotations.ReferencePolicyOption;
78 import org.slf4j.Logger;
79 import org.slf4j.LoggerFactory;
82 @Component(immediate = true, service = {
83 OpenFlowPluginExtensionRegistratorProvider.class,
84 FlowGroupInfoHistories.class
86 public final class OpenFlowPluginProviderImpl
87 implements OpenFlowPluginExtensionRegistratorProvider, FlowGroupInfoHistories, SystemReadyListener,
89 private static final Logger LOG = LoggerFactory.getLogger(OpenFlowPluginProviderImpl.class);
91 private static final int TICKS_PER_WHEEL = 500; // 0.5 sec.
92 private static final long TICK_DURATION = 10;
93 private static final String POOL_NAME = "ofppool";
95 // TODO: Split this out into a separate component, which requires proper timer cancellation from all users. But is
96 // that worth the complications?
97 private final HashedWheelTimer hashedWheelTimer =
98 new HashedWheelTimer(TICK_DURATION, TimeUnit.MILLISECONDS, TICKS_PER_WHEEL);
99 private final ExtensionConverterManager extensionConverterManager;
100 private final DeviceInitializerProvider deviceInitializerProvider;
101 private final ConvertorManager convertorManager;
102 private final OpenflowProviderConfig config;
103 private final DeviceManager deviceManager;
104 private final RpcManager rpcManager;
105 private final StatisticsManager statisticsManager;
106 private final RoleManager roleManager;
107 private final ExecutorService executorService;
108 private final ContextChainHolderImpl contextChainHolder;
109 private final DiagStatusProvider diagStatusProvider;
111 private final List<SwitchConnectionProvider> connectionProviders = new ArrayList<>();
113 private List<SwitchConnectionProvider> startedProviders;
114 private ConnectionManager connectionManager;
115 private int startingProviders;
119 public OpenFlowPluginProviderImpl(@Reference final ConfigurationService configurationService,
120 @Reference final DataBroker dataBroker, @Reference final RpcProviderService rpcProviderRegistry,
121 @Reference final NotificationPublishService notificationPublishService,
122 @Reference final ClusterSingletonServiceProvider singletonServiceProvider,
123 @Reference final EntityOwnershipService entityOwnershipService,
124 @Reference final MastershipChangeServiceManager mastershipChangeServiceManager,
125 @Reference final MessageIntelligenceAgency messageIntelligenceAgency,
126 @Reference final DiagStatusProvider diagStatusProvider,
127 @Reference final SystemReadyMonitor systemReadyMonitor) {
128 config = new OpenFlowProviderConfigImpl(configurationService);
129 final var ppdb = new PingPongDataBroker(dataBroker);
130 this.diagStatusProvider = requireNonNull(diagStatusProvider);
132 convertorManager = ConvertorManagerFactory.createDefaultManager();
133 extensionConverterManager = new ExtensionConverterManagerImpl();
134 deviceInitializerProvider = DeviceInitializerProviderFactory.createDefaultProvider();
136 // TODO: copied from OpenFlowPluginProvider (Helium) misusesing the old way of distributing extension converters
137 // TODO: rewrite later!
138 OFSessionUtil.getSessionManager().setExtensionConverterProvider(extensionConverterManager);
140 // Creates a thread pool that creates new threads as needed, but will reuse previously
141 // constructed threads when they are available.
142 // Threads that have not been used for x seconds are terminated and removed from the cache.
143 executorService = new ThreadPoolLoggingExecutor(
144 config.getThreadPoolMinThreads().toJava(),
145 config.getThreadPoolMaxThreads().getValue().toJava(),
146 config.getThreadPoolTimeout().toJava(),
147 TimeUnit.SECONDS, new SynchronousQueue<>(), POOL_NAME);
149 final var devMgr = new DeviceManagerImpl(
152 messageIntelligenceAgency,
153 notificationPublishService,
156 deviceInitializerProvider,
158 deviceManager = devMgr;
160 TranslatorLibraryUtil.injectBasicTranslatorLibrary(deviceManager, convertorManager);
161 devMgr.setExtensionConverterProvider(extensionConverterManager);
163 rpcManager = new RpcManagerImpl(
166 extensionConverterManager,
168 notificationPublishService);
170 statisticsManager = new StatisticsManagerImpl(
176 roleManager = new RoleManagerImpl(hashedWheelTimer, config, executorService);
178 contextChainHolder = new ContextChainHolderImpl(
180 singletonServiceProvider,
181 entityOwnershipService,
182 mastershipChangeServiceManager,
185 contextChainHolder.addManager(deviceManager);
186 contextChainHolder.addManager(statisticsManager);
187 contextChainHolder.addManager(rpcManager);
188 contextChainHolder.addManager(roleManager);
190 connectionManager = new ConnectionManagerImpl(config, executorService, ppdb, notificationPublishService);
191 connectionManager.setDeviceConnectedHandler(contextChainHolder);
192 connectionManager.setDeviceDisconnectedHandler(contextChainHolder);
194 deviceManager.setContextChainHolder(contextChainHolder);
195 deviceManager.initialize();
196 systemReadyMonitor.registerListener(this);
197 LOG.info("OpenFlowPluginProvider started, waiting for onSystemBootReady()");
200 @Reference(cardinality = ReferenceCardinality.AT_LEAST_ONE,
201 policy = ReferencePolicy.DYNAMIC, policyOption = ReferencePolicyOption.GREEDY)
202 public synchronized void bindConnectionProvider(final SwitchConnectionProvider switchConnectionProvider) {
203 connectionProviders.add(switchConnectionProvider);
204 LOG.info("Added connection provider {}", switchConnectionProvider);
206 if (startedProviders != null) {
207 LOG.info("Starting latecomer connection provider {}", switchConnectionProvider);
208 startingProviders += 1;
209 startProvider(switchConnectionProvider);
213 public synchronized void unbindConnectionProvider(final SwitchConnectionProvider switchConnectionProvider) {
214 connectionProviders.remove(switchConnectionProvider);
215 if (startedProviders != null && startedProviders.remove(switchConnectionProvider)) {
216 switchConnectionProvider.shutdown();
218 LOG.info("Removed connection provider {}", switchConnectionProvider);
221 private ListenableFuture<Void> startProvider(final SwitchConnectionProvider provider) {
222 // Inject OpenFlowPlugin custom serializers and deserializers into OpenFlowJava
223 if (config.getUseSingleLayerSerialization()) {
224 SerializerInjector.injectSerializers(provider, provider.getConfiguration().isGroupAddModEnabled());
225 DeserializerInjector.injectDeserializers(provider);
227 DeserializerInjector.revertDeserializers(provider);
230 // Set handler of incoming connections and start switch connection provider
231 final var future = provider.startup(connectionManager);
232 startedProviders.add(provider);
233 Futures.addCallback(future, new FutureCallback<>() {
235 public void onSuccess(final Void result) {
236 LOG.info("Connection provider {} started", provider);
241 public void onFailure(final Throwable cause) {
242 LOG.warn("Connection provider {} failed to start", provider, cause);
243 connectionFailed(cause);
245 }, MoreExecutors.directExecutor());
250 public synchronized void onSystemBootReady() {
251 LOG.info("onSystemBootReady() received, starting the switch connections");
253 final var size = connectionProviders.size();
254 startedProviders = new ArrayList<>(size);
255 startingProviders = size;
256 connectionProviders.forEach(this::startProvider);
259 private synchronized void connectionFailed(final Throwable cause) {
260 // Decrement below zero, so we do not arrive to zero
261 startingProviders = -1;
262 diagStatusProvider.reportStatus(ServiceState.ERROR, cause);
265 private synchronized void connectionStarted() {
266 if (--startingProviders == 0 && startedProviders.equals(connectionProviders)) {
267 LOG.info("All switchConnectionProviders are up and running ({}).", startedProviders.size());
268 diagStatusProvider.reportStatus(ServiceState.OPERATIONAL);
272 private ListenableFuture<List<Void>> shutdownSwitchConnections() {
273 final var future = Futures.allAsList(startedProviders.stream()
274 .map(switchConnectionProvider -> {
275 // Revert deserializers to their original state
276 if (config.getUseSingleLayerSerialization()) {
277 DeserializerInjector.revertDeserializers(switchConnectionProvider);
280 // Shutdown switch connection provider
281 return switchConnectionProvider.shutdown();
282 }).collect(Collectors.toList()));
283 startedProviders.clear();
285 Futures.addCallback(future, new FutureCallback<>() {
287 public void onSuccess(final List<Void> result) {
288 LOG.info("All switchConnectionProviders were successfully shut down ({}).", result.size());
292 public void onFailure(final Throwable throwable) {
293 LOG.warn("Some switchConnectionProviders failed to shutdown.", throwable);
295 }, MoreExecutors.directExecutor());
301 public ExtensionConverterRegistrator getExtensionConverterRegistrator() {
302 return extensionConverterManager;
306 public Map<NodeId, FlowGroupInfoHistory> getAllFlowGroupHistories() {
307 return deviceManager.getAllFlowGroupHistories();
311 public FlowGroupInfoHistory getFlowGroupHistory(final NodeId nodeId) {
312 return deviceManager.getFlowGroupHistory(nodeId);
318 @SuppressWarnings("checkstyle:IllegalCatch")
319 public synchronized void close() {
320 LOG.info("OpenFlowPluginProvider stopping");
322 shutdownSwitchConnections().get(10, TimeUnit.SECONDS);
323 } catch (InterruptedException | ExecutionException | TimeoutException e) {
324 LOG.warn("Failed to shut down switch connections in time {}s", 10, e);
327 gracefulShutdown(contextChainHolder);
328 gracefulShutdown(connectionManager);
329 gracefulShutdown(deviceManager);
330 gracefulShutdown(rpcManager);
331 gracefulShutdown(statisticsManager);
332 gracefulShutdown(roleManager);
333 gracefulShutdown(executorService);
334 gracefulShutdown(hashedWheelTimer);
335 diagStatusProvider.reportStatus(ServiceState.UNREGISTERED);
337 if (connectionManager != null) {
338 connectionManager.close();
339 connectionManager = null;
341 } catch (Exception e) {
342 LOG.error("Failed to close ConnectionManager", e);
344 LOG.info("OpenFlowPluginProvider stopped");
347 @SuppressWarnings("checkstyle:IllegalCatch")
348 private static void gracefulShutdown(final AutoCloseable closeable) {
349 if (closeable != null) {
352 } catch (Exception e) {
353 LOG.warn("Failed to shutdown {} gracefully.", closeable);
358 private static void gracefulShutdown(final Timer timer) {
362 } catch (IllegalStateException e) {
363 LOG.warn("Failed to shutdown {} gracefully.", timer);
368 private static void gracefulShutdown(final ExecutorService executorService) {
369 if (executorService != null) {
371 executorService.shutdownNow();
372 } catch (SecurityException e) {
373 LOG.warn("Failed to shutdown {} gracefully.", executorService);