2 * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.openflowplugin.impl.device;
10 import com.google.common.annotations.VisibleForTesting;
11 import com.google.common.base.Preconditions;
12 import com.google.common.collect.Iterators;
13 import com.google.common.util.concurrent.FutureCallback;
14 import com.google.common.util.concurrent.Futures;
15 import com.google.common.util.concurrent.ListenableFuture;
16 import io.netty.util.HashedWheelTimer;
17 import io.netty.util.TimerTask;
18 import java.util.Collections;
19 import java.util.Iterator;
20 import java.util.Objects;
21 import java.util.concurrent.ConcurrentHashMap;
22 import java.util.concurrent.ConcurrentMap;
23 import java.util.concurrent.ExecutionException;
24 import java.util.concurrent.ScheduledThreadPoolExecutor;
25 import java.util.concurrent.TimeUnit;
26 import javax.annotation.CheckForNull;
27 import javax.annotation.Nonnull;
28 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
29 import org.opendaylight.controller.md.sal.binding.api.NotificationPublishService;
30 import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
31 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
32 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceProvider;
33 import org.opendaylight.openflowjava.protocol.api.connection.ConnectionAdapter;
34 import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueueHandlerRegistration;
35 import org.opendaylight.openflowplugin.api.openflow.OFPContext;
36 import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionContext;
37 import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionStatus;
38 import org.opendaylight.openflowplugin.api.openflow.connection.OutboundQueueProvider;
39 import org.opendaylight.openflowplugin.api.openflow.device.DeviceContext;
40 import org.opendaylight.openflowplugin.api.openflow.device.DeviceInfo;
41 import org.opendaylight.openflowplugin.api.openflow.device.DeviceManager;
42 import org.opendaylight.openflowplugin.api.openflow.device.TranslatorLibrary;
43 import org.opendaylight.openflowplugin.api.openflow.device.handlers.DeviceInitializationPhaseHandler;
44 import org.opendaylight.openflowplugin.api.openflow.device.handlers.DeviceTerminationPhaseHandler;
45 import org.opendaylight.openflowplugin.api.openflow.lifecycle.LifecycleService;
46 import org.opendaylight.openflowplugin.api.openflow.statistics.ofpspecific.MessageSpy;
47 import org.opendaylight.openflowplugin.extension.api.ExtensionConverterProviderKeeper;
48 import org.opendaylight.openflowplugin.extension.api.core.extension.ExtensionConverterProvider;
49 import org.opendaylight.openflowplugin.impl.connection.OutboundQueueProviderImpl;
50 import org.opendaylight.openflowplugin.impl.device.listener.OpenflowProtocolListenerFullImpl;
51 import org.opendaylight.openflowplugin.impl.lifecycle.LifecycleServiceImpl;
52 import org.opendaylight.openflowplugin.openflow.md.core.sal.convertor.ConvertorExecutor;
53 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes;
54 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodesBuilder;
55 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
56 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
57 import org.slf4j.Logger;
58 import org.slf4j.LoggerFactory;
63 public class DeviceManagerImpl implements DeviceManager, ExtensionConverterProviderKeeper {
65 private static final Logger LOG = LoggerFactory.getLogger(DeviceManagerImpl.class);
67 private final long globalNotificationQuota;
68 private final boolean switchFeaturesMandatory;
69 private boolean isNotificationFlowRemovedOff;
70 private boolean skipTableFeatures;
71 private static final int SPY_RATE = 10;
73 private final DataBroker dataBroker;
74 private final ConvertorExecutor convertorExecutor;
75 private TranslatorLibrary translatorLibrary;
76 private DeviceInitializationPhaseHandler deviceInitPhaseHandler;
77 private DeviceTerminationPhaseHandler deviceTerminPhaseHandler;
79 private final ConcurrentMap<DeviceInfo, DeviceContext> deviceContexts = new ConcurrentHashMap<>();
80 private final ConcurrentMap<DeviceInfo, LifecycleService> lifecycleServices = new ConcurrentHashMap<>();
82 private final long barrierIntervalNanos;
83 private final int barrierCountLimit;
85 private ExtensionConverterProvider extensionConverterProvider;
86 private ScheduledThreadPoolExecutor spyPool;
87 private final ClusterSingletonServiceProvider singletonServiceProvider;
88 private final NotificationPublishService notificationPublishService;
89 private final MessageSpy messageSpy;
90 private final HashedWheelTimer hashedWheelTimer;
92 public DeviceManagerImpl(@Nonnull final DataBroker dataBroker,
93 final long globalNotificationQuota,
94 final boolean switchFeaturesMandatory,
95 final long barrierInterval,
96 final int barrierCountLimit,
97 final MessageSpy messageSpy,
98 final boolean isNotificationFlowRemovedOff,
99 final ClusterSingletonServiceProvider singletonServiceProvider,
100 final NotificationPublishService notificationPublishService,
101 final HashedWheelTimer hashedWheelTimer,
102 final ConvertorExecutor convertorExecutor,
103 final boolean skipTableFeatures) {
105 this.switchFeaturesMandatory = switchFeaturesMandatory;
106 this.globalNotificationQuota = globalNotificationQuota;
107 this.isNotificationFlowRemovedOff = isNotificationFlowRemovedOff;
108 this.skipTableFeatures = skipTableFeatures;
109 this.dataBroker = Preconditions.checkNotNull(dataBroker);
110 this.convertorExecutor = convertorExecutor;
111 this.hashedWheelTimer = hashedWheelTimer;
112 /* merge empty nodes to oper DS to predict any problems with missing parent for Node */
113 final WriteTransaction tx = dataBroker.newWriteOnlyTransaction();
115 final NodesBuilder nodesBuilder = new NodesBuilder();
116 nodesBuilder.setNode(Collections.<Node>emptyList());
117 tx.merge(LogicalDatastoreType.OPERATIONAL, InstanceIdentifier.create(Nodes.class), nodesBuilder.build());
120 } catch (ExecutionException | InterruptedException e) {
121 LOG.error("Creation of node failed.", e);
122 throw new IllegalStateException(e);
125 this.barrierIntervalNanos = TimeUnit.MILLISECONDS.toNanos(barrierInterval);
126 this.barrierCountLimit = barrierCountLimit;
128 spyPool = new ScheduledThreadPoolExecutor(1);
129 this.singletonServiceProvider = singletonServiceProvider;
130 this.notificationPublishService = notificationPublishService;
131 this.messageSpy = messageSpy;
136 public void setDeviceInitializationPhaseHandler(final DeviceInitializationPhaseHandler handler) {
137 this.deviceInitPhaseHandler = handler;
141 public void onDeviceContextLevelUp(@CheckForNull DeviceInfo deviceInfo, final LifecycleService lifecycleService) throws Exception {
142 // final phase - we have to add new Device to MD-SAL DataStore
143 LOG.debug("Final phase of DeviceContextLevelUp for Node: {} ", deviceInfo.getNodeId());
144 DeviceContext deviceContext = Preconditions.checkNotNull(deviceContexts.get(deviceInfo));
145 deviceContext.onPublished();
146 lifecycleService.registerService(this.singletonServiceProvider);
150 public ConnectionStatus deviceConnected(@CheckForNull final ConnectionContext connectionContext) throws Exception {
151 Preconditions.checkArgument(connectionContext != null);
153 DeviceInfo deviceInfo = connectionContext.getDeviceInfo();
 * This part prevents destroying another device context. Throwing an exception here results in propagating connection close
 * in {@link org.opendaylight.openflowplugin.impl.connection.HandshakeContextImpl}.
 * If the context already exists we are in the closing process (connection flapping) and we should not propagate connection close.
159 if (deviceContexts.containsKey(deviceInfo)) {
160 DeviceContext deviceContext = deviceContexts.get(deviceInfo);
161 LOG.warn("Node {} already connected disconnecting device. Rejecting connection", deviceInfo);
162 if (!deviceContext.getState().equals(OFPContext.CONTEXT_STATE.TERMINATION)) {
163 LOG.warn("Node {} context state not in TERMINATION state.",
164 connectionContext.getDeviceInfo().getLOGValue());
165 return ConnectionStatus.ALREADY_CONNECTED;
167 return ConnectionStatus.CLOSING;
171 LOG.info("ConnectionEvent: Device connected to controller, Device:{}, NodeId:{}",
172 connectionContext.getConnectionAdapter().getRemoteAddress(), deviceInfo.getNodeId());
174 // Add Disconnect handler
175 connectionContext.setDeviceDisconnectedHandler(DeviceManagerImpl.this);
176 // Cache this for clarity
177 final ConnectionAdapter connectionAdapter = connectionContext.getConnectionAdapter();
179 //FIXME: as soon as auxiliary connection are fully supported then this is needed only before device context published
180 connectionAdapter.setPacketInFiltering(true);
182 final OutboundQueueProvider outboundQueueProvider = new OutboundQueueProviderImpl(deviceInfo.getVersion());
184 connectionContext.setOutboundQueueProvider(outboundQueueProvider);
185 final OutboundQueueHandlerRegistration<OutboundQueueProvider> outboundQueueHandlerRegistration =
186 connectionAdapter.registerOutboundQueueHandler(outboundQueueProvider, barrierCountLimit, barrierIntervalNanos);
187 connectionContext.setOutboundQueueHandleRegistration(outboundQueueHandlerRegistration);
189 final DeviceContext deviceContext = new DeviceContextImpl(
198 deviceContexts.putIfAbsent(deviceInfo, deviceContext);
200 final LifecycleService lifecycleService = new LifecycleServiceImpl();
201 lifecycleService.setDeviceContext(deviceContext);
202 deviceContext.putLifecycleServiceIntoTxChainManager(lifecycleService);
204 lifecycleServices.putIfAbsent(deviceInfo, lifecycleService);
206 deviceContext.setSwitchFeaturesMandatory(switchFeaturesMandatory);
208 ((ExtensionConverterProviderKeeper) deviceContext).setExtensionConverterProvider(extensionConverterProvider);
209 deviceContext.setNotificationPublishService(notificationPublishService);
211 updatePacketInRateLimiters();
213 final OpenflowProtocolListenerFullImpl messageListener = new OpenflowProtocolListenerFullImpl(
214 connectionAdapter, deviceContext);
216 connectionAdapter.setMessageListener(messageListener);
217 deviceInitPhaseHandler.onDeviceContextLevelUp(connectionContext.getDeviceInfo(), lifecycleService);
218 return ConnectionStatus.MAY_CONTINUE;
221 private void updatePacketInRateLimiters() {
222 synchronized (deviceContexts) {
223 final int deviceContextsSize = deviceContexts.size();
224 if (deviceContextsSize > 0) {
225 long freshNotificationLimit = globalNotificationQuota / deviceContextsSize;
226 if (freshNotificationLimit < 100) {
227 freshNotificationLimit = 100;
229 LOG.debug("fresh notification limit = {}", freshNotificationLimit);
230 for (final DeviceContext deviceContext : deviceContexts.values()) {
231 deviceContext.updatePacketInRateLimit(freshNotificationLimit);
238 public TranslatorLibrary oook() {
239 return translatorLibrary;
243 public void setTranslatorLibrary(final TranslatorLibrary translatorLibrary) {
244 this.translatorLibrary = translatorLibrary;
248 public void close() {
249 for (final Iterator<DeviceContext> iterator = Iterators.consumingIterator(deviceContexts.values().iterator());
250 iterator.hasNext();) {
251 final DeviceContext deviceCtx = iterator.next();
252 deviceCtx.shutdownConnection();
253 deviceCtx.shuttingDownDataStoreTransactions();
256 if (spyPool != null) {
257 spyPool.shutdownNow();
263 public void onDeviceContextLevelDown(final DeviceInfo deviceInfo) {
265 LifecycleService lifecycleService = lifecycleServices.remove(deviceInfo);
266 if (LOG.isDebugEnabled()) {
267 LOG.debug("Lifecycle service removed for node {}", deviceInfo.getLOGValue());
270 updatePacketInRateLimiters();
271 if (Objects.nonNull(lifecycleService)) {
273 lifecycleService.close();
274 LOG.debug("Lifecycle service successfully closed for node {}", deviceInfo.getLOGValue());
275 } catch (Exception e) {
276 LOG.warn("Closing lifecycle service for node {} was unsuccessful ", deviceInfo.getLOGValue(), e);
280 deviceContexts.remove(deviceInfo);
281 if (LOG.isDebugEnabled()) {
282 LOG.debug("Device context removed for node {}", deviceInfo.getLOGValue());
288 public void initialize() {
289 spyPool.scheduleAtFixedRate(messageSpy, SPY_RATE, SPY_RATE, TimeUnit.SECONDS);
293 public void setExtensionConverterProvider(final ExtensionConverterProvider extensionConverterProvider) {
294 this.extensionConverterProvider = extensionConverterProvider;
298 public ExtensionConverterProvider getExtensionConverterProvider() {
299 return extensionConverterProvider;
303 public void setDeviceTerminationPhaseHandler(final DeviceTerminationPhaseHandler handler) {
304 this.deviceTerminPhaseHandler = handler;
308 public void onDeviceDisconnected(final ConnectionContext connectionContext) {
309 LOG.trace("onDeviceDisconnected method call for Node: {}", connectionContext.getNodeId());
310 final DeviceInfo deviceInfo = connectionContext.getDeviceInfo();
311 final DeviceContext deviceCtx = this.deviceContexts.get(deviceInfo);
313 if (null == deviceCtx) {
314 LOG.info("DeviceContext for Node {} was not found. Connection is terminated without OFP context suite.", deviceInfo.getLOGValue());
318 if (deviceCtx.getState().equals(OFPContext.CONTEXT_STATE.TERMINATION)) {
319 LOG.debug("Device context for node {} is already is termination state, waiting for close all context");
323 deviceCtx.setState(OFPContext.CONTEXT_STATE.TERMINATION);
325 if (!connectionContext.equals(deviceCtx.getPrimaryConnectionContext())) {
326 LOG.debug("Node {} disconnected, but not primary connection.", connectionContext.getDeviceInfo().getLOGValue());
327 /* Connection is not PrimaryConnection so try to remove from Auxiliary Connections */
328 deviceCtx.removeAuxiliaryConnectionContext(connectionContext);
330 //TODO: Auxiliary connections supported ?
332 /* Device is disconnected and so we need to close TxManager */
333 final ListenableFuture<Void> future = deviceCtx.shuttingDownDataStoreTransactions();
334 Futures.addCallback(future, new FutureCallback<Void>() {
337 public void onSuccess(final Void result) {
338 LOG.debug("TxChainManager for device {} is closed successful.", deviceInfo.getLOGValue());
339 deviceTerminPhaseHandler.onDeviceContextLevelDown(deviceInfo);
343 public void onFailure(final Throwable t) {
344 LOG.warn("TxChainManager for device {} failed by closing.", deviceInfo.getLOGValue());
345 LOG.trace("TxChainManager failed by closing. ", t);
346 deviceTerminPhaseHandler.onDeviceContextLevelDown(deviceInfo);
349 /* Add timer for Close TxManager because it could fain ind cluster without notification */
350 final TimerTask timerTask = timeout -> {
351 if (!future.isDone()) {
352 LOG.warn("Shutting down TxChain for node {} not completed during 10 sec. Continue anyway.", deviceInfo.getLOGValue());
353 future.cancel(false);
356 hashedWheelTimer.newTimeout(timerTask, 10, TimeUnit.SECONDS);
361 void addDeviceContextToMap(final DeviceInfo deviceInfo, final DeviceContext deviceContext){
362 deviceContexts.put(deviceInfo, deviceContext);
366 public void setIsNotificationFlowRemovedOff(boolean isNotificationFlowRemovedOff) {
367 this.isNotificationFlowRemovedOff = isNotificationFlowRemovedOff;
371 public boolean getIsNotificationFlowRemovedOff() {
372 return this.isNotificationFlowRemovedOff;
377 public void setSkipTableFeatures(boolean skipTableFeaturesValue) {
378 skipTableFeatures = skipTableFeaturesValue;