2 * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.openflowplugin.impl.device;
10 import com.google.common.annotations.VisibleForTesting;
11 import com.google.common.base.Preconditions;
12 import com.google.common.collect.Iterators;
13 import com.google.common.util.concurrent.FutureCallback;
14 import com.google.common.util.concurrent.Futures;
15 import com.google.common.util.concurrent.ListenableFuture;
16 import io.netty.util.HashedWheelTimer;
17 import io.netty.util.TimerTask;
18 import java.util.Collections;
19 import java.util.Iterator;
20 import java.util.Objects;
21 import java.util.concurrent.ConcurrentHashMap;
22 import java.util.concurrent.ConcurrentMap;
23 import java.util.concurrent.ExecutionException;
24 import java.util.concurrent.ScheduledThreadPoolExecutor;
25 import java.util.concurrent.TimeUnit;
26 import javax.annotation.CheckForNull;
27 import javax.annotation.Nonnull;
28 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
29 import org.opendaylight.controller.md.sal.binding.api.NotificationPublishService;
30 import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
31 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
32 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceProvider;
33 import org.opendaylight.openflowjava.protocol.api.connection.ConnectionAdapter;
34 import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueueHandlerRegistration;
35 import org.opendaylight.openflowplugin.api.openflow.OFPContext;
36 import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionContext;
37 import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionStatus;
38 import org.opendaylight.openflowplugin.api.openflow.connection.OutboundQueueProvider;
39 import org.opendaylight.openflowplugin.api.openflow.device.DeviceContext;
40 import org.opendaylight.openflowplugin.api.openflow.device.DeviceInfo;
41 import org.opendaylight.openflowplugin.api.openflow.device.DeviceManager;
42 import org.opendaylight.openflowplugin.api.openflow.device.TranslatorLibrary;
43 import org.opendaylight.openflowplugin.api.openflow.device.handlers.DeviceInitializationPhaseHandler;
44 import org.opendaylight.openflowplugin.api.openflow.device.handlers.DeviceTerminationPhaseHandler;
45 import org.opendaylight.openflowplugin.api.openflow.lifecycle.LifecycleService;
46 import org.opendaylight.openflowplugin.api.openflow.statistics.ofpspecific.MessageSpy;
47 import org.opendaylight.openflowplugin.extension.api.ExtensionConverterProviderKeeper;
48 import org.opendaylight.openflowplugin.extension.api.core.extension.ExtensionConverterProvider;
49 import org.opendaylight.openflowplugin.impl.connection.OutboundQueueProviderImpl;
50 import org.opendaylight.openflowplugin.impl.device.listener.OpenflowProtocolListenerFullImpl;
51 import org.opendaylight.openflowplugin.impl.lifecycle.LifecycleServiceImpl;
52 import org.opendaylight.openflowplugin.openflow.md.core.sal.convertor.ConvertorExecutor;
53 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes;
54 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodesBuilder;
55 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
56 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
57 import org.slf4j.Logger;
58 import org.slf4j.LoggerFactory;
63 public class DeviceManagerImpl implements DeviceManager, ExtensionConverterProviderKeeper {
65 private static final Logger LOG = LoggerFactory.getLogger(DeviceManagerImpl.class);
67 private final long globalNotificationQuota;
68 private final boolean switchFeaturesMandatory;
69 private boolean isNotificationFlowRemovedOff;
71 private static final int SPY_RATE = 10;
73 private final DataBroker dataBroker;
74 private final ConvertorExecutor convertorExecutor;
75 private TranslatorLibrary translatorLibrary;
76 private DeviceInitializationPhaseHandler deviceInitPhaseHandler;
77 private DeviceTerminationPhaseHandler deviceTerminPhaseHandler;
79 private final ConcurrentMap<DeviceInfo, DeviceContext> deviceContexts = new ConcurrentHashMap<>();
80 private final ConcurrentMap<DeviceInfo, LifecycleService> lifecycleServices = new ConcurrentHashMap<>();
82 private final long barrierIntervalNanos;
83 private final int barrierCountLimit;
84 private ExtensionConverterProvider extensionConverterProvider;
85 private ScheduledThreadPoolExecutor spyPool;
86 private final ClusterSingletonServiceProvider singletonServiceProvider;
87 private final NotificationPublishService notificationPublishService;
88 private final MessageSpy messageSpy;
89 private final HashedWheelTimer hashedWheelTimer;
/**
 * Creates the device manager, stores its configuration and collaborators, and merges an empty
 * Nodes container into the operational datastore.
 *
 * @param dataBroker broker used to open the write-only transaction; must not be null
 *                   (enforced with Preconditions.checkNotNull)
 * @param globalNotificationQuota quota later divided among device contexts by
 *                                updatePacketInRateLimiters()
 * @param switchFeaturesMandatory flag passed to every device context created in deviceConnected()
 * @param barrierInterval barrier interval in milliseconds; stored internally in nanoseconds
 * @param barrierCountLimit barrier count limit used when registering the outbound queue handler
 * @param messageSpy task scheduled on the spy pool by initialize()
 * @param isNotificationFlowRemovedOff initial value of the flag exposed by
 *                                     getIsNotificationFlowRemovedOff()
 * @param singletonServiceProvider provider used to register lifecycle services
 * @param notificationPublishService service handed to every new device context
 * @param hashedWheelTimer timer used for the transaction-shutdown timeout in onDeviceDisconnected()
 * @param convertorExecutor convertor executor kept by this manager
 * @throws IllegalStateException when the handler below catches an ExecutionException or
 *                               InterruptedException
 */
91 public DeviceManagerImpl(@Nonnull final DataBroker dataBroker,
92 final long globalNotificationQuota,
93 final boolean switchFeaturesMandatory,
94 final long barrierInterval,
95 final int barrierCountLimit,
97 final MessageSpy messageSpy,
98 final boolean isNotificationFlowRemovedOff,
99 final ClusterSingletonServiceProvider singletonServiceProvider,
100 final NotificationPublishService notificationPublishService,
101 final HashedWheelTimer hashedWheelTimer,
102 final ConvertorExecutor convertorExecutor) {
103 this.switchFeaturesMandatory = switchFeaturesMandatory;
104 this.globalNotificationQuota = globalNotificationQuota;
105 this.isNotificationFlowRemovedOff = isNotificationFlowRemovedOff;
106 this.dataBroker = Preconditions.checkNotNull(dataBroker);
107 this.convertorExecutor = convertorExecutor;
108 this.hashedWheelTimer = hashedWheelTimer;
109 /* Merge an empty Nodes container into the operational datastore to prevent problems with a missing parent for Node. */
110 final WriteTransaction tx = dataBroker.newWriteOnlyTransaction();
112 final NodesBuilder nodesBuilder = new NodesBuilder();
113 nodesBuilder.setNode(Collections.<Node>emptyList());
114 tx.merge(LogicalDatastoreType.OPERATIONAL, InstanceIdentifier.create(Nodes.class), nodesBuilder.build());
// NOTE(review): the try statement this handler belongs to is not visible here; presumably it
// submits the transaction and waits for the result - confirm.
// NOTE(review): InterruptedException is wrapped without restoring the thread's interrupt flag;
// consider calling Thread.currentThread().interrupt() before rethrowing.
117 } catch (ExecutionException | InterruptedException e) {
118 LOG.error("Creation of node failed.", e);
119 throw new IllegalStateException(e);
// The interval is supplied in milliseconds and kept in nanoseconds.
122 this.barrierIntervalNanos = TimeUnit.MILLISECONDS.toNanos(barrierInterval);
123 this.barrierCountLimit = barrierCountLimit;
// Single-threaded scheduler; the message spy is scheduled on it in initialize().
125 spyPool = new ScheduledThreadPoolExecutor(1);
126 this.singletonServiceProvider = singletonServiceProvider;
127 this.notificationPublishService = notificationPublishService;
128 this.messageSpy = messageSpy;
133 public void setDeviceInitializationPhaseHandler(final DeviceInitializationPhaseHandler handler) {
134 this.deviceInitPhaseHandler = handler;
138 public void onDeviceContextLevelUp(@CheckForNull DeviceInfo deviceInfo, final LifecycleService lifecycleService) throws Exception {
139 // final phase - we have to add new Device to MD-SAL DataStore
140 LOG.debug("Final phase of DeviceContextLevelUp for Node: {} ", deviceInfo.getNodeId());
141 DeviceContext deviceContext = Preconditions.checkNotNull(deviceContexts.get(deviceInfo));
142 deviceContext.onPublished();
143 lifecycleService.registerService(this.singletonServiceProvider);
/**
 * Handles a newly established connection.
 *
 * If a device context is already registered for the connection's DeviceInfo, the connection is
 * either handed over to that context (result ALREADY_CONNECTED, when the context is not in
 * TERMINATION state) or rejected (result CLOSING). Otherwise the connection is prepared
 * (disconnect handler, packet-in filtering, outbound queue), a device context and a lifecycle
 * service are created and registered, the packet-in rate limits are recomputed, a message
 * listener is installed and the initialization phase handler is called (result MAY_CONTINUE).
 *
 * @param connectionContext context of the new connection; must not be null
 * @return the connection status describing how the connection was treated
 * @throws IllegalArgumentException if connectionContext is null
 * @throws Exception declared by the signature for failures raised by the invoked collaborators
 */
147 public ConnectionStatus deviceConnected(@CheckForNull final ConnectionContext connectionContext) throws Exception {
148 Preconditions.checkArgument(connectionContext != null);
150 DeviceInfo deviceInfo = connectionContext.getDeviceInfo();
152 * This part prevent destroy another device context. Throwing here an exception result to propagate close connection
153 * in {@link org.opendaylight.openflowplugin.impl.connection.org.opendaylight.openflowplugin.impl.connection.HandshakeContextImpl}
154 * If context already exist we are in state closing process (connection flapping) and we should not propagate connection close
// NOTE(review): containsKey followed by get is two separate map operations; presumably the
// entry can be removed in between by onDeviceContextLevelDown - confirm whether a single get
// with a null check is needed.
156 if (deviceContexts.containsKey(deviceInfo)) {
157 DeviceContext deviceContext = deviceContexts.get(deviceInfo);
// Existing context that is not terminating: reuse it with the new connection.
158 if (!deviceContext.getState().equals(OFPContext.CONTEXT_STATE.TERMINATION)) {
159 LOG.info("Node {} already connected but context state not in TERMINATION state, replacing connection context",
160 connectionContext.getDeviceInfo().getLOGValue());
161 deviceContext.replaceConnectionContext(connectionContext);
162 return ConnectionStatus.ALREADY_CONNECTED;
// Existing context is in TERMINATION state: reject this connection.
164 LOG.warn("Rejecting connection from node which is already connected and there exist deviceContext for it: {}",
165 connectionContext.getDeviceInfo().getLOGValue());
166 return ConnectionStatus.CLOSING;
170 LOG.info("ConnectionEvent: Device connected to controller, Device:{}, NodeId:{}",
171 connectionContext.getConnectionAdapter().getRemoteAddress(), deviceInfo.getNodeId());
173 // Add Disconnect handler
174 connectionContext.setDeviceDisconnectedHandler(DeviceManagerImpl.this);
175 // Cache this for clarity
176 final ConnectionAdapter connectionAdapter = connectionContext.getConnectionAdapter();
178 //FIXME: as soon as auxiliary connection are fully supported then this is needed only before device context published
179 connectionAdapter.setPacketInFiltering(true);
// Create the outbound queue for the device's protocol version and register it with the adapter,
// using the barrier limits configured on this manager.
181 final OutboundQueueProvider outboundQueueProvider = new OutboundQueueProviderImpl(deviceInfo.getVersion());
183 connectionContext.setOutboundQueueProvider(outboundQueueProvider);
184 final OutboundQueueHandlerRegistration<OutboundQueueProvider> outboundQueueHandlerRegistration =
185 connectionAdapter.registerOutboundQueueHandler(outboundQueueProvider, barrierCountLimit, barrierIntervalNanos);
186 connectionContext.setOutboundQueueHandleRegistration(outboundQueueHandlerRegistration);
// NOTE(review): the constructor arguments of DeviceContextImpl are not visible here.
188 final DeviceContext deviceContext = new DeviceContextImpl(
// putIfAbsent leaves an existing mapping for deviceInfo untouched; the return value is not inspected.
196 deviceContexts.putIfAbsent(deviceInfo, deviceContext);
198 final LifecycleService lifecycleService = new LifecycleServiceImpl();
199 lifecycleService.setDeviceContext(deviceContext);
200 deviceContext.putLifecycleServiceIntoTxChainManager(lifecycleService);
// Same as above: an existing lifecycle service mapping is kept and the result is ignored.
202 lifecycleServices.putIfAbsent(deviceInfo, lifecycleService);
204 deviceContext.setSwitchFeaturesMandatory(switchFeaturesMandatory);
206 ((ExtensionConverterProviderKeeper) deviceContext).setExtensionConverterProvider(extensionConverterProvider);
207 deviceContext.setNotificationPublishService(notificationPublishService);
// The number of device contexts may have changed, so redistribute the notification quota.
209 updatePacketInRateLimiters();
211 final OpenflowProtocolListenerFullImpl messageListener = new OpenflowProtocolListenerFullImpl(
212 connectionAdapter, deviceContext);
214 connectionAdapter.setMessageListener(messageListener);
// Hand over to the next initialization phase.
215 deviceInitPhaseHandler.onDeviceContextLevelUp(connectionContext.getDeviceInfo(), lifecycleService);
216 return ConnectionStatus.MAY_CONTINUE;
219 private void updatePacketInRateLimiters() {
220 synchronized (deviceContexts) {
221 final int deviceContextsSize = deviceContexts.size();
222 if (deviceContextsSize > 0) {
223 long freshNotificationLimit = globalNotificationQuota / deviceContextsSize;
224 if (freshNotificationLimit < 100) {
225 freshNotificationLimit = 100;
227 LOG.debug("fresh notification limit = {}", freshNotificationLimit);
228 for (final DeviceContext deviceContext : deviceContexts.values()) {
229 deviceContext.updatePacketInRateLimit(freshNotificationLimit);
236 public TranslatorLibrary oook() {
237 return translatorLibrary;
241 public void setTranslatorLibrary(final TranslatorLibrary translatorLibrary) {
242 this.translatorLibrary = translatorLibrary;
246 public void close() {
247 for (final Iterator<DeviceContext> iterator = Iterators.consumingIterator(deviceContexts.values().iterator());
248 iterator.hasNext();) {
249 final DeviceContext deviceCtx = iterator.next();
250 deviceCtx.shutdownConnection();
251 deviceCtx.shuttingDownDataStoreTransactions();
254 if (spyPool != null) {
255 spyPool.shutdownNow();
261 public void onDeviceContextLevelDown(final DeviceInfo deviceInfo) {
263 deviceContexts.remove(deviceInfo);
264 if (LOG.isDebugEnabled()) {
265 LOG.debug("Device context removed for node {}", deviceInfo.getLOGValue());
268 LifecycleService lifecycleService = lifecycleServices.remove(deviceInfo);
269 if (LOG.isDebugEnabled()) {
270 LOG.debug("Lifecycle service removed for node {}", deviceInfo.getLOGValue());
273 updatePacketInRateLimiters();
274 if (Objects.nonNull(lifecycleService)) {
276 lifecycleService.close();
277 LOG.debug("Lifecycle service successfully closed for node {}", deviceInfo.getLOGValue());
278 } catch (Exception e) {
279 LOG.warn("Closing lifecycle service for node {} was unsuccessful ", deviceInfo.getLOGValue(), e);
285 public void initialize() {
286 spyPool.scheduleAtFixedRate(messageSpy, SPY_RATE, SPY_RATE, TimeUnit.SECONDS);
290 public void setExtensionConverterProvider(final ExtensionConverterProvider extensionConverterProvider) {
291 this.extensionConverterProvider = extensionConverterProvider;
295 public ExtensionConverterProvider getExtensionConverterProvider() {
296 return extensionConverterProvider;
300 public void setDeviceTerminationPhaseHandler(final DeviceTerminationPhaseHandler handler) {
301 this.deviceTerminPhaseHandler = handler;
305 public void onDeviceDisconnected(final ConnectionContext connectionContext) {
306 LOG.trace("onDeviceDisconnected method call for Node: {}", connectionContext.getNodeId());
307 final DeviceInfo deviceInfo = connectionContext.getDeviceInfo();
308 final DeviceContext deviceCtx = this.deviceContexts.get(deviceInfo);
310 if (null == deviceCtx) {
311 LOG.info("DeviceContext for Node {} was not found. Connection is terminated without OFP context suite.", deviceInfo.getLOGValue());
315 if (deviceCtx.getState().equals(OFPContext.CONTEXT_STATE.TERMINATION)) {
316 LOG.debug("Device context for node {} is already is termination state, waiting for close all context");
320 deviceCtx.setState(OFPContext.CONTEXT_STATE.TERMINATION);
322 if (!connectionContext.equals(deviceCtx.getPrimaryConnectionContext())) {
323 LOG.debug("Node {} disconnected, but not primary connection.", connectionContext.getDeviceInfo().getLOGValue());
324 /* Connection is not PrimaryConnection so try to remove from Auxiliary Connections */
325 deviceCtx.removeAuxiliaryConnectionContext(connectionContext);
327 //TODO: Auxiliary connections supported ?
329 /* Device is disconnected and so we need to close TxManager */
330 final ListenableFuture<Void> future = deviceCtx.shuttingDownDataStoreTransactions();
331 Futures.addCallback(future, new FutureCallback<Void>() {
334 public void onSuccess(final Void result) {
335 LOG.debug("TxChainManager for device {} is closed successful.", deviceInfo.getLOGValue());
336 deviceTerminPhaseHandler.onDeviceContextLevelDown(deviceInfo);
340 public void onFailure(final Throwable t) {
341 LOG.warn("TxChainManager for device {} failed by closing.", deviceInfo.getLOGValue());
342 LOG.trace("TxChainManager failed by closing. ", t);
343 deviceTerminPhaseHandler.onDeviceContextLevelDown(deviceInfo);
346 /* Add timer for Close TxManager because it could fain ind cluster without notification */
347 final TimerTask timerTask = timeout -> {
348 if (!future.isDone()) {
349 LOG.warn("Shutting down TxChain for node {} not completed during 10 sec. Continue anyway.", deviceInfo.getLOGValue());
350 future.cancel(false);
353 hashedWheelTimer.newTimeout(timerTask, 10, TimeUnit.SECONDS);
358 void addDeviceContextToMap(final DeviceInfo deviceInfo, final DeviceContext deviceContext){
359 deviceContexts.put(deviceInfo, deviceContext);
363 public <T extends OFPContext> T gainContext(final DeviceInfo deviceInfo) {
364 return (T) deviceContexts.get(deviceInfo);
368 public void setIsNotificationFlowRemovedOff(boolean isNotificationFlowRemovedOff) {
369 this.isNotificationFlowRemovedOff = isNotificationFlowRemovedOff;
373 public boolean getIsNotificationFlowRemovedOff() {
374 return this.isNotificationFlowRemovedOff;