2 * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.openflowplugin.impl.device;
10 import com.google.common.annotations.VisibleForTesting;
11 import com.google.common.base.Preconditions;
12 import com.google.common.base.Verify;
13 import com.google.common.collect.Iterators;
14 import com.google.common.util.concurrent.FutureCallback;
15 import com.google.common.util.concurrent.Futures;
16 import com.google.common.util.concurrent.ListenableFuture;
17 import io.netty.util.HashedWheelTimer;
18 import io.netty.util.TimerTask;
19 import java.util.Collections;
20 import java.util.Iterator;
21 import java.util.Objects;
22 import java.util.concurrent.ConcurrentHashMap;
23 import java.util.concurrent.ConcurrentMap;
24 import java.util.concurrent.ExecutionException;
25 import java.util.concurrent.ScheduledThreadPoolExecutor;
26 import java.util.concurrent.TimeUnit;
27 import javax.annotation.CheckForNull;
28 import javax.annotation.Nonnull;
29 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
30 import org.opendaylight.controller.md.sal.binding.api.NotificationPublishService;
31 import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
32 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
33 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceProvider;
34 import org.opendaylight.openflowjava.protocol.api.connection.ConnectionAdapter;
35 import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueueHandlerRegistration;
36 import org.opendaylight.openflowplugin.api.openflow.OFPContext;
37 import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionContext;
38 import org.opendaylight.openflowplugin.api.openflow.connection.OutboundQueueProvider;
39 import org.opendaylight.openflowplugin.api.openflow.device.DeviceContext;
40 import org.opendaylight.openflowplugin.api.openflow.device.DeviceInfo;
41 import org.opendaylight.openflowplugin.api.openflow.device.DeviceManager;
42 import org.opendaylight.openflowplugin.api.openflow.device.TranslatorLibrary;
43 import org.opendaylight.openflowplugin.api.openflow.device.handlers.DeviceInitializationPhaseHandler;
44 import org.opendaylight.openflowplugin.api.openflow.device.handlers.DeviceTerminationPhaseHandler;
45 import org.opendaylight.openflowplugin.api.openflow.lifecycle.LifecycleService;
46 import org.opendaylight.openflowplugin.api.openflow.statistics.ofpspecific.MessageSpy;
47 import org.opendaylight.openflowplugin.extension.api.ExtensionConverterProviderKeeper;
48 import org.opendaylight.openflowplugin.extension.api.core.extension.ExtensionConverterProvider;
49 import org.opendaylight.openflowplugin.impl.connection.OutboundQueueProviderImpl;
50 import org.opendaylight.openflowplugin.impl.device.listener.OpenflowProtocolListenerFullImpl;
51 import org.opendaylight.openflowplugin.impl.lifecycle.LifecycleServiceImpl;
52 import org.opendaylight.openflowplugin.impl.util.DeviceInitializationUtils;
53 import org.opendaylight.openflowplugin.openflow.md.core.sal.convertor.ConvertorExecutor;
54 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes;
55 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodesBuilder;
56 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
57 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
58 import org.slf4j.Logger;
59 import org.slf4j.LoggerFactory;
/**
 * Device manager that keeps one {@link DeviceContext} and one {@link LifecycleService} per
 * {@link DeviceInfo}, registers them when a device connects and removes them again when the
 * device context is levelled down.
 */
64 public class DeviceManagerImpl implements DeviceManager, ExtensionConverterProviderKeeper {
66 private static final Logger LOG = LoggerFactory.getLogger(DeviceManagerImpl.class);
// Total notification quota; updatePacketInRateLimiters() divides it by the number of known device contexts.
68 private final long globalNotificationQuota;
// Forwarded to every newly created DeviceContext in deviceConnected().
69 private final boolean switchFeaturesMandatory;
// Mutable flag, readable and writable through the getter/setter at the end of the class.
70 private boolean isNotificationFlowRemovedOff;
// Initial delay and period, in seconds, used when scheduling messageSpy in initialize().
72 private static final int SPY_RATE = 10;
74 private final DataBroker dataBroker;
75 private final ConvertorExecutor convertorExecutor;
// The three collaborators below are injected through setters after construction.
76 private TranslatorLibrary translatorLibrary;
77 private DeviceInitializationPhaseHandler deviceInitPhaseHandler;
78 private DeviceTerminationPhaseHandler deviceTerminPhaseHandler;
// Per-device registries, both keyed by DeviceInfo.
80 private final ConcurrentMap<DeviceInfo, DeviceContext> deviceContexts = new ConcurrentHashMap<>();
81 private final ConcurrentMap<DeviceInfo, LifecycleService> lifecycleServices = new ConcurrentHashMap<>();
// Barrier settings handed to ConnectionAdapter.registerOutboundQueueHandler(); the interval is stored in nanoseconds.
83 private final long barrierIntervalNanos;
84 private final int barrierCountLimit;
// Injected through setExtensionConverterProvider() and passed on to each new DeviceContext.
85 private ExtensionConverterProvider extensionConverterProvider;
// Single-threaded scheduler that runs messageSpy periodically; shut down in close().
86 private ScheduledThreadPoolExecutor spyPool;
87 private final ClusterSingletonServiceProvider singletonServiceProvider;
88 private final NotificationPublishService notificationPublishService;
89 private final MessageSpy messageSpy;
// Used in onDeviceDisconnected() to schedule the timeout guarding the transaction shutdown.
90 private final HashedWheelTimer hashedWheelTimer;
/**
 * Creates the manager, stores its collaborators and merges an empty {@code Nodes} container
 * into the operational datastore so that the parent of every later {@code Node} exists.
 *
 * @param dataBroker must not be null (checked with {@code Preconditions.checkNotNull}); used to
 *                   open the write-only transaction for the initial merge
 * @param globalNotificationQuota quota later split between all known device contexts
 * @param switchFeaturesMandatory value forwarded to every created device context
 * @param barrierInterval barrier interval in milliseconds; converted to nanoseconds here
 * @param barrierCountLimit barrier count limit used when registering the outbound queue handler
 * @param messageSpy task scheduled periodically by {@code initialize()}
 * @param isNotificationFlowRemovedOff initial value of the corresponding flag
 * @param singletonServiceProvider provider the lifecycle services are registered with
 * @param notificationPublishService service handed to every created device context
 * @param hashedWheelTimer timer used for the transaction-shutdown timeout on disconnect
 * @param convertorExecutor stored as-is for later use
 * @throws IllegalStateException wrapping the {@code ExecutionException} or
 *                               {@code InterruptedException} caught after the datastore merge
 */
92 public DeviceManagerImpl(@Nonnull final DataBroker dataBroker,
93 final long globalNotificationQuota,
94 final boolean switchFeaturesMandatory,
95 final long barrierInterval,
96 final int barrierCountLimit,
98 final MessageSpy messageSpy,
99 final boolean isNotificationFlowRemovedOff,
100 final ClusterSingletonServiceProvider singletonServiceProvider,
101 final NotificationPublishService notificationPublishService,
102 final HashedWheelTimer hashedWheelTimer,
103 final ConvertorExecutor convertorExecutor) {
104 this.switchFeaturesMandatory = switchFeaturesMandatory;
105 this.globalNotificationQuota = globalNotificationQuota;
106 this.isNotificationFlowRemovedOff = isNotificationFlowRemovedOff;
107 this.dataBroker = Preconditions.checkNotNull(dataBroker);
108 this.convertorExecutor = convertorExecutor;
109 this.hashedWheelTimer = hashedWheelTimer;
110 /* merge empty nodes to oper DS to predict any problems with missing parent for Node */
111 final WriteTransaction tx = dataBroker.newWriteOnlyTransaction();
113 final NodesBuilder nodesBuilder = new NodesBuilder();
114 nodesBuilder.setNode(Collections.<Node>emptyList());
115 tx.merge(LogicalDatastoreType.OPERATIONAL, InstanceIdentifier.create(Nodes.class), nodesBuilder.build());
// NOTE(review): InterruptedException is logged and wrapped below without restoring the thread's
// interrupt flag - confirm whether Thread.currentThread().interrupt() should be called first.
118 } catch (ExecutionException | InterruptedException e) {
119 LOG.error("Creation of node failed.", e);
120 throw new IllegalStateException(e);
// The barrier interval arrives in milliseconds and is kept in nanoseconds.
123 this.barrierIntervalNanos = TimeUnit.MILLISECONDS.toNanos(barrierInterval);
124 this.barrierCountLimit = barrierCountLimit;
// One worker thread is enough: the pool only runs the periodic messageSpy task.
126 spyPool = new ScheduledThreadPoolExecutor(1);
127 this.singletonServiceProvider = singletonServiceProvider;
128 this.notificationPublishService = notificationPublishService;
129 this.messageSpy = messageSpy;
134 public void setDeviceInitializationPhaseHandler(final DeviceInitializationPhaseHandler handler) {
135 this.deviceInitPhaseHandler = handler;
139 public void onDeviceContextLevelUp(@CheckForNull DeviceInfo deviceInfo, final LifecycleService lifecycleService) throws Exception {
140 // final phase - we have to add new Device to MD-SAL DataStore
141 LOG.debug("Final phase of DeviceContextLevelUp for Node: {} ", deviceInfo.getNodeId());
142 DeviceContext deviceContext = Preconditions.checkNotNull(deviceContexts.get(deviceInfo));
143 deviceContext.onPublished();
144 lifecycleService.registerService(this.singletonServiceProvider);
/**
 * Entry point for a freshly connected device. The visible steps are: reject a null connection
 * context, warn when a device context for the same {@link DeviceInfo} already exists, install
 * this manager as the disconnect handler, enable packet-in filtering, register an outbound
 * queue provider with the configured barrier settings, create and register the device context
 * and its lifecycle service, re-balance the packet-in rate limits, install the protocol
 * message listener and finally hand over to the device initialization phase handler.
 *
 * @param connectionContext context of the new connection; must not be null
 * @return presumably whether the connection was accepted - TODO confirm against the interface contract
 * @throws IllegalArgumentException if {@code connectionContext} is null
 * @throws Exception propagated from the initialization phase handler
 */
148 public boolean deviceConnected(@CheckForNull final ConnectionContext connectionContext) throws Exception {
149 Preconditions.checkArgument(connectionContext != null);
151 DeviceInfo deviceInfo = connectionContext.getDeviceInfo();
153 * This part prevent destroy another device context. Throwing here an exception result to propagate close connection
154 * in {@link org.opendaylight.openflowplugin.impl.connection.org.opendaylight.openflowplugin.impl.connection.HandshakeContextImpl}
155 * If context already exist we are in state closing process (connection flapping) and we should not propagate connection close
157 if (deviceContexts.containsKey(deviceInfo)) {
158 DeviceContext deviceContext = deviceContexts.get(deviceInfo);
159 if (!deviceContext.getState().equals(OFPContext.CONTEXT_STATE.TERMINATION)) {
160 LOG.warn("Context state for node {} is not in TERMINATION state, trying to reconnect", connectionContext.getNodeId().getValue());
162 LOG.warn("Rejecting connection from node which is already connected and there exist deviceContext for it: {}", connectionContext.getNodeId().getValue());
167 LOG.info("ConnectionEvent: Device connected to controller, Device:{}, NodeId:{}",
168 connectionContext.getConnectionAdapter().getRemoteAddress(), deviceInfo.getNodeId());
170 // Add Disconnect handler
171 connectionContext.setDeviceDisconnectedHandler(DeviceManagerImpl.this);
172 // Cache this for clarity
173 final ConnectionAdapter connectionAdapter = connectionContext.getConnectionAdapter();
175 //FIXME: as soon as auxiliary connection are fully supported then this is needed only before device context published
176 connectionAdapter.setPacketInFiltering(true);
// The outbound queue provider is created for the device's protocol version and registered with
// the barrier count limit and barrier interval (nanoseconds) configured on this manager.
178 final OutboundQueueProvider outboundQueueProvider = new OutboundQueueProviderImpl(deviceInfo.getVersion());
180 connectionContext.setOutboundQueueProvider(outboundQueueProvider);
181 final OutboundQueueHandlerRegistration<OutboundQueueProvider> outboundQueueHandlerRegistration =
182 connectionAdapter.registerOutboundQueueHandler(outboundQueueProvider, barrierCountLimit, barrierIntervalNanos);
183 connectionContext.setOutboundQueueHandleRegistration(outboundQueueHandlerRegistration);
185 final DeviceContext deviceContext = new DeviceContextImpl(
// putIfAbsent keeps an entry that is already registered for this DeviceInfo.
193 deviceContexts.putIfAbsent(deviceInfo, deviceContext);
195 final LifecycleService lifecycleService = new LifecycleServiceImpl();
196 lifecycleService.setDeviceContext(deviceContext);
198 lifecycleServices.putIfAbsent(deviceInfo, lifecycleService);
200 deviceContext.setSwitchFeaturesMandatory(switchFeaturesMandatory);
// NOTE(review): the cast assumes the DeviceContext implementation also implements
// ExtensionConverterProviderKeeper - confirm.
202 ((ExtensionConverterProviderKeeper) deviceContext).setExtensionConverterProvider(extensionConverterProvider);
203 deviceContext.setNotificationPublishService(notificationPublishService);
// Re-split the global notification quota over the current set of device contexts.
205 updatePacketInRateLimiters();
207 final OpenflowProtocolListenerFullImpl messageListener = new OpenflowProtocolListenerFullImpl(
208 connectionAdapter, deviceContext);
210 connectionAdapter.setMessageListener(messageListener);
// Hand over to the next initialization phase.
211 deviceInitPhaseHandler.onDeviceContextLevelUp(connectionContext.getDeviceInfo(), lifecycleService);
215 private void updatePacketInRateLimiters() {
216 synchronized (deviceContexts) {
217 final int deviceContextsSize = deviceContexts.size();
218 if (deviceContextsSize > 0) {
219 long freshNotificationLimit = globalNotificationQuota / deviceContextsSize;
220 if (freshNotificationLimit < 100) {
221 freshNotificationLimit = 100;
223 LOG.debug("fresh notification limit = {}", freshNotificationLimit);
224 for (final DeviceContext deviceContext : deviceContexts.values()) {
225 deviceContext.updatePacketInRateLimit(freshNotificationLimit);
232 public TranslatorLibrary oook() {
233 return translatorLibrary;
237 public void setTranslatorLibrary(final TranslatorLibrary translatorLibrary) {
238 this.translatorLibrary = translatorLibrary;
242 public void close() {
243 for (final Iterator<DeviceContext> iterator = Iterators.consumingIterator(deviceContexts.values().iterator());
244 iterator.hasNext();) {
245 final DeviceContext deviceCtx = iterator.next();
246 deviceCtx.shutdownConnection();
247 deviceCtx.shuttingDownDataStoreTransactions();
250 if (spyPool != null) {
251 spyPool.shutdownNow();
257 public void onDeviceContextLevelDown(final DeviceInfo deviceInfo) {
258 deviceContexts.remove(deviceInfo);
259 LifecycleService lifecycleService = lifecycleServices.remove(deviceInfo);
260 updatePacketInRateLimiters();
261 if (Objects.nonNull(lifecycleService)) {
263 lifecycleService.close();
264 } catch (Exception e) {
265 LOG.warn("Closing service for node {} was unsuccessful ", deviceInfo.getNodeId().getValue(), e);
271 public void initialize() {
272 spyPool.scheduleAtFixedRate(messageSpy, SPY_RATE, SPY_RATE, TimeUnit.SECONDS);
276 public void setExtensionConverterProvider(final ExtensionConverterProvider extensionConverterProvider) {
277 this.extensionConverterProvider = extensionConverterProvider;
281 public ExtensionConverterProvider getExtensionConverterProvider() {
282 return extensionConverterProvider;
286 public void setDeviceTerminationPhaseHandler(final DeviceTerminationPhaseHandler handler) {
287 this.deviceTerminPhaseHandler = handler;
291 public void onDeviceDisconnected(final ConnectionContext connectionContext) {
292 LOG.trace("onDeviceDisconnected method call for Node: {}", connectionContext.getNodeId());
293 final DeviceInfo deviceInfo = connectionContext.getDeviceInfo();
294 final DeviceContext deviceCtx = this.deviceContexts.get(deviceInfo);
296 if (null == deviceCtx) {
297 LOG.info("DeviceContext for Node {} was not found. Connection is terminated without OFP context suite.", deviceInfo.getNodeId());
301 if (deviceCtx.getState().equals(OFPContext.CONTEXT_STATE.TERMINATION)) {
302 LOG.debug("Device context for node {} is already is termination state, waiting for close all context");
306 deviceCtx.setState(OFPContext.CONTEXT_STATE.TERMINATION);
308 if (!connectionContext.equals(deviceCtx.getPrimaryConnectionContext())) {
309 LOG.debug("Node {} disconnected, but not primary connection.", connectionContext.getDeviceInfo().getNodeId().getValue());
310 /* Connection is not PrimaryConnection so try to remove from Auxiliary Connections */
311 deviceCtx.removeAuxiliaryConnectionContext(connectionContext);
313 //TODO: Auxiliary connections supported ?
315 /* Device is disconnected and so we need to close TxManager */
316 final ListenableFuture<Void> future = deviceCtx.shuttingDownDataStoreTransactions();
317 Futures.addCallback(future, new FutureCallback<Void>() {
320 public void onSuccess(final Void result) {
321 LOG.debug("TxChainManager for device {} is closed successful.", deviceInfo.getNodeId().getValue());
322 deviceTerminPhaseHandler.onDeviceContextLevelDown(deviceInfo);
326 public void onFailure(final Throwable t) {
327 LOG.warn("TxChainManager for device {} failed by closing.", deviceInfo.getNodeId().getValue());
328 LOG.trace("TxChainManager failed by closing. ", t);
329 deviceTerminPhaseHandler.onDeviceContextLevelDown(deviceInfo);
332 /* Add timer for Close TxManager because it could fain ind cluster without notification */
333 final TimerTask timerTask = timeout -> {
334 if (!future.isDone()) {
335 LOG.warn("Shutting down TxChain for node {} not completed during 10 sec. Continue anyway.", deviceInfo.getNodeId().getValue());
336 future.cancel(false);
339 hashedWheelTimer.newTimeout(timerTask, 10, TimeUnit.SECONDS);
344 void addDeviceContextToMap(final DeviceInfo deviceInfo, final DeviceContext deviceContext){
345 deviceContexts.put(deviceInfo, deviceContext);
349 public <T extends OFPContext> T gainContext(final DeviceInfo deviceInfo) {
350 return (T) deviceContexts.get(deviceInfo);
354 public void setIsNotificationFlowRemovedOff(boolean isNotificationFlowRemovedOff) {
355 this.isNotificationFlowRemovedOff = isNotificationFlowRemovedOff;
359 public boolean getIsNotificationFlowRemovedOff() {
360 return this.isNotificationFlowRemovedOff;