2 * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.openflowplugin.impl.device;
10 import com.google.common.annotations.VisibleForTesting;
11 import com.google.common.base.Preconditions;
12 import com.google.common.base.Verify;
13 import com.google.common.collect.Iterators;
14 import com.google.common.util.concurrent.FutureCallback;
15 import com.google.common.util.concurrent.Futures;
16 import com.google.common.util.concurrent.ListenableFuture;
17 import io.netty.util.HashedWheelTimer;
18 import io.netty.util.TimerTask;
19 import java.util.Collections;
20 import java.util.Iterator;
21 import java.util.concurrent.ConcurrentHashMap;
22 import java.util.concurrent.ConcurrentMap;
23 import java.util.concurrent.ExecutionException;
24 import java.util.concurrent.ScheduledThreadPoolExecutor;
25 import java.util.concurrent.TimeUnit;
26 import javax.annotation.CheckForNull;
27 import javax.annotation.Nonnull;
28 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
29 import org.opendaylight.controller.md.sal.binding.api.NotificationPublishService;
30 import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
31 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
32 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceProvider;
33 import org.opendaylight.openflowjava.protocol.api.connection.ConnectionAdapter;
34 import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueueHandlerRegistration;
35 import org.opendaylight.openflowplugin.api.openflow.OFPContext;
36 import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionContext;
37 import org.opendaylight.openflowplugin.api.openflow.connection.OutboundQueueProvider;
38 import org.opendaylight.openflowplugin.api.openflow.device.DeviceContext;
39 import org.opendaylight.openflowplugin.api.openflow.device.DeviceInfo;
40 import org.opendaylight.openflowplugin.api.openflow.device.DeviceManager;
41 import org.opendaylight.openflowplugin.api.openflow.device.TranslatorLibrary;
42 import org.opendaylight.openflowplugin.api.openflow.device.handlers.DeviceInitializationPhaseHandler;
43 import org.opendaylight.openflowplugin.api.openflow.device.handlers.DeviceTerminationPhaseHandler;
44 import org.opendaylight.openflowplugin.api.openflow.lifecycle.LifecycleService;
45 import org.opendaylight.openflowplugin.api.openflow.statistics.ofpspecific.MessageSpy;
46 import org.opendaylight.openflowplugin.extension.api.ExtensionConverterProviderKeeper;
47 import org.opendaylight.openflowplugin.extension.api.core.extension.ExtensionConverterProvider;
48 import org.opendaylight.openflowplugin.impl.connection.OutboundQueueProviderImpl;
49 import org.opendaylight.openflowplugin.impl.device.listener.OpenflowProtocolListenerFullImpl;
50 import org.opendaylight.openflowplugin.impl.lifecycle.LifecycleServiceImpl;
51 import org.opendaylight.openflowplugin.impl.util.DeviceInitializationUtils;
52 import org.opendaylight.openflowplugin.openflow.md.core.sal.convertor.ConvertorExecutor;
53 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes;
54 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodesBuilder;
55 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
56 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
57 import org.slf4j.Logger;
58 import org.slf4j.LoggerFactory;
63 public class DeviceManagerImpl implements DeviceManager, ExtensionConverterProviderKeeper {
65 private static final Logger LOG = LoggerFactory.getLogger(DeviceManagerImpl.class);
67 private final long globalNotificationQuota;
68 private final boolean switchFeaturesMandatory;
69 private boolean isNotificationFlowRemovedOff;
71 private static final int SPY_RATE = 10;
73 private final DataBroker dataBroker;
74 private final ConvertorExecutor convertorExecutor;
75 private TranslatorLibrary translatorLibrary;
76 private DeviceInitializationPhaseHandler deviceInitPhaseHandler;
77 private DeviceTerminationPhaseHandler deviceTerminPhaseHandler;
79 private final ConcurrentMap<DeviceInfo, DeviceContext> deviceContexts = new ConcurrentHashMap<>();
80 private final ConcurrentMap<DeviceInfo, LifecycleService> lifecycleServices = new ConcurrentHashMap<>();
82 private final long barrierIntervalNanos;
83 private final int barrierCountLimit;
84 private ExtensionConverterProvider extensionConverterProvider;
85 private ScheduledThreadPoolExecutor spyPool;
86 private final ClusterSingletonServiceProvider singletonServiceProvider;
87 private final NotificationPublishService notificationPublishService;
88 private final MessageSpy messageSpy;
89 private final HashedWheelTimer hashedWheelTimer;
91 public DeviceManagerImpl(@Nonnull final DataBroker dataBroker,
92 final long globalNotificationQuota,
93 final boolean switchFeaturesMandatory,
94 final long barrierInterval,
95 final int barrierCountLimit,
97 final MessageSpy messageSpy,
98 final boolean isNotificationFlowRemovedOff,
99 final ClusterSingletonServiceProvider singletonServiceProvider,
100 final NotificationPublishService notificationPublishService,
101 final HashedWheelTimer hashedWheelTimer,
102 final ConvertorExecutor convertorExecutor) {
103 this.switchFeaturesMandatory = switchFeaturesMandatory;
104 this.globalNotificationQuota = globalNotificationQuota;
105 this.isNotificationFlowRemovedOff = isNotificationFlowRemovedOff;
106 this.dataBroker = Preconditions.checkNotNull(dataBroker);
107 this.convertorExecutor = convertorExecutor;
108 this.hashedWheelTimer = hashedWheelTimer;
109 /* merge empty nodes to oper DS to predict any problems with missing parent for Node */
110 final WriteTransaction tx = dataBroker.newWriteOnlyTransaction();
112 final NodesBuilder nodesBuilder = new NodesBuilder();
113 nodesBuilder.setNode(Collections.<Node>emptyList());
114 tx.merge(LogicalDatastoreType.OPERATIONAL, InstanceIdentifier.create(Nodes.class), nodesBuilder.build());
117 } catch (ExecutionException | InterruptedException e) {
118 LOG.error("Creation of node failed.", e);
119 throw new IllegalStateException(e);
122 this.barrierIntervalNanos = TimeUnit.MILLISECONDS.toNanos(barrierInterval);
123 this.barrierCountLimit = barrierCountLimit;
125 spyPool = new ScheduledThreadPoolExecutor(1);
126 this.singletonServiceProvider = singletonServiceProvider;
127 this.notificationPublishService = notificationPublishService;
128 this.messageSpy = messageSpy;
133 public void setDeviceInitializationPhaseHandler(final DeviceInitializationPhaseHandler handler) {
134 this.deviceInitPhaseHandler = handler;
138 public void onDeviceContextLevelUp(@CheckForNull DeviceInfo deviceInfo, final LifecycleService lifecycleService) throws Exception {
139 // final phase - we have to add new Device to MD-SAL DataStore
140 LOG.debug("Final phase of DeviceContextLevelUp for Node: {} ", deviceInfo.getNodeId());
141 DeviceContext deviceContext = Preconditions.checkNotNull(deviceContexts.get(deviceInfo));
142 deviceContext.onPublished();
143 lifecycleService.registerService(this.singletonServiceProvider);
147 public boolean deviceConnected(@CheckForNull final ConnectionContext connectionContext) throws Exception {
148 Preconditions.checkArgument(connectionContext != null);
150 DeviceInfo deviceInfo = connectionContext.getDeviceInfo();
152 * This part prevent destroy another device context. Throwing here an exception result to propagate close connection
153 * in {@link org.opendaylight.openflowplugin.impl.connection.org.opendaylight.openflowplugin.impl.connection.HandshakeContextImpl}
154 * If context already exist we are in state closing process (connection flapping) and we should not propagate connection close
156 if (deviceContexts.containsKey(deviceInfo)) {
157 LOG.warn("Rejecting connection from node which is already connected and there exist deviceContext for it: {}", connectionContext.getNodeId());
161 LOG.info("ConnectionEvent: Device connected to controller, Device:{}, NodeId:{}",
162 connectionContext.getConnectionAdapter().getRemoteAddress(), deviceInfo.getNodeId());
164 // Add Disconnect handler
165 connectionContext.setDeviceDisconnectedHandler(DeviceManagerImpl.this);
166 // Cache this for clarity
167 final ConnectionAdapter connectionAdapter = connectionContext.getConnectionAdapter();
169 //FIXME: as soon as auxiliary connection are fully supported then this is needed only before device context published
170 connectionAdapter.setPacketInFiltering(true);
172 final OutboundQueueProvider outboundQueueProvider = new OutboundQueueProviderImpl(deviceInfo.getVersion());
174 connectionContext.setOutboundQueueProvider(outboundQueueProvider);
175 final OutboundQueueHandlerRegistration<OutboundQueueProvider> outboundQueueHandlerRegistration =
176 connectionAdapter.registerOutboundQueueHandler(outboundQueueProvider, barrierCountLimit, barrierIntervalNanos);
177 connectionContext.setOutboundQueueHandleRegistration(outboundQueueHandlerRegistration);
179 final DeviceContext deviceContext = new DeviceContextImpl(
187 Verify.verify(deviceContexts.putIfAbsent(deviceInfo, deviceContext) == null, "DeviceCtx still not closed.");
189 final LifecycleService lifecycleService = new LifecycleServiceImpl();
190 lifecycleService.setDeviceContext(deviceContext);
192 lifecycleServices.putIfAbsent(deviceInfo, lifecycleService);
194 deviceContext.setSwitchFeaturesMandatory(switchFeaturesMandatory);
196 ((ExtensionConverterProviderKeeper) deviceContext).setExtensionConverterProvider(extensionConverterProvider);
197 deviceContext.setNotificationPublishService(notificationPublishService);
199 updatePacketInRateLimiters();
201 final OpenflowProtocolListenerFullImpl messageListener = new OpenflowProtocolListenerFullImpl(
202 connectionAdapter, deviceContext);
204 connectionAdapter.setMessageListener(messageListener);
205 deviceInitPhaseHandler.onDeviceContextLevelUp(connectionContext.getDeviceInfo(), lifecycleService);
209 private void updatePacketInRateLimiters() {
210 synchronized (deviceContexts) {
211 final int deviceContextsSize = deviceContexts.size();
212 if (deviceContextsSize > 0) {
213 long freshNotificationLimit = globalNotificationQuota / deviceContextsSize;
214 if (freshNotificationLimit < 100) {
215 freshNotificationLimit = 100;
217 LOG.debug("fresh notification limit = {}", freshNotificationLimit);
218 for (final DeviceContext deviceContext : deviceContexts.values()) {
219 deviceContext.updatePacketInRateLimit(freshNotificationLimit);
226 public TranslatorLibrary oook() {
227 return translatorLibrary;
231 public void setTranslatorLibrary(final TranslatorLibrary translatorLibrary) {
232 this.translatorLibrary = translatorLibrary;
236 public void close() {
237 for (final Iterator<DeviceContext> iterator = Iterators.consumingIterator(deviceContexts.values().iterator());
238 iterator.hasNext();) {
239 final DeviceContext deviceCtx = iterator.next();
240 deviceCtx.shutdownConnection();
241 deviceCtx.shuttingDownDataStoreTransactions();
244 if (spyPool != null) {
245 spyPool.shutdownNow();
251 public void onDeviceContextLevelDown(final DeviceInfo deviceInfo) {
252 LOG.debug("onDeviceContextClosed for Node {}", deviceInfo.getNodeId());
253 deviceContexts.remove(deviceInfo);
254 updatePacketInRateLimiters();
255 LifecycleService lifecycleService = lifecycleServices.remove(deviceInfo);
257 lifecycleService.close();
258 } catch (Exception e) {
259 LOG.warn("Closing service for node {} was unsuccessful ", deviceInfo.getNodeId().getValue(), e);
264 public void initialize() {
265 spyPool.scheduleAtFixedRate(messageSpy, SPY_RATE, SPY_RATE, TimeUnit.SECONDS);
269 public void setExtensionConverterProvider(final ExtensionConverterProvider extensionConverterProvider) {
270 this.extensionConverterProvider = extensionConverterProvider;
274 public ExtensionConverterProvider getExtensionConverterProvider() {
275 return extensionConverterProvider;
279 public void setDeviceTerminationPhaseHandler(final DeviceTerminationPhaseHandler handler) {
280 this.deviceTerminPhaseHandler = handler;
284 public void onDeviceDisconnected(final ConnectionContext connectionContext) {
285 LOG.trace("onDeviceDisconnected method call for Node: {}", connectionContext.getNodeId());
286 final DeviceInfo deviceInfo = connectionContext.getDeviceInfo();
287 final DeviceContext deviceCtx = this.deviceContexts.get(deviceInfo);
289 if (null == deviceCtx) {
290 LOG.info("DeviceContext for Node {} was not found. Connection is terminated without OFP context suite.", deviceInfo.getNodeId());
294 if (!connectionContext.equals(deviceCtx.getPrimaryConnectionContext())) {
295 /* Connection is not PrimaryConnection so try to remove from Auxiliary Connections */
296 deviceCtx.removeAuxiliaryConnectionContext(connectionContext);
298 /* Device is disconnected and so we need to close TxManager */
299 final ListenableFuture<Void> future = deviceCtx.shuttingDownDataStoreTransactions();
300 Futures.addCallback(future, new FutureCallback<Void>() {
303 public void onSuccess(final Void result) {
304 LOG.debug("TxChainManager for device {} is closed successful.", deviceInfo.getNodeId());
305 deviceTerminPhaseHandler.onDeviceContextLevelDown(deviceInfo);
309 public void onFailure(final Throwable t) {
310 LOG.warn("TxChainManager for device {} failed by closing.", deviceInfo.getNodeId(), t);
311 deviceTerminPhaseHandler.onDeviceContextLevelDown(deviceInfo);
314 /* Add timer for Close TxManager because it could fain ind cluster without notification */
315 final TimerTask timerTask = timeout -> {
316 if (!future.isDone()) {
317 LOG.info("Shutting down TxChain for node {} not completed during 10 sec. Continue anyway.", deviceInfo.getNodeId());
318 future.cancel(false);
321 hashedWheelTimer.newTimeout(timerTask, 10, TimeUnit.SECONDS);
326 void addDeviceContextToMap(final DeviceInfo deviceInfo, final DeviceContext deviceContext){
327 deviceContexts.put(deviceInfo, deviceContext);
331 void removeDeviceContextFromMap(final DeviceInfo deviceInfo){
332 deviceContexts.remove(deviceInfo);
336 public <T extends OFPContext> T gainContext(final DeviceInfo deviceInfo) {
337 return (T) deviceContexts.get(deviceInfo);
341 public void setIsNotificationFlowRemovedOff(boolean isNotificationFlowRemovedOff) {
342 this.isNotificationFlowRemovedOff = isNotificationFlowRemovedOff;
346 public boolean getIsNotificationFlowRemovedOff() {
347 return this.isNotificationFlowRemovedOff;