2 * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.openflowplugin.impl.device;
10 import com.google.common.annotations.VisibleForTesting;
11 import com.google.common.base.Preconditions;
12 import com.google.common.collect.Iterators;
13 import com.google.common.util.concurrent.CheckedFuture;
14 import com.google.common.util.concurrent.FutureCallback;
15 import com.google.common.util.concurrent.Futures;
16 import com.google.common.util.concurrent.ListenableFuture;
17 import io.netty.util.HashedWheelTimer;
18 import io.netty.util.TimerTask;
19 import java.util.Collections;
20 import java.util.Iterator;
21 import java.util.Objects;
22 import java.util.Optional;
23 import java.util.concurrent.ConcurrentHashMap;
24 import java.util.concurrent.ConcurrentMap;
25 import java.util.concurrent.ExecutionException;
26 import java.util.concurrent.ScheduledThreadPoolExecutor;
27 import java.util.concurrent.TimeUnit;
28 import javax.annotation.CheckForNull;
29 import javax.annotation.Nonnull;
30 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
31 import org.opendaylight.controller.md.sal.binding.api.NotificationPublishService;
32 import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
33 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
34 import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
35 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceProvider;
36 import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueueHandlerRegistration;
37 import org.opendaylight.openflowplugin.api.openflow.OFPContext;
38 import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionContext;
39 import org.opendaylight.openflowplugin.api.openflow.connection.OutboundQueueProvider;
40 import org.opendaylight.openflowplugin.api.openflow.device.DeviceContext;
41 import org.opendaylight.openflowplugin.api.openflow.device.DeviceInfo;
42 import org.opendaylight.openflowplugin.api.openflow.device.DeviceManager;
43 import org.opendaylight.openflowplugin.api.openflow.device.TranslatorLibrary;
44 import org.opendaylight.openflowplugin.api.openflow.device.handlers.DeviceInitializationPhaseHandler;
45 import org.opendaylight.openflowplugin.api.openflow.device.handlers.DeviceTerminationPhaseHandler;
46 import org.opendaylight.openflowplugin.api.openflow.lifecycle.LifecycleService;
47 import org.opendaylight.openflowplugin.api.openflow.statistics.ofpspecific.MessageSpy;
48 import org.opendaylight.openflowplugin.extension.api.ExtensionConverterProviderKeeper;
49 import org.opendaylight.openflowplugin.extension.api.core.extension.ExtensionConverterProvider;
50 import org.opendaylight.openflowplugin.impl.connection.OutboundQueueProviderImpl;
51 import org.opendaylight.openflowplugin.impl.device.initialization.DeviceInitializerProvider;
52 import org.opendaylight.openflowplugin.impl.device.listener.OpenflowProtocolListenerFullImpl;
53 import org.opendaylight.openflowplugin.impl.services.sal.SalRoleServiceImpl;
54 import org.opendaylight.openflowplugin.openflow.md.core.sal.convertor.ConvertorExecutor;
55 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes;
56 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodesBuilder;
57 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
58 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
59 import org.slf4j.Logger;
60 import org.slf4j.LoggerFactory;
// Device manager implementation: keeps a DeviceInfo -> DeviceContext map, creates a context for
// every new connection, reacts to disconnects/removals and re-splits the packet-in notification
// quota across the contexts it currently tracks.
65 public class DeviceManagerImpl implements DeviceManager, ExtensionConverterProviderKeeper {
67 private static final Logger LOG = LoggerFactory.getLogger(DeviceManagerImpl.class);
// Total notification budget; updatePacketInRateLimiters() divides it by the number of tracked contexts.
69 private final long globalNotificationQuota;
// Forwarded to every new context through setSwitchFeaturesMandatory().
70 private final boolean switchFeaturesMandatory;
71 private boolean isFlowRemovedNotificationOn;
72 private boolean skipTableFeatures;
// Initial delay and period, in seconds, used when messageSpy is scheduled in initialize().
73 private static final int SPY_RATE = 10;
// Source of the write-only transactions used by the constructor and removeDeviceFromOperationalDS().
75 private final DataBroker dataBroker;
76 private final DeviceInitializerProvider deviceInitializerProvider;
77 private final ConvertorExecutor convertorExecutor;
// Not assigned by the constructor; stays null until setTranslatorLibrary() is called.
78 private TranslatorLibrary translatorLibrary;
// Notified from onDeviceDisconnected() once the datastore transactions of a context are shut down.
79 private DeviceTerminationPhaseHandler deviceTerminPhaseHandler;
// Contexts of the devices currently handled by this manager, keyed by device identity.
81 private final ConcurrentMap<DeviceInfo, DeviceContext> deviceContexts = new ConcurrentHashMap<>();
// NOTE(review): only read in onDeviceContextLevelDown(); no visible code in this excerpt adds entries — confirm.
82 private final ConcurrentMap<DeviceInfo, LifecycleService> lifecycleServices = new ConcurrentHashMap<>();
// Stored in nanoseconds; the constructor and setBarrierInterval() convert from milliseconds.
84 private long barrierIntervalNanos;
85 private int barrierCountLimit;
// Handed to every new context in createContext().
87 private ExtensionConverterProvider extensionConverterProvider;
// Executor that periodically runs messageSpy; shut down in close().
88 private ScheduledThreadPoolExecutor spyPool;
89 private final NotificationPublishService notificationPublishService;
90 private final MessageSpy messageSpy;
// Used by onDeviceDisconnected() to schedule the 10 second transaction-shutdown timeout.
91 private final HashedWheelTimer hashedWheelTimer;
92 private boolean useSingleLayerSerialization;
// Creates the manager and stores its collaborators and tuning parameters.
// Before most fields are assigned, an empty Nodes container is merged into the OPERATIONAL
// datastore through a write-only transaction obtained from dataBroker.
// barrierInterval is given in milliseconds and is stored in nanoseconds.
// NOTE(review): singletonServiceProvider is not referenced by any line visible in this constructor.
94 public DeviceManagerImpl(@Nonnull final DataBroker dataBroker,
95 final long globalNotificationQuota,
96 final boolean switchFeaturesMandatory,
97 final long barrierInterval,
98 final int barrierCountLimit,
99 final MessageSpy messageSpy,
100 final boolean isFlowRemovedNotificationOn,
101 final ClusterSingletonServiceProvider singletonServiceProvider,
102 final NotificationPublishService notificationPublishService,
103 final HashedWheelTimer hashedWheelTimer,
104 final ConvertorExecutor convertorExecutor,
105 final boolean skipTableFeatures,
106 final boolean useSingleLayerSerialization,
107 final DeviceInitializerProvider deviceInitializerProvider) {
109 this.dataBroker = dataBroker;
110 this.deviceInitializerProvider = deviceInitializerProvider;
112 /* merge an empty Nodes container into the operational DS to prevent problems with a missing parent for Node */
113 final WriteTransaction tx = dataBroker.newWriteOnlyTransaction();
114 final NodesBuilder nodesBuilder = new NodesBuilder();
115 nodesBuilder.setNode(Collections.<Node>emptyList());
116 tx.merge(LogicalDatastoreType.OPERATIONAL, InstanceIdentifier.create(Nodes.class), nodesBuilder.build());
// NOTE(review): the statement guarded by this catch (presumably the blocking submit of tx) is not visible in this excerpt — confirm.
// NOTE(review): InterruptedException is wrapped without restoring the thread's interrupt flag.
119 } catch (ExecutionException | InterruptedException e) {
120 LOG.error("Creation of node failed.", e);
// A failure here aborts construction of the manager.
121 throw new IllegalStateException(e);
124 this.switchFeaturesMandatory = switchFeaturesMandatory;
125 this.globalNotificationQuota = globalNotificationQuota;
126 this.isFlowRemovedNotificationOn = isFlowRemovedNotificationOn;
127 this.skipTableFeatures = skipTableFeatures;
128 this.convertorExecutor = convertorExecutor;
129 this.hashedWheelTimer = hashedWheelTimer;
// Milliseconds in, nanoseconds stored.
130 this.barrierIntervalNanos = TimeUnit.MILLISECONDS.toNanos(barrierInterval);
131 this.barrierCountLimit = barrierCountLimit;
// Executor with a core pool of one thread; initialize() schedules messageSpy on it.
132 this.spyPool = new ScheduledThreadPoolExecutor(1);
133 this.notificationPublishService = notificationPublishService;
134 this.messageSpy = messageSpy;
135 this.useSingleLayerSerialization = useSingleLayerSerialization;
// Accepts a device-initialization phase handler.
// NOTE(review): the method body is not visible in this excerpt and the class declares no field
// for such a handler, so the argument is presumably ignored — confirm.
140 public void setDeviceInitializationPhaseHandler(final DeviceInitializationPhaseHandler handler) {
144 public void onDeviceContextLevelUp(@CheckForNull DeviceInfo deviceInfo, final LifecycleService lifecycleService) throws Exception {
145 // final phase - we have to add new Device to MD-SAL DataStore
146 LOG.debug("Final phase of DeviceContextLevelUp for Node: {} ", deviceInfo.getNodeId());
147 DeviceContext deviceContext = Preconditions.checkNotNull(deviceContexts.get(deviceInfo));
148 deviceContext.onPublished();
149 lifecycleService.registerDeviceRemovedHandler(this);
153 public TranslatorLibrary oook() {
154 return translatorLibrary;
158 public void setTranslatorLibrary(final TranslatorLibrary translatorLibrary) {
159 this.translatorLibrary = translatorLibrary;
// Shuts the manager down: walks all stored device contexts, shutting down each context's
// connection and datastore transactions, then stops the spy executor.
// The loop uses a consuming iterator, which removes each element from the underlying map view as it is returned.
// NOTE(review): not every line of this method is visible in this excerpt; further statements may follow — confirm.
163 public void close() {
164 for (final Iterator<DeviceContext> iterator = Iterators.consumingIterator(deviceContexts.values().iterator());
165 iterator.hasNext();) {
166 final DeviceContext deviceCtx = iterator.next();
167 deviceCtx.shutdownConnection();
// The future returned by this call is neither awaited nor inspected here.
168 deviceCtx.shuttingDownDataStoreTransactions();
// shutdownNow() is only invoked when spyPool is non-null.
171 Optional.ofNullable(spyPool).ifPresent(ScheduledThreadPoolExecutor::shutdownNow);
177 public void onDeviceContextLevelDown(final DeviceInfo deviceInfo) {
178 updatePacketInRateLimiters();
179 Optional.ofNullable(lifecycleServices.get(deviceInfo)).ifPresent(LifecycleService::close);
183 public void initialize() {
184 spyPool.scheduleAtFixedRate(messageSpy, SPY_RATE, SPY_RATE, TimeUnit.SECONDS);
188 public void setExtensionConverterProvider(final ExtensionConverterProvider extensionConverterProvider) {
189 this.extensionConverterProvider = extensionConverterProvider;
193 public ExtensionConverterProvider getExtensionConverterProvider() {
194 return extensionConverterProvider;
198 public void setDeviceTerminationPhaseHandler(final DeviceTerminationPhaseHandler handler) {
199 this.deviceTerminPhaseHandler = handler;
// Handles a closed connection of a device.
// Branches visible in this block:
//  - no context is stored for the device: an info message is logged;
//  - the connection is not the context's primary connection: it is removed from the
//    context's auxiliary connections;
//  - the context is already in TERMINATION state: an info message is logged;
//  - otherwise the context's datastore transactions are shut down, deviceTerminPhaseHandler
//    is notified when that finishes (successfully or not), and a 10 second timer cancels the
//    shutdown future if it is still pending.
// NOTE(review): the statements that leave the method after the three guard branches, and any
// statement between the last guard and the transaction shutdown, are not visible in this excerpt — confirm.
203 public void onDeviceDisconnected(final ConnectionContext connectionContext) {
204 LOG.trace("onDeviceDisconnected method call for Node: {}", connectionContext.getNodeId());
205 final DeviceInfo deviceInfo = connectionContext.getDeviceInfo();
206 final DeviceContext deviceCtx = this.deviceContexts.get(deviceInfo);
// Guard 1: this manager holds no context for the device.
208 if (Objects.isNull(deviceCtx)) {
209 LOG.info("DeviceContext for Node {} was not found. Connection is terminated without OFP context suite.", deviceInfo.getLOGValue());
// Guard 2: the closed connection is not the primary connection of the context.
213 if (!connectionContext.equals(deviceCtx.getPrimaryConnectionContext())) {
214 LOG.debug("Node {} disconnected, but not primary connection.", connectionContext.getDeviceInfo().getLOGValue());
215 // Connection is not PrimaryConnection so try to remove from Auxiliary Connections
216 deviceCtx.removeAuxiliaryConnectionContext(connectionContext);
217 // If this is not primary connection, we should not continue disabling everything
// Guard 3: termination of this context is already in progress.
221 if (deviceCtx.getState().equals(OFPContext.CONTEXT_STATE.TERMINATION)) {
222 LOG.info("Device context for node {} is already is termination state, waiting for close all context", deviceInfo.getLOGValue());
228 // TODO: Auxiliary connections supported ?
229 // Device is disconnected and so we need to close TxManager
230 final ListenableFuture<Void> future = deviceCtx.shuttingDownDataStoreTransactions();
// Both callback outcomes hand over to the termination phase handler; a failure is only logged.
231 Futures.addCallback(future, new FutureCallback<Void>() {
233 public void onSuccess(final Void result) {
234 LOG.debug("TxChainManager for device {} is closed successful.", deviceInfo.getLOGValue());
235 deviceTerminPhaseHandler.onDeviceContextLevelDown(deviceInfo);
239 public void onFailure(final Throwable t) {
240 LOG.warn("TxChainManager for device {} failed by closing.", deviceInfo.getLOGValue());
241 LOG.trace("TxChainManager failed by closing. ", t);
242 deviceTerminPhaseHandler.onDeviceContextLevelDown(deviceInfo);
246 // Add timer for Close TxManager because it could fail in cluster without notification
247 final TimerTask timerTask = timeout -> {
// Only a future that has not completed yet is cancelled; cancel(false) does not request interruption.
248 if (!future.isDone()) {
249 LOG.warn("Shutting down TxChain for node {} not completed during 10 sec. Continue anyway.", deviceInfo.getLOGValue());
250 future.cancel(false);
// The timeout task runs once, 10 seconds after it is scheduled.
254 hashedWheelTimer.newTimeout(timerTask, 10, TimeUnit.SECONDS);
258 void addDeviceContextToMap(final DeviceInfo deviceInfo, final DeviceContext deviceContext){
259 deviceContexts.put(deviceInfo, deviceContext);
263 public void setFlowRemovedNotificationOn(boolean isNotificationFlowRemovedOff) {
264 this.isFlowRemovedNotificationOn = isNotificationFlowRemovedOff;
268 public boolean isFlowRemovedNotificationOn() {
269 return this.isFlowRemovedNotificationOn;
274 public void setSkipTableFeatures(boolean skipTableFeaturesValue) {
275 skipTableFeatures = skipTableFeaturesValue;
279 public void setBarrierCountLimit(final int barrierCountLimit) {
280 this.barrierCountLimit = barrierCountLimit;
284 public void setBarrierInterval(final long barrierTimeoutLimit) {
285 this.barrierIntervalNanos = TimeUnit.MILLISECONDS.toNanos(barrierTimeoutLimit);
289 public CheckedFuture<Void, TransactionCommitFailedException> removeDeviceFromOperationalDS(final DeviceInfo deviceInfo) {
290 final WriteTransaction delWtx = dataBroker.newWriteOnlyTransaction();
291 delWtx.delete(LogicalDatastoreType.OPERATIONAL, deviceInfo.getNodeInstanceIdentifier());
292 final CheckedFuture<Void, TransactionCommitFailedException> delFuture = delWtx.submit();
294 Futures.addCallback(delFuture, new FutureCallback<Void>() {
296 public void onSuccess(final Void result) {
297 if (LOG.isDebugEnabled()) {
298 LOG.debug("Delete Node {} was successful", deviceInfo.getLOGValue());
303 public void onFailure(@Nonnull final Throwable t) {
304 LOG.warn("Delete node {} failed with exception {}", deviceInfo.getLOGValue(), t);
312 public void setUseSingleLayerSerialization(final Boolean useSingleLayerSerialization) {
313 this.useSingleLayerSerialization = useSingleLayerSerialization;
// Builds the DeviceContext for a newly connected device and wires it to the connection:
// turns packet-in filtering on, installs an outbound queue provider and its handler
// registration, creates and configures the context, stores it in deviceContexts,
// re-splits the packet-in rate limits and attaches the protocol message listener.
// Returns the created context.
// NOTE(review): connectionContext is annotated @CheckForNull but is dereferenced without a null check.
316 public DeviceContext createContext(@CheckForNull final ConnectionContext connectionContext) {
318 LOG.info("ConnectionEvent: Device connected to controller, Device:{}, NodeId:{}",
319 connectionContext.getConnectionAdapter().getRemoteAddress(),
320 connectionContext.getDeviceInfo().getNodeId());
322 connectionContext.getConnectionAdapter().setPacketInFiltering(true);
// The queue provider is created for the protocol version reported by the device.
324 final OutboundQueueProvider outboundQueueProvider
325 = new OutboundQueueProviderImpl(connectionContext.getDeviceInfo().getVersion());
327 connectionContext.setOutboundQueueProvider(outboundQueueProvider);
// NOTE(review): not every argument of the registration call is visible in this excerpt.
328 final OutboundQueueHandlerRegistration<OutboundQueueProvider> outboundQueueHandlerRegistration =
329 connectionContext.getConnectionAdapter().registerOutboundQueueHandler(
330 outboundQueueProvider,
332 barrierIntervalNanos);
333 connectionContext.setOutboundQueueHandleRegistration(outboundQueueHandlerRegistration);
// NOTE(review): only the last two constructor arguments are visible in this excerpt.
336 final DeviceContext deviceContext = new DeviceContextImpl(
345 useSingleLayerSerialization,
346 deviceInitializerProvider);
// The context is passed twice to the role service constructor.
348 deviceContext.setSalRoleService(new SalRoleServiceImpl(deviceContext, deviceContext));
349 deviceContext.setSwitchFeaturesMandatory(switchFeaturesMandatory);
// The cast assumes the created context also implements ExtensionConverterProviderKeeper.
350 ((ExtensionConverterProviderKeeper) deviceContext).setExtensionConverterProvider(extensionConverterProvider);
351 deviceContext.setNotificationPublishService(notificationPublishService);
// Register the context first so that the rate-limit split below already counts it.
353 deviceContexts.put(connectionContext.getDeviceInfo(), deviceContext);
354 updatePacketInRateLimiters();
356 final OpenflowProtocolListenerFullImpl messageListener = new OpenflowProtocolListenerFullImpl(
357 connectionContext.getConnectionAdapter(), deviceContext);
359 connectionContext.getConnectionAdapter().setMessageListener(messageListener);
361 return deviceContext;
364 private void updatePacketInRateLimiters() {
365 synchronized (deviceContexts) {
366 final int deviceContextsSize = deviceContexts.size();
367 if (deviceContextsSize > 0) {
368 long freshNotificationLimit = globalNotificationQuota / deviceContextsSize;
369 if (freshNotificationLimit < 100) {
370 freshNotificationLimit = 100;
372 if (LOG.isDebugEnabled()) {
373 LOG.debug("fresh notification limit = {}", freshNotificationLimit);
375 for (final DeviceContext deviceContext : deviceContexts.values()) {
376 deviceContext.updatePacketInRateLimit(freshNotificationLimit);
383 public void onDeviceRemoved(final DeviceInfo deviceInfo) {
384 deviceContexts.remove(deviceInfo);
385 if (LOG.isDebugEnabled()) {
386 LOG.debug("Device context removed for node {}", deviceInfo.getLOGValue());
388 this.updatePacketInRateLimiters();
392 public long getBarrierIntervalNanos() {
393 return barrierIntervalNanos;
397 public int getBarrierCountLimit() {
398 return barrierCountLimit;