2 * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.openflowplugin.impl.device;
10 import com.google.common.annotations.VisibleForTesting;
11 import com.google.common.base.Preconditions;
12 import com.google.common.collect.Iterators;
13 import com.google.common.util.concurrent.CheckedFuture;
14 import com.google.common.util.concurrent.FutureCallback;
15 import com.google.common.util.concurrent.Futures;
16 import com.google.common.util.concurrent.ListenableFuture;
17 import io.netty.util.HashedWheelTimer;
18 import io.netty.util.TimerTask;
19 import java.util.Collections;
20 import java.util.Iterator;
21 import java.util.Objects;
22 import java.util.Optional;
23 import java.util.concurrent.ConcurrentHashMap;
24 import java.util.concurrent.ConcurrentMap;
25 import java.util.concurrent.ExecutionException;
26 import java.util.concurrent.ScheduledThreadPoolExecutor;
27 import java.util.concurrent.TimeUnit;
28 import javax.annotation.CheckForNull;
29 import javax.annotation.Nonnull;
30 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
31 import org.opendaylight.controller.md.sal.binding.api.NotificationPublishService;
32 import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
33 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
34 import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
35 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceProvider;
36 import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueueHandlerRegistration;
37 import org.opendaylight.openflowplugin.api.openflow.OFPContext;
38 import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionContext;
39 import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionStatus;
40 import org.opendaylight.openflowplugin.api.openflow.connection.OutboundQueueProvider;
41 import org.opendaylight.openflowplugin.api.openflow.device.DeviceContext;
42 import org.opendaylight.openflowplugin.api.openflow.device.DeviceInfo;
43 import org.opendaylight.openflowplugin.api.openflow.device.DeviceManager;
44 import org.opendaylight.openflowplugin.api.openflow.device.TranslatorLibrary;
45 import org.opendaylight.openflowplugin.api.openflow.device.handlers.DeviceInitializationPhaseHandler;
46 import org.opendaylight.openflowplugin.api.openflow.device.handlers.DeviceTerminationPhaseHandler;
47 import org.opendaylight.openflowplugin.api.openflow.lifecycle.LifecycleService;
48 import org.opendaylight.openflowplugin.api.openflow.statistics.ofpspecific.MessageSpy;
49 import org.opendaylight.openflowplugin.extension.api.ExtensionConverterProviderKeeper;
50 import org.opendaylight.openflowplugin.extension.api.core.extension.ExtensionConverterProvider;
51 import org.opendaylight.openflowplugin.impl.connection.OutboundQueueProviderImpl;
52 import org.opendaylight.openflowplugin.impl.device.initialization.DeviceInitializerProvider;
53 import org.opendaylight.openflowplugin.impl.device.listener.OpenflowProtocolListenerFullImpl;
54 import org.opendaylight.openflowplugin.impl.services.sal.SalRoleServiceImpl;
55 import org.opendaylight.openflowplugin.openflow.md.core.sal.convertor.ConvertorExecutor;
56 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes;
57 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodesBuilder;
58 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
59 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
60 import org.slf4j.Logger;
61 import org.slf4j.LoggerFactory;
/**
 * {@link DeviceManager} implementation that keeps one {@link DeviceContext} per {@link DeviceInfo}, builds a
 * context for every new connection in {@code createContext} and releases contexts on disconnect or
 * {@code close()}. It also keeps the {@link ExtensionConverterProvider} that is handed to each new context.
 */
66 public class DeviceManagerImpl implements DeviceManager, ExtensionConverterProviderKeeper {
68 private static final Logger LOG = LoggerFactory.getLogger(DeviceManagerImpl.class);
// Overall notification budget; updatePacketInRateLimiters() divides it evenly between the tracked contexts.
70 private final long globalNotificationQuota;
// Forwarded to every new DeviceContext through setSwitchFeaturesMandatory().
71 private final boolean switchFeaturesMandatory;
// Plain mutable flag exposed through setFlowRemovedNotificationOn()/isFlowRemovedNotificationOn().
72 private boolean isFlowRemovedNotificationOn;
// Mutable through setSkipTableFeatures(); no read of this field is visible in the class body.
73 private boolean skipTableFeatures;
// Initial delay and period, in seconds, of the messageSpy task scheduled in initialize().
74 private static final int SPY_RATE = 10;
76 private final DataBroker dataBroker;
77 private final DeviceInitializerProvider deviceInitializerProvider;
78 private final ConvertorExecutor convertorExecutor;
79 private TranslatorLibrary translatorLibrary;
// Notified (onDeviceContextLevelDown) once a disconnected device's transactions have been shut down.
80 private DeviceTerminationPhaseHandler deviceTerminPhaseHandler;
// Contexts of the devices currently known to this manager.
82 private final ConcurrentMap<DeviceInfo, DeviceContext> deviceContexts = new ConcurrentHashMap<>();
// NOTE(review): the visible code only reads and removes entries here; nothing in view inserts - confirm who populates it.
83 private final ConcurrentMap<DeviceInfo, LifecycleService> lifecycleServices = new ConcurrentHashMap<>();
// Barrier interval kept in nanoseconds; constructor and setBarrierInterval() convert from milliseconds.
85 private long barrierIntervalNanos;
86 private int barrierCountLimit;
88 private ExtensionConverterProvider extensionConverterProvider;
// Single-thread scheduler (created in the constructor) that runs messageSpy periodically.
89 private ScheduledThreadPoolExecutor spyPool;
90 private final NotificationPublishService notificationPublishService;
91 private final MessageSpy messageSpy;
// Used by onDeviceDisconnected() to time out a transaction-chain shutdown after 10 seconds.
92 private final HashedWheelTimer hashedWheelTimer;
// Passed to each new DeviceContextImpl in createContext().
93 private boolean useSingleLayerSerialization;
/**
 * Creates the manager, merges an empty {@code Nodes} container into the operational datastore and stores the
 * supplied collaborators and settings.
 *
 * @param dataBroker broker used for the initial merge and for later node deletions
 * @param globalNotificationQuota total notification budget shared by all device contexts
 * @param switchFeaturesMandatory value forwarded to every created device context
 * @param barrierInterval barrier interval in milliseconds (stored internally as nanoseconds)
 * @param barrierCountLimit barrier count limit
 * @param messageSpy task later scheduled on the spy pool by {@code initialize()}
 * @param isFlowRemovedNotificationOn initial value of the flow-removed notification flag
 * @param singletonServiceProvider not referenced anywhere in the visible constructor body
 * @param notificationPublishService service set on every created device context
 * @param hashedWheelTimer timer used for the disconnect timeout
 * @param convertorExecutor convertor executor kept for later use
 * @param skipTableFeatures initial value of the skip-table-features flag
 * @param useSingleLayerSerialization initial value of the single-layer serialization flag
 * @param deviceInitializerProvider provider handed to every created device context
 * @throws IllegalStateException wrapping the {@code ExecutionException}/{@code InterruptedException} caught below
 */
95 public DeviceManagerImpl(@Nonnull final DataBroker dataBroker,
96 final long globalNotificationQuota,
97 final boolean switchFeaturesMandatory,
98 final long barrierInterval,
99 final int barrierCountLimit,
100 final MessageSpy messageSpy,
101 final boolean isFlowRemovedNotificationOn,
102 final ClusterSingletonServiceProvider singletonServiceProvider,
103 final NotificationPublishService notificationPublishService,
104 final HashedWheelTimer hashedWheelTimer,
105 final ConvertorExecutor convertorExecutor,
106 final boolean skipTableFeatures,
107 final boolean useSingleLayerSerialization,
108 final DeviceInitializerProvider deviceInitializerProvider) {
110 this.dataBroker = dataBroker;
111 this.deviceInitializerProvider = deviceInitializerProvider;
113 /* merge an empty Nodes container into the operational DS up front to avoid problems with a missing parent for Node */
114 final WriteTransaction tx = dataBroker.newWriteOnlyTransaction();
115 final NodesBuilder nodesBuilder = new NodesBuilder();
116 nodesBuilder.setNode(Collections.<Node>emptyList());
117 tx.merge(LogicalDatastoreType.OPERATIONAL, InstanceIdentifier.create(Nodes.class), nodesBuilder.build());
// NOTE(review): no `try {` and no submit of `tx` are visible before this catch - presumably a blocking
// submit-and-wait belongs here; restore it, otherwise the merge above is never committed.
// NOTE(review): InterruptedException is wrapped without restoring the thread's interrupt flag - confirm intended.
120 } catch (ExecutionException | InterruptedException e) {
121 LOG.error("Creation of node failed.", e);
122 throw new IllegalStateException(e);
125 this.switchFeaturesMandatory = switchFeaturesMandatory;
126 this.globalNotificationQuota = globalNotificationQuota;
127 this.isFlowRemovedNotificationOn = isFlowRemovedNotificationOn;
128 this.skipTableFeatures = skipTableFeatures;
129 this.convertorExecutor = convertorExecutor;
130 this.hashedWheelTimer = hashedWheelTimer;
// Callers pass milliseconds; the field is kept in nanoseconds.
131 this.barrierIntervalNanos = TimeUnit.MILLISECONDS.toNanos(barrierInterval);
132 this.barrierCountLimit = barrierCountLimit;
// One worker thread is enough: the only task scheduled on it in this class is messageSpy.
133 this.spyPool = new ScheduledThreadPoolExecutor(1);
134 this.notificationPublishService = notificationPublishService;
135 this.messageSpy = messageSpy;
136 this.useSingleLayerSerialization = useSingleLayerSerialization;
// Accepts a device-initialization phase handler.
// NOTE(review): no body statement is visible and the class declares no field for such a handler, so this looks
// like an intentional no-op - confirm.
141 public void setDeviceInitializationPhaseHandler(final DeviceInitializationPhaseHandler handler) {
145 public void onDeviceContextLevelUp(@CheckForNull DeviceInfo deviceInfo, final LifecycleService lifecycleService) throws Exception {
146 // final phase - we have to add new Device to MD-SAL DataStore
147 LOG.debug("Final phase of DeviceContextLevelUp for Node: {} ", deviceInfo.getNodeId());
148 DeviceContext deviceContext = Preconditions.checkNotNull(deviceContexts.get(deviceInfo));
149 deviceContext.onPublished();
150 lifecycleService.registerDeviceRemovedHandler(this);
154 public TranslatorLibrary oook() {
155 return translatorLibrary;
159 public void setTranslatorLibrary(final TranslatorLibrary translatorLibrary) {
160 this.translatorLibrary = translatorLibrary;
// Shuts the manager down: every tracked device context has its connection shut down and its datastore
// transactions shut down, then the spy scheduler (if any) is stopped with shutdownNow().
164 public void close() {
// The consuming iterator removes each context from deviceContexts as it is handed out, so the map is drained here.
165 for (final Iterator<DeviceContext> iterator = Iterators.consumingIterator(deviceContexts.values().iterator());
166 iterator.hasNext();) {
167 final DeviceContext deviceCtx = iterator.next();
168 deviceCtx.shutdownConnection();
// The value returned by this call is not used; close() does not wait for the shutdown to complete.
169 deviceCtx.shuttingDownDataStoreTransactions();
// Null-tolerant stop of the scheduler; shutdownNow() also discards tasks that have not started yet.
172 Optional.ofNullable(spyPool).ifPresent(ScheduledThreadPoolExecutor::shutdownNow);
178 public void onDeviceContextLevelDown(final DeviceInfo deviceInfo) {
179 updatePacketInRateLimiters();
180 Optional.ofNullable(lifecycleServices.get(deviceInfo)).ifPresent(LifecycleService::close);
184 public void initialize() {
185 spyPool.scheduleAtFixedRate(messageSpy, SPY_RATE, SPY_RATE, TimeUnit.SECONDS);
189 public void setExtensionConverterProvider(final ExtensionConverterProvider extensionConverterProvider) {
190 this.extensionConverterProvider = extensionConverterProvider;
194 public ExtensionConverterProvider getExtensionConverterProvider() {
195 return extensionConverterProvider;
199 public void setDeviceTerminationPhaseHandler(final DeviceTerminationPhaseHandler handler) {
200 this.deviceTerminPhaseHandler = handler;
// Reacts to a lost connection. Looks up the device context of the connection, distinguishes auxiliary from
// primary connections, and for a primary connection shuts down the device's datastore transactions, notifying
// the termination-phase handler when that shutdown finishes (successfully or not). A 10 second timer cancels a
// shutdown that has not completed by then.
204 public void onDeviceDisconnected(final ConnectionContext connectionContext) {
205 LOG.trace("onDeviceDisconnected method call for Node: {}", connectionContext.getNodeId());
206 final DeviceInfo deviceInfo = connectionContext.getDeviceInfo();
207 final DeviceContext deviceCtx = this.deviceContexts.get(deviceInfo);
// Guard: no context is tracked for this device.
// NOTE(review): no `return` is visible after the log call; without one the next line dereferences a null deviceCtx.
209 if (Objects.isNull(deviceCtx)) {
210 LOG.info("DeviceContext for Node {} was not found. Connection is terminated without OFP context suite.", deviceInfo.getLOGValue());
// Guard: the lost connection is not the primary one of the device.
214 if (!connectionContext.equals(deviceCtx.getPrimaryConnectionContext())) {
215 LOG.debug("Node {} disconnected, but not primary connection.", connectionContext.getDeviceInfo().getLOGValue());
216 // Connection is not PrimaryConnection so try to remove from Auxiliary Connections
217 deviceCtx.removeAuxiliaryConnectionContext(connectionContext);
218 // If this is not primary connection, we should not continue disabling everything
// NOTE(review): the comment above implies an early exit, but no `return` is visible here - confirm.
// Guard: the context reports the TERMINATION state already.
222 if (deviceCtx.getState().equals(OFPContext.CONTEXT_STATE.TERMINATION)) {
223 LOG.info("Device context for node {} is already is termination state, waiting for close all context", deviceInfo.getLOGValue());
229 // TODO: Auxiliary connections supported ?
230 // Device is disconnected and so we need to close TxManager
231 final ListenableFuture<Void> future = deviceCtx.shuttingDownDataStoreTransactions();
// Both outcomes hand the device to the termination-phase handler; only the logging differs.
232 Futures.addCallback(future, new FutureCallback<Void>() {
234 public void onSuccess(final Void result) {
235 LOG.debug("TxChainManager for device {} is closed successful.", deviceInfo.getLOGValue());
236 deviceTerminPhaseHandler.onDeviceContextLevelDown(deviceInfo);
240 public void onFailure(final Throwable t) {
241 LOG.warn("TxChainManager for device {} failed by closing.", deviceInfo.getLOGValue());
// The throwable itself is only reported at trace level.
242 LOG.trace("TxChainManager failed by closing. ", t);
243 deviceTerminPhaseHandler.onDeviceContextLevelDown(deviceInfo);
247 // Add timer for Close TxManager because it could fail in cluster without notification
248 final TimerTask timerTask = timeout -> {
249 if (!future.isDone()) {
250 LOG.warn("Shutting down TxChain for node {} not completed during 10 sec. Continue anyway.", deviceInfo.getLOGValue());
// Cancels without interrupting; presumably this completes the future and so triggers onFailure above - verify.
251 future.cancel(false);
// The 10 second delay matches the wording of the warning inside the timer task.
255 hashedWheelTimer.newTimeout(timerTask, 10, TimeUnit.SECONDS);
259 void addDeviceContextToMap(final DeviceInfo deviceInfo, final DeviceContext deviceContext){
260 deviceContexts.put(deviceInfo, deviceContext);
264 public void setFlowRemovedNotificationOn(boolean isNotificationFlowRemovedOff) {
265 this.isFlowRemovedNotificationOn = isNotificationFlowRemovedOff;
269 public boolean isFlowRemovedNotificationOn() {
270 return this.isFlowRemovedNotificationOn;
275 public void setSkipTableFeatures(boolean skipTableFeaturesValue) {
276 skipTableFeatures = skipTableFeaturesValue;
280 public void setBarrierCountLimit(final int barrierCountLimit) {
281 this.barrierCountLimit = barrierCountLimit;
285 public void setBarrierInterval(final long barrierTimeoutLimit) {
286 this.barrierIntervalNanos = TimeUnit.MILLISECONDS.toNanos(barrierTimeoutLimit);
290 public CheckedFuture<Void, TransactionCommitFailedException> removeDeviceFromOperationalDS(final DeviceInfo deviceInfo) {
291 final WriteTransaction delWtx = dataBroker.newWriteOnlyTransaction();
292 delWtx.delete(LogicalDatastoreType.OPERATIONAL, deviceInfo.getNodeInstanceIdentifier());
293 final CheckedFuture<Void, TransactionCommitFailedException> delFuture = delWtx.submit();
295 Futures.addCallback(delFuture, new FutureCallback<Void>() {
297 public void onSuccess(final Void result) {
298 if (LOG.isDebugEnabled()) {
299 LOG.debug("Delete Node {} was successful", deviceInfo.getLOGValue());
304 public void onFailure(@Nonnull final Throwable t) {
305 LOG.warn("Delete node {} failed with exception {}", deviceInfo.getLOGValue(), t);
313 public void setUseSingleLayerSerialization(final Boolean useSingleLayerSerialization) {
314 this.useSingleLayerSerialization = useSingleLayerSerialization;
// Builds and registers the DeviceContext for a freshly connected device: configures the connection adapter,
// wires an outbound queue provider, creates and configures the context, stores it in deviceContexts,
// recomputes the notification limits and finally installs the message listener on the adapter.
// The connectionContext parameter is annotated @CheckForNull but is dereferenced without a null check.
// Returns the newly created context.
317 public DeviceContext createContext(@CheckForNull final ConnectionContext connectionContext) {
319 LOG.info("ConnectionEvent: Device connected to controller, Device:{}, NodeId:{}",
320 connectionContext.getConnectionAdapter().getRemoteAddress(),
321 connectionContext.getDeviceInfo().getNodeId());
323 connectionContext.getConnectionAdapter().setPacketInFiltering(true);
// The outbound queue provider is created for the protocol version reported by the device.
325 final OutboundQueueProvider outboundQueueProvider
326 = new OutboundQueueProviderImpl(connectionContext.getDeviceInfo().getVersion());
328 connectionContext.setOutboundQueueProvider(outboundQueueProvider);
// NOTE(review): only two arguments are visible in this call; presumably a barrier count limit belongs between
// them - verify against the registerOutboundQueueHandler signature.
329 final OutboundQueueHandlerRegistration<OutboundQueueProvider> outboundQueueHandlerRegistration =
330 connectionContext.getConnectionAdapter().registerOutboundQueueHandler(
331 outboundQueueProvider,
333 barrierIntervalNanos);
334 connectionContext.setOutboundQueueHandleRegistration(outboundQueueHandlerRegistration);
// NOTE(review): the argument list below is visibly truncated - only its last two arguments are present; the
// leading arguments must be restored from the DeviceContextImpl constructor.
337 final DeviceContext deviceContext = new DeviceContextImpl(
346 useSingleLayerSerialization,
347 deviceInitializerProvider);
// The context is handed to SalRoleServiceImpl twice, once per constructor parameter.
349 deviceContext.setSalRoleService(new SalRoleServiceImpl(deviceContext, deviceContext));
350 deviceContext.setSwitchFeaturesMandatory(switchFeaturesMandatory);
// Unchecked cast: relies on the created context also implementing ExtensionConverterProviderKeeper.
351 ((ExtensionConverterProviderKeeper) deviceContext).setExtensionConverterProvider(extensionConverterProvider);
352 deviceContext.setNotificationPublishService(notificationPublishService);
// Register first, then rebalance, so the new context is included in the per-device limit.
354 deviceContexts.put(connectionContext.getDeviceInfo(), deviceContext);
355 updatePacketInRateLimiters();
357 final OpenflowProtocolListenerFullImpl messageListener = new OpenflowProtocolListenerFullImpl(
358 connectionContext.getConnectionAdapter(), deviceContext);
360 connectionContext.getConnectionAdapter().setMessageListener(messageListener);
362 return deviceContext;
365 private void updatePacketInRateLimiters() {
366 synchronized (deviceContexts) {
367 final int deviceContextsSize = deviceContexts.size();
368 if (deviceContextsSize > 0) {
369 long freshNotificationLimit = globalNotificationQuota / deviceContextsSize;
370 if (freshNotificationLimit < 100) {
371 freshNotificationLimit = 100;
373 if (LOG.isDebugEnabled()) {
374 LOG.debug("fresh notification limit = {}", freshNotificationLimit);
376 for (final DeviceContext deviceContext : deviceContexts.values()) {
377 deviceContext.updatePacketInRateLimit(freshNotificationLimit);
383 public void onDeviceRemoved(DeviceInfo deviceInfo) {
384 deviceContexts.remove(deviceInfo);
385 LOG.debug("Device context removed for node {}", deviceInfo.getLOGValue());
387 lifecycleServices.remove(deviceInfo);
388 LOG.debug("Lifecycle service removed for node {}", deviceInfo.getLOGValue());
392 public long getBarrierIntervalNanos() {
393 return barrierIntervalNanos;
397 public int getBarrierCountLimit() {
398 return barrierCountLimit;