/*
 * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
 * and is available at http://www.eclipse.org/legal/epl-v10.html
 */
8 package org.opendaylight.openflowplugin.impl.device;
10 import com.google.common.annotations.VisibleForTesting;
11 import com.google.common.base.Preconditions;
12 import com.google.common.collect.Iterators;
13 import com.google.common.util.concurrent.FutureCallback;
14 import com.google.common.util.concurrent.Futures;
15 import com.google.common.util.concurrent.ListenableFuture;
16 import io.netty.util.HashedWheelTimer;
17 import io.netty.util.TimerTask;
18 import java.util.Collections;
19 import java.util.Iterator;
20 import java.util.Objects;
21 import java.util.concurrent.ConcurrentHashMap;
22 import java.util.concurrent.ConcurrentMap;
23 import java.util.concurrent.ExecutionException;
24 import java.util.concurrent.ScheduledThreadPoolExecutor;
25 import java.util.concurrent.TimeUnit;
26 import javax.annotation.CheckForNull;
27 import javax.annotation.Nonnull;
28 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
29 import org.opendaylight.controller.md.sal.binding.api.NotificationPublishService;
30 import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
31 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
32 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceProvider;
33 import org.opendaylight.openflowjava.protocol.api.connection.ConnectionAdapter;
34 import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueueHandlerRegistration;
35 import org.opendaylight.openflowplugin.api.openflow.OFPContext;
36 import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionContext;
37 import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionStatus;
38 import org.opendaylight.openflowplugin.api.openflow.connection.OutboundQueueProvider;
39 import org.opendaylight.openflowplugin.api.openflow.device.DeviceContext;
40 import org.opendaylight.openflowplugin.api.openflow.device.DeviceInfo;
41 import org.opendaylight.openflowplugin.api.openflow.device.DeviceManager;
42 import org.opendaylight.openflowplugin.api.openflow.device.TranslatorLibrary;
43 import org.opendaylight.openflowplugin.api.openflow.device.handlers.DeviceInitializationPhaseHandler;
44 import org.opendaylight.openflowplugin.api.openflow.device.handlers.DeviceTerminationPhaseHandler;
45 import org.opendaylight.openflowplugin.api.openflow.lifecycle.LifecycleService;
46 import org.opendaylight.openflowplugin.api.openflow.statistics.ofpspecific.MessageSpy;
47 import org.opendaylight.openflowplugin.extension.api.ExtensionConverterProviderKeeper;
48 import org.opendaylight.openflowplugin.extension.api.core.extension.ExtensionConverterProvider;
49 import org.opendaylight.openflowplugin.impl.connection.OutboundQueueProviderImpl;
50 import org.opendaylight.openflowplugin.impl.device.listener.OpenflowProtocolListenerFullImpl;
51 import org.opendaylight.openflowplugin.impl.lifecycle.LifecycleServiceImpl;
52 import org.opendaylight.openflowplugin.openflow.md.core.sal.convertor.ConvertorExecutor;
53 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes;
54 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodesBuilder;
55 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
56 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
57 import org.slf4j.Logger;
58 import org.slf4j.LoggerFactory;
63 public class DeviceManagerImpl implements DeviceManager, ExtensionConverterProviderKeeper {
65 private static final Logger LOG = LoggerFactory.getLogger(DeviceManagerImpl.class);
67 private final long globalNotificationQuota;
68 private final boolean switchFeaturesMandatory;
69 private boolean isNotificationFlowRemovedOff;
71 private static final int SPY_RATE = 10;
73 private final DataBroker dataBroker;
74 private final ConvertorExecutor convertorExecutor;
75 private TranslatorLibrary translatorLibrary;
76 private DeviceInitializationPhaseHandler deviceInitPhaseHandler;
77 private DeviceTerminationPhaseHandler deviceTerminPhaseHandler;
79 private final ConcurrentMap<DeviceInfo, DeviceContext> deviceContexts = new ConcurrentHashMap<>();
80 private final ConcurrentMap<DeviceInfo, LifecycleService> lifecycleServices = new ConcurrentHashMap<>();
82 private final long barrierIntervalNanos;
83 private final int barrierCountLimit;
84 private ExtensionConverterProvider extensionConverterProvider;
85 private ScheduledThreadPoolExecutor spyPool;
86 private final ClusterSingletonServiceProvider singletonServiceProvider;
87 private final NotificationPublishService notificationPublishService;
88 private final MessageSpy messageSpy;
89 private final HashedWheelTimer hashedWheelTimer;
/**
 * Stores the supplied collaborators and settings, merges an empty {@code Nodes} container into the
 * operational datastore, and creates the single-threaded pool later used to run the message spy.
 *
 * @param dataBroker broker used for the initial datastore write; must not be {@code null}
 * @param globalNotificationQuota quota later split among device contexts
 * @param switchFeaturesMandatory flag forwarded to every new device context
 * @param barrierInterval barrier interval in milliseconds (stored internally as nanoseconds)
 * @param barrierCountLimit barrier count limit used when registering the outbound queue handler
 * @param messageSpy task scheduled by {@code initialize()}
 * @param isNotificationFlowRemovedOff initial value of the flow-removed notification flag
 * @param singletonServiceProvider provider used to register lifecycle services
 * @param notificationPublishService service handed to every new device context
 * @param hashedWheelTimer timer used for the tx-chain shutdown timeout on disconnect
 * @param convertorExecutor convertor executor kept for later use
 * @throws NullPointerException if {@code dataBroker} is {@code null}
 * @throws IllegalStateException if the guarded datastore step fails or is interrupted
 */
91 public DeviceManagerImpl(@Nonnull final DataBroker dataBroker,
92 final long globalNotificationQuota,
93 final boolean switchFeaturesMandatory,
94 final long barrierInterval,
95 final int barrierCountLimit,
97 final MessageSpy messageSpy,
98 final boolean isNotificationFlowRemovedOff,
99 final ClusterSingletonServiceProvider singletonServiceProvider,
100 final NotificationPublishService notificationPublishService,
101 final HashedWheelTimer hashedWheelTimer,
102 final ConvertorExecutor convertorExecutor) {
103 this.switchFeaturesMandatory = switchFeaturesMandatory;
104 this.globalNotificationQuota = globalNotificationQuota;
105 this.isNotificationFlowRemovedOff = isNotificationFlowRemovedOff;
106 this.dataBroker = Preconditions.checkNotNull(dataBroker);
107 this.convertorExecutor = convertorExecutor;
108 this.hashedWheelTimer = hashedWheelTimer;
109 /* merge empty nodes to oper DS to predict any problems with missing parent for Node */
110 final WriteTransaction tx = dataBroker.newWriteOnlyTransaction();
112 final NodesBuilder nodesBuilder = new NodesBuilder();
113 nodesBuilder.setNode(Collections.<Node>emptyList());
114 tx.merge(LogicalDatastoreType.OPERATIONAL, InstanceIdentifier.create(Nodes.class), nodesBuilder.build());
// NOTE(review): presumably guards the transaction commit - confirm. An InterruptedException is
// wrapped in IllegalStateException here without restoring the thread's interrupt flag.
117 } catch (ExecutionException | InterruptedException e) {
118 LOG.error("Creation of node failed.", e);
119 throw new IllegalStateException(e);
// Callers pass the barrier interval in milliseconds; it is kept in nanoseconds.
122 this.barrierIntervalNanos = TimeUnit.MILLISECONDS.toNanos(barrierInterval);
123 this.barrierCountLimit = barrierCountLimit;
// One worker thread is enough: the pool only runs the periodic messageSpy task.
125 spyPool = new ScheduledThreadPoolExecutor(1);
126 this.singletonServiceProvider = singletonServiceProvider;
127 this.notificationPublishService = notificationPublishService;
128 this.messageSpy = messageSpy;
133 public void setDeviceInitializationPhaseHandler(final DeviceInitializationPhaseHandler handler) {
134 this.deviceInitPhaseHandler = handler;
138 public void onDeviceContextLevelUp(@CheckForNull DeviceInfo deviceInfo, final LifecycleService lifecycleService) throws Exception {
139 // final phase - we have to add new Device to MD-SAL DataStore
140 LOG.debug("Final phase of DeviceContextLevelUp for Node: {} ", deviceInfo.getNodeId());
141 DeviceContext deviceContext = Preconditions.checkNotNull(deviceContexts.get(deviceInfo));
142 deviceContext.onPublished();
143 lifecycleService.registerService(this.singletonServiceProvider);
/**
 * Handles a new connection from a device. If a device context already exists for the device, the
 * connection is either attached to it or rejected, depending on the context state; otherwise a new
 * device context and lifecycle service are created, wired to the connection, and handed to the
 * device initialization phase handler.
 *
 * @param connectionContext context of the new connection; must not be {@code null}
 * @return {@code ALREADY_CONNECTED} when an existing context that is not in TERMINATION state took
 *         over the connection, {@code CLOSING} when the existing context is in TERMINATION state,
 *         {@code MAY_CONTINUE} when a new device context was created
 * @throws IllegalArgumentException if {@code connectionContext} is {@code null}
 * @throws Exception propagated from the device initialization phase handler
 */
147 public ConnectionStatus deviceConnected(@CheckForNull final ConnectionContext connectionContext) throws Exception {
148 Preconditions.checkArgument(connectionContext != null);
150 DeviceInfo deviceInfo = connectionContext.getDeviceInfo();
152 * This part prevent destroy another device context. Throwing here an exception result to propagate close connection
153 * in {@link org.opendaylight.openflowplugin.impl.connection.org.opendaylight.openflowplugin.impl.connection.HandshakeContextImpl}
154 * If context already exist we are in state closing process (connection flapping) and we should not propagate connection close
// NOTE(review): containsKey, get and the later putIfAbsent are separate steps, and the result of
// putIfAbsent is ignored - confirm that connections for one device are never handled concurrently.
156 if (deviceContexts.containsKey(deviceInfo)) {
157 DeviceContext deviceContext = deviceContexts.get(deviceInfo);
158 if (!deviceContext.getState().equals(OFPContext.CONTEXT_STATE.TERMINATION)) {
159 LOG.info("Node {} already connected but context state not in TERMINATION state, replacing connection context",
160 connectionContext.getDeviceInfo().getLOGValue());
161 deviceContext.replaceConnectionContext(connectionContext);
162 return ConnectionStatus.ALREADY_CONNECTED;
// Reached only when the existing context is in TERMINATION state.
164 LOG.warn("Rejecting connection from node which is already connected and there exist deviceContext for it: {}",
165 connectionContext.getDeviceInfo().getLOGValue());
166 return ConnectionStatus.CLOSING;
170 LOG.info("ConnectionEvent: Device connected to controller, Device:{}, NodeId:{}",
171 connectionContext.getConnectionAdapter().getRemoteAddress(), deviceInfo.getNodeId());
173 // Add Disconnect handler
174 connectionContext.setDeviceDisconnectedHandler(DeviceManagerImpl.this);
175 // Cache this for clarity
176 final ConnectionAdapter connectionAdapter = connectionContext.getConnectionAdapter();
178 //FIXME: as soon as auxiliary connection are fully supported then this is needed only before device context published
179 connectionAdapter.setPacketInFiltering(true);
// Create the outbound queue for this device's OpenFlow version and register it on the adapter
// with the configured barrier count limit and barrier interval.
181 final OutboundQueueProvider outboundQueueProvider = new OutboundQueueProviderImpl(deviceInfo.getVersion());
183 connectionContext.setOutboundQueueProvider(outboundQueueProvider);
184 final OutboundQueueHandlerRegistration<OutboundQueueProvider> outboundQueueHandlerRegistration =
185 connectionAdapter.registerOutboundQueueHandler(outboundQueueProvider, barrierCountLimit, barrierIntervalNanos);
186 connectionContext.setOutboundQueueHandleRegistration(outboundQueueHandlerRegistration);
188 final DeviceContext deviceContext = new DeviceContextImpl(
196 deviceContexts.putIfAbsent(deviceInfo, deviceContext);
// Pair the new device context with its own lifecycle service and remember it under the same key.
198 final LifecycleService lifecycleService = new LifecycleServiceImpl();
199 lifecycleService.setDeviceContext(deviceContext);
200 deviceContext.putLifecycleServiceIntoTxChainManager(lifecycleService);
202 lifecycleServices.putIfAbsent(deviceInfo, lifecycleService);
204 deviceContext.setSwitchFeaturesMandatory(switchFeaturesMandatory);
206 ((ExtensionConverterProviderKeeper) deviceContext).setExtensionConverterProvider(extensionConverterProvider);
207 deviceContext.setNotificationPublishService(notificationPublishService);
// The number of device contexts changed, so the per-device share of the quota is recomputed.
209 updatePacketInRateLimiters();
211 final OpenflowProtocolListenerFullImpl messageListener = new OpenflowProtocolListenerFullImpl(
212 connectionAdapter, deviceContext);
214 connectionAdapter.setMessageListener(messageListener);
215 deviceInitPhaseHandler.onDeviceContextLevelUp(connectionContext.getDeviceInfo(), lifecycleService);
216 return ConnectionStatus.MAY_CONTINUE;
219 private void updatePacketInRateLimiters() {
220 synchronized (deviceContexts) {
221 final int deviceContextsSize = deviceContexts.size();
222 if (deviceContextsSize > 0) {
223 long freshNotificationLimit = globalNotificationQuota / deviceContextsSize;
224 if (freshNotificationLimit < 100) {
225 freshNotificationLimit = 100;
227 LOG.debug("fresh notification limit = {}", freshNotificationLimit);
228 for (final DeviceContext deviceContext : deviceContexts.values()) {
229 deviceContext.updatePacketInRateLimit(freshNotificationLimit);
236 public TranslatorLibrary oook() {
237 return translatorLibrary;
241 public void setTranslatorLibrary(final TranslatorLibrary translatorLibrary) {
242 this.translatorLibrary = translatorLibrary;
246 public void close() {
247 for (final Iterator<DeviceContext> iterator = Iterators.consumingIterator(deviceContexts.values().iterator());
248 iterator.hasNext();) {
249 final DeviceContext deviceCtx = iterator.next();
250 deviceCtx.shutdownConnection();
251 deviceCtx.shuttingDownDataStoreTransactions();
254 if (spyPool != null) {
255 spyPool.shutdownNow();
261 public void onDeviceContextLevelDown(final DeviceInfo deviceInfo) {
263 LifecycleService lifecycleService = lifecycleServices.remove(deviceInfo);
264 if (LOG.isDebugEnabled()) {
265 LOG.debug("Lifecycle service removed for node {}", deviceInfo.getLOGValue());
268 updatePacketInRateLimiters();
269 if (Objects.nonNull(lifecycleService)) {
271 lifecycleService.close();
272 LOG.debug("Lifecycle service successfully closed for node {}", deviceInfo.getLOGValue());
273 } catch (Exception e) {
274 LOG.warn("Closing lifecycle service for node {} was unsuccessful ", deviceInfo.getLOGValue(), e);
278 deviceContexts.remove(deviceInfo);
279 if (LOG.isDebugEnabled()) {
280 LOG.debug("Device context removed for node {}", deviceInfo.getLOGValue());
286 public void initialize() {
287 spyPool.scheduleAtFixedRate(messageSpy, SPY_RATE, SPY_RATE, TimeUnit.SECONDS);
291 public void setExtensionConverterProvider(final ExtensionConverterProvider extensionConverterProvider) {
292 this.extensionConverterProvider = extensionConverterProvider;
296 public ExtensionConverterProvider getExtensionConverterProvider() {
297 return extensionConverterProvider;
301 public void setDeviceTerminationPhaseHandler(final DeviceTerminationPhaseHandler handler) {
302 this.deviceTerminPhaseHandler = handler;
306 public void onDeviceDisconnected(final ConnectionContext connectionContext) {
307 LOG.trace("onDeviceDisconnected method call for Node: {}", connectionContext.getNodeId());
308 final DeviceInfo deviceInfo = connectionContext.getDeviceInfo();
309 final DeviceContext deviceCtx = this.deviceContexts.get(deviceInfo);
311 if (null == deviceCtx) {
312 LOG.info("DeviceContext for Node {} was not found. Connection is terminated without OFP context suite.", deviceInfo.getLOGValue());
316 if (deviceCtx.getState().equals(OFPContext.CONTEXT_STATE.TERMINATION)) {
317 LOG.debug("Device context for node {} is already is termination state, waiting for close all context");
321 deviceCtx.setState(OFPContext.CONTEXT_STATE.TERMINATION);
323 if (!connectionContext.equals(deviceCtx.getPrimaryConnectionContext())) {
324 LOG.debug("Node {} disconnected, but not primary connection.", connectionContext.getDeviceInfo().getLOGValue());
325 /* Connection is not PrimaryConnection so try to remove from Auxiliary Connections */
326 deviceCtx.removeAuxiliaryConnectionContext(connectionContext);
328 //TODO: Auxiliary connections supported ?
330 /* Device is disconnected and so we need to close TxManager */
331 final ListenableFuture<Void> future = deviceCtx.shuttingDownDataStoreTransactions();
332 Futures.addCallback(future, new FutureCallback<Void>() {
335 public void onSuccess(final Void result) {
336 LOG.debug("TxChainManager for device {} is closed successful.", deviceInfo.getLOGValue());
337 deviceTerminPhaseHandler.onDeviceContextLevelDown(deviceInfo);
341 public void onFailure(final Throwable t) {
342 LOG.warn("TxChainManager for device {} failed by closing.", deviceInfo.getLOGValue());
343 LOG.trace("TxChainManager failed by closing. ", t);
344 deviceTerminPhaseHandler.onDeviceContextLevelDown(deviceInfo);
347 /* Add timer for Close TxManager because it could fain ind cluster without notification */
348 final TimerTask timerTask = timeout -> {
349 if (!future.isDone()) {
350 LOG.warn("Shutting down TxChain for node {} not completed during 10 sec. Continue anyway.", deviceInfo.getLOGValue());
351 future.cancel(false);
354 hashedWheelTimer.newTimeout(timerTask, 10, TimeUnit.SECONDS);
359 void addDeviceContextToMap(final DeviceInfo deviceInfo, final DeviceContext deviceContext){
360 deviceContexts.put(deviceInfo, deviceContext);
364 public <T extends OFPContext> T gainContext(final DeviceInfo deviceInfo) {
365 return (T) deviceContexts.get(deviceInfo);
369 public void setIsNotificationFlowRemovedOff(boolean isNotificationFlowRemovedOff) {
370 this.isNotificationFlowRemovedOff = isNotificationFlowRemovedOff;
374 public boolean getIsNotificationFlowRemovedOff() {
375 return this.isNotificationFlowRemovedOff;