2 * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.openflowplugin.impl.device;
10 import com.google.common.annotations.VisibleForTesting;
11 import com.google.common.base.Preconditions;
12 import com.google.common.collect.Iterators;
13 import com.google.common.util.concurrent.CheckedFuture;
14 import com.google.common.util.concurrent.FutureCallback;
15 import com.google.common.util.concurrent.Futures;
16 import com.google.common.util.concurrent.ListenableFuture;
17 import io.netty.util.HashedWheelTimer;
18 import io.netty.util.TimerTask;
19 import java.util.Collections;
20 import java.util.Iterator;
21 import java.util.Objects;
22 import java.util.Optional;
23 import java.util.concurrent.ConcurrentHashMap;
24 import java.util.concurrent.ConcurrentMap;
25 import java.util.concurrent.ExecutionException;
26 import java.util.concurrent.ScheduledThreadPoolExecutor;
27 import java.util.concurrent.TimeUnit;
28 import javax.annotation.CheckForNull;
29 import javax.annotation.Nonnull;
30 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
31 import org.opendaylight.controller.md.sal.binding.api.NotificationPublishService;
32 import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
33 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
34 import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
35 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceProvider;
36 import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueueHandlerRegistration;
37 import org.opendaylight.openflowplugin.api.openflow.OFPContext;
38 import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionContext;
39 import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionStatus;
40 import org.opendaylight.openflowplugin.api.openflow.connection.OutboundQueueProvider;
41 import org.opendaylight.openflowplugin.api.openflow.device.DeviceContext;
42 import org.opendaylight.openflowplugin.api.openflow.device.DeviceInfo;
43 import org.opendaylight.openflowplugin.api.openflow.device.DeviceManager;
44 import org.opendaylight.openflowplugin.api.openflow.device.TranslatorLibrary;
45 import org.opendaylight.openflowplugin.api.openflow.device.handlers.DeviceInitializationPhaseHandler;
46 import org.opendaylight.openflowplugin.api.openflow.device.handlers.DeviceTerminationPhaseHandler;
47 import org.opendaylight.openflowplugin.api.openflow.lifecycle.LifecycleService;
48 import org.opendaylight.openflowplugin.api.openflow.statistics.ofpspecific.MessageSpy;
49 import org.opendaylight.openflowplugin.extension.api.ExtensionConverterProviderKeeper;
50 import org.opendaylight.openflowplugin.extension.api.core.extension.ExtensionConverterProvider;
51 import org.opendaylight.openflowplugin.impl.connection.OutboundQueueProviderImpl;
52 import org.opendaylight.openflowplugin.impl.device.initialization.DeviceInitializerProvider;
53 import org.opendaylight.openflowplugin.impl.device.listener.OpenflowProtocolListenerFullImpl;
54 import org.opendaylight.openflowplugin.impl.services.sal.SalRoleServiceImpl;
55 import org.opendaylight.openflowplugin.openflow.md.core.sal.convertor.ConvertorExecutor;
56 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes;
57 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodesBuilder;
58 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
59 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
60 import org.slf4j.Logger;
61 import org.slf4j.LoggerFactory;
/**
 * Device manager: holds the per-device contexts together with the settings that are
 * handed to every device context created by {@code createContext}.
 */
66 public class DeviceManagerImpl implements DeviceManager, ExtensionConverterProviderKeeper {
68 private static final Logger LOG = LoggerFactory.getLogger(DeviceManagerImpl.class);
// Overall packet-in notification budget; updatePacketInRateLimiters() divides it by the number of device contexts.
70 private final long globalNotificationQuota;
// Forwarded unchanged to each new device context in createContext().
71 private final boolean switchFeaturesMandatory;
// Mutable through their setters; neither field is volatile or otherwise synchronized in this class.
72 private boolean isFlowRemovedNotificationOn;
73 private boolean skipTableFeatures;
// Initial delay and period, in seconds, used when scheduling messageSpy in initialize().
74 private static final int SPY_RATE = 10;
76 private final DataBroker dataBroker;
77 private final DeviceInitializerProvider deviceInitializerProvider;
78 private final ConvertorExecutor convertorExecutor;
// The next three collaborators are injected after construction through setters.
79 private TranslatorLibrary translatorLibrary;
80 private DeviceInitializationPhaseHandler deviceInitPhaseHandler;
81 private DeviceTerminationPhaseHandler deviceTerminPhaseHandler;
// Filled by createContext()/addDeviceContextToMap(), emptied by onDeviceRemoved() and close().
83 private final ConcurrentMap<DeviceInfo, DeviceContext> deviceContexts = new ConcurrentHashMap<>();
// NOTE(review): within the visible lines this map is only read and removed from, never populated - confirm where entries are added.
84 private final ConcurrentMap<DeviceInfo, LifecycleService> lifecycleServices = new ConcurrentHashMap<>();
// Kept in nanoseconds; the constructor and setBarrierInterval() convert from milliseconds.
86 private long barrierIntervalNanos;
87 private int barrierCountLimit;
89 private ExtensionConverterProvider extensionConverterProvider;
// Single-thread scheduler created in the constructor; runs messageSpy and is shut down in close().
90 private ScheduledThreadPoolExecutor spyPool;
91 private final NotificationPublishService notificationPublishService;
92 private final MessageSpy messageSpy;
// Used by onDeviceDisconnected() to schedule the 10 second transaction-shutdown timeout.
93 private final HashedWheelTimer hashedWheelTimer;
94 private boolean useSingleLayerSerialization;
/**
 * Creates the manager, stores the supplied settings and collaborators, and merges an empty
 * {@code Nodes} container into the operational datastore.
 *
 * @param dataBroker broker used for the initial merge and for later node deletions
 * @param globalNotificationQuota packet-in notification budget shared by all device contexts
 * @param switchFeaturesMandatory value forwarded to every created device context
 * @param barrierInterval barrier interval in milliseconds; stored internally in nanoseconds
 * @param barrierCountLimit barrier count limit
 * @param messageSpy spy that {@code initialize()} schedules periodically
 * @param isFlowRemovedNotificationOn initial value of the flow-removed notification flag
 * @param singletonServiceProvider NOTE(review): not referenced in the visible lines of this constructor - confirm whether it is still needed
 * @param notificationPublishService service passed on to every created device context
 * @param hashedWheelTimer timer used for the transaction-shutdown timeout on disconnect
 * @param convertorExecutor convertor executor kept for later use
 * @param skipTableFeatures initial value of the skip-table-features flag
 * @param useSingleLayerSerialization initial value of the single-layer-serialization flag
 * @param deviceInitializerProvider provider passed on to every created device context
 * @throws IllegalStateException when the guarded datastore call fails or the thread is interrupted
 */
96 public DeviceManagerImpl(@Nonnull final DataBroker dataBroker,
97 final long globalNotificationQuota,
98 final boolean switchFeaturesMandatory,
99 final long barrierInterval,
100 final int barrierCountLimit,
101 final MessageSpy messageSpy,
102 final boolean isFlowRemovedNotificationOn,
103 final ClusterSingletonServiceProvider singletonServiceProvider,
104 final NotificationPublishService notificationPublishService,
105 final HashedWheelTimer hashedWheelTimer,
106 final ConvertorExecutor convertorExecutor,
107 final boolean skipTableFeatures,
108 final boolean useSingleLayerSerialization,
109 final DeviceInitializerProvider deviceInitializerProvider) {
111 this.dataBroker = dataBroker;
112 this.deviceInitializerProvider = deviceInitializerProvider;
114 /* merge empty nodes to oper DS to predict any problems with missing parent for Node */
115 final WriteTransaction tx = dataBroker.newWriteOnlyTransaction();
116 final NodesBuilder nodesBuilder = new NodesBuilder();
117 nodesBuilder.setNode(Collections.<Node>emptyList());
118 tx.merge(LogicalDatastoreType.OPERATIONAL, InstanceIdentifier.create(Nodes.class), nodesBuilder.build());
// NOTE(review): the embedded numbering skips 119-120 here, so the try opening and the call it guards
// (presumably the blocking submit of tx) are not visible - confirm against the full file.
// NOTE(review): InterruptedException is handled without restoring the thread's interrupt flag;
// consider Thread.currentThread().interrupt() before rethrowing.
121 } catch (ExecutionException | InterruptedException e) {
122 LOG.error("Creation of node failed.", e);
123 throw new IllegalStateException(e);
126 this.switchFeaturesMandatory = switchFeaturesMandatory;
127 this.globalNotificationQuota = globalNotificationQuota;
128 this.isFlowRemovedNotificationOn = isFlowRemovedNotificationOn;
129 this.skipTableFeatures = skipTableFeatures;
130 this.convertorExecutor = convertorExecutor;
131 this.hashedWheelTimer = hashedWheelTimer;
// The interval arrives in milliseconds and is kept in nanoseconds.
132 this.barrierIntervalNanos = TimeUnit.MILLISECONDS.toNanos(barrierInterval);
133 this.barrierCountLimit = barrierCountLimit;
// One worker thread; used by initialize() to run the message spy periodically.
134 this.spyPool = new ScheduledThreadPoolExecutor(1);
135 this.notificationPublishService = notificationPublishService;
136 this.messageSpy = messageSpy;
137 this.useSingleLayerSerialization = useSingleLayerSerialization;
142 public void setDeviceInitializationPhaseHandler(final DeviceInitializationPhaseHandler handler) {
143 this.deviceInitPhaseHandler = handler;
147 public void onDeviceContextLevelUp(@CheckForNull DeviceInfo deviceInfo, final LifecycleService lifecycleService) throws Exception {
148 // final phase - we have to add new Device to MD-SAL DataStore
149 LOG.debug("Final phase of DeviceContextLevelUp for Node: {} ", deviceInfo.getNodeId());
150 DeviceContext deviceContext = Preconditions.checkNotNull(deviceContexts.get(deviceInfo));
151 deviceContext.onPublished();
152 lifecycleService.registerDeviceRemovedHandler(this);
156 public ConnectionStatus deviceConnected(@CheckForNull final ConnectionContext connectionContext) throws Exception {
157 return ConnectionStatus.MAY_CONTINUE;
161 public TranslatorLibrary oook() {
162 return translatorLibrary;
166 public void setTranslatorLibrary(final TranslatorLibrary translatorLibrary) {
167 this.translatorLibrary = translatorLibrary;
171 public void close() {
172 for (final Iterator<DeviceContext> iterator = Iterators.consumingIterator(deviceContexts.values().iterator());
173 iterator.hasNext();) {
174 final DeviceContext deviceCtx = iterator.next();
175 deviceCtx.shutdownConnection();
176 deviceCtx.shuttingDownDataStoreTransactions();
179 Optional.ofNullable(spyPool).ifPresent(ScheduledThreadPoolExecutor::shutdownNow);
185 public void onDeviceContextLevelDown(final DeviceInfo deviceInfo) {
186 updatePacketInRateLimiters();
187 Optional.ofNullable(lifecycleServices.get(deviceInfo)).ifPresent(LifecycleService::close);
191 public void initialize() {
192 spyPool.scheduleAtFixedRate(messageSpy, SPY_RATE, SPY_RATE, TimeUnit.SECONDS);
196 public void setExtensionConverterProvider(final ExtensionConverterProvider extensionConverterProvider) {
197 this.extensionConverterProvider = extensionConverterProvider;
201 public ExtensionConverterProvider getExtensionConverterProvider() {
202 return extensionConverterProvider;
206 public void setDeviceTerminationPhaseHandler(final DeviceTerminationPhaseHandler handler) {
207 this.deviceTerminPhaseHandler = handler;
/**
 * Reacts to a closed connection.
 *
 * <p>The device context is looked up by the connection's device info. A missing context,
 * a non-primary connection and a context already in the TERMINATION state each get their
 * own branch below. Otherwise the shutdown of the context's datastore transactions is
 * started, the termination phase handler is notified when that shutdown succeeds or
 * fails, and a 10 second timer cancels the shutdown future if it is still pending.
 *
 * @param connectionContext context of the connection that went away
 */
211 public void onDeviceDisconnected(final ConnectionContext connectionContext) {
212 LOG.trace("onDeviceDisconnected method call for Node: {}", connectionContext.getNodeId());
213 final DeviceInfo deviceInfo = connectionContext.getDeviceInfo();
214 final DeviceContext deviceCtx = this.deviceContexts.get(deviceInfo);
// No context is registered for this device, so only the fact is logged.
// NOTE(review): the embedded numbering skips 218-220 after the log call; presumably an early
// return closes this branch (deviceCtx is dereferenced right after it) - confirm.
216 if (Objects.isNull(deviceCtx)) {
217 LOG.info("DeviceContext for Node {} was not found. Connection is terminated without OFP context suite.", deviceInfo.getLOGValue());
// NOTE(review): numbering skips 226-228 at the end of this branch; the comment on 225 suggests
// an early return - confirm.
221 if (!connectionContext.equals(deviceCtx.getPrimaryConnectionContext())) {
222 LOG.debug("Node {} disconnected, but not primary connection.", connectionContext.getDeviceInfo().getLOGValue());
223 // Connection is not PrimaryConnection so try to remove from Auxiliary Connections
224 deviceCtx.removeAuxiliaryConnectionContext(connectionContext);
225 // If this is not primary connection, we should not continue disabling everything
// NOTE(review): numbering skips 231-235 after this branch; more than a return and a closing
// brace may be hidden there - confirm against the full file before changing this method.
229 if (deviceCtx.getState().equals(OFPContext.CONTEXT_STATE.TERMINATION)) {
230 LOG.info("Device context for node {} is already is termination state, waiting for close all context", deviceInfo.getLOGValue());
236 // TODO: Auxiliary connections supported ?
237 // Device is disconnected and so we need to close TxManager
238 final ListenableFuture<Void> future = deviceCtx.shuttingDownDataStoreTransactions();
// Success and failure both hand the device over to the termination phase handler.
239 Futures.addCallback(future, new FutureCallback<Void>() {
241 public void onSuccess(final Void result) {
242 LOG.debug("TxChainManager for device {} is closed successful.", deviceInfo.getLOGValue());
243 deviceTerminPhaseHandler.onDeviceContextLevelDown(deviceInfo);
247 public void onFailure(final Throwable t) {
// The throwable itself is only logged at trace level.
248 LOG.warn("TxChainManager for device {} failed by closing.", deviceInfo.getLOGValue());
249 LOG.trace("TxChainManager failed by closing. ", t);
250 deviceTerminPhaseHandler.onDeviceContextLevelDown(deviceInfo);
254 // Add timer for Close TxManager because it could fail in cluster without notification
// NOTE(review): cancelling the future presumably drives the onFailure callback above, which
// is how the teardown continues after the timeout - confirm.
255 final TimerTask timerTask = timeout -> {
256 if (!future.isDone()) {
257 LOG.warn("Shutting down TxChain for node {} not completed during 10 sec. Continue anyway.", deviceInfo.getLOGValue());
258 future.cancel(false);
262 hashedWheelTimer.newTimeout(timerTask, 10, TimeUnit.SECONDS);
266 void addDeviceContextToMap(final DeviceInfo deviceInfo, final DeviceContext deviceContext){
267 deviceContexts.put(deviceInfo, deviceContext);
271 public void setFlowRemovedNotificationOn(boolean isNotificationFlowRemovedOff) {
272 this.isFlowRemovedNotificationOn = isNotificationFlowRemovedOff;
276 public boolean isFlowRemovedNotificationOn() {
277 return this.isFlowRemovedNotificationOn;
282 public void setSkipTableFeatures(boolean skipTableFeaturesValue) {
283 skipTableFeatures = skipTableFeaturesValue;
287 public void setBarrierCountLimit(final int barrierCountLimit) {
288 this.barrierCountLimit = barrierCountLimit;
292 public void setBarrierInterval(final long barrierTimeoutLimit) {
293 this.barrierIntervalNanos = TimeUnit.MILLISECONDS.toNanos(barrierTimeoutLimit);
297 public CheckedFuture<Void, TransactionCommitFailedException> removeDeviceFromOperationalDS(final DeviceInfo deviceInfo) {
298 final WriteTransaction delWtx = dataBroker.newWriteOnlyTransaction();
299 delWtx.delete(LogicalDatastoreType.OPERATIONAL, deviceInfo.getNodeInstanceIdentifier());
300 final CheckedFuture<Void, TransactionCommitFailedException> delFuture = delWtx.submit();
302 Futures.addCallback(delFuture, new FutureCallback<Void>() {
304 public void onSuccess(final Void result) {
305 if (LOG.isDebugEnabled()) {
306 LOG.debug("Delete Node {} was successful", deviceInfo.getLOGValue());
311 public void onFailure(@Nonnull final Throwable t) {
312 LOG.warn("Delete node {} failed with exception {}", deviceInfo.getLOGValue(), t);
320 public void setUseSingleLayerSerialization(final Boolean useSingleLayerSerialization) {
321 this.useSingleLayerSerialization = useSingleLayerSerialization;
/**
 * Builds and registers the device context for a newly connected device.
 *
 * <p>In order: logs the connection, enables packet-in filtering on the connection adapter,
 * creates an outbound queue provider for the device's version and registers it as the
 * adapter's outbound queue handler, creates the device context and configures it, stores
 * it in {@code deviceContexts}, recomputes the packet-in limits and installs a protocol
 * message listener on the adapter.
 *
 * @param connectionContext context of the connection; dereferenced without a null check
 *        although annotated {@code @CheckForNull}
 * @return the newly created device context
 */
324 public DeviceContext createContext(@CheckForNull final ConnectionContext connectionContext) {
326 LOG.info("ConnectionEvent: Device connected to controller, Device:{}, NodeId:{}",
327 connectionContext.getConnectionAdapter().getRemoteAddress(),
328 connectionContext.getDeviceInfo().getNodeId());
330 connectionContext.getConnectionAdapter().setPacketInFiltering(true);
332 final OutboundQueueProvider outboundQueueProvider
333 = new OutboundQueueProviderImpl(connectionContext.getDeviceInfo().getVersion());
335 connectionContext.setOutboundQueueProvider(outboundQueueProvider);
// NOTE(review): the embedded numbering skips 339, so one argument of the registration call
// (between the provider and barrierIntervalNanos) is not visible - confirm.
336 final OutboundQueueHandlerRegistration<OutboundQueueProvider> outboundQueueHandlerRegistration =
337 connectionContext.getConnectionAdapter().registerOutboundQueueHandler(
338 outboundQueueProvider,
340 barrierIntervalNanos);
341 connectionContext.setOutboundQueueHandleRegistration(outboundQueueHandlerRegistration);
// NOTE(review): numbering skips 345-352, so the leading constructor arguments are not visible;
// only the last two can be seen - confirm against the full file.
344 final DeviceContext deviceContext = new DeviceContextImpl(
353 useSingleLayerSerialization,
354 deviceInitializerProvider);
// The same context object is passed for both arguments of the role service.
356 deviceContext.setSalRoleService(new SalRoleServiceImpl(deviceContext, deviceContext));
357 deviceContext.setSwitchFeaturesMandatory(switchFeaturesMandatory);
// NOTE(review): the cast assumes the concrete context type also implements ExtensionConverterProviderKeeper - confirm.
358 ((ExtensionConverterProviderKeeper) deviceContext).setExtensionConverterProvider(extensionConverterProvider);
359 deviceContext.setNotificationPublishService(notificationPublishService);
// Register the context first so the limit recalculation below counts it.
361 deviceContexts.put(connectionContext.getDeviceInfo(), deviceContext);
362 updatePacketInRateLimiters();
364 final OpenflowProtocolListenerFullImpl messageListener = new OpenflowProtocolListenerFullImpl(
365 connectionContext.getConnectionAdapter(), deviceContext);
367 connectionContext.getConnectionAdapter().setMessageListener(messageListener);
369 return deviceContext;
372 private void updatePacketInRateLimiters() {
373 synchronized (deviceContexts) {
374 final int deviceContextsSize = deviceContexts.size();
375 if (deviceContextsSize > 0) {
376 long freshNotificationLimit = globalNotificationQuota / deviceContextsSize;
377 if (freshNotificationLimit < 100) {
378 freshNotificationLimit = 100;
380 if (LOG.isDebugEnabled()) {
381 LOG.debug("fresh notification limit = {}", freshNotificationLimit);
383 for (final DeviceContext deviceContext : deviceContexts.values()) {
384 deviceContext.updatePacketInRateLimit(freshNotificationLimit);
390 public void onDeviceRemoved(DeviceInfo deviceInfo) {
391 deviceContexts.remove(deviceInfo);
392 LOG.debug("Device context removed for node {}", deviceInfo.getLOGValue());
394 lifecycleServices.remove(deviceInfo);
395 LOG.debug("Lifecycle service removed for node {}", deviceInfo.getLOGValue());
399 public long getBarrierIntervalNanos() {
400 return barrierIntervalNanos;
404 public int getBarrierCountLimit() {
405 return barrierCountLimit;