2 * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.openflowplugin.impl.device;
10 import com.google.common.annotations.VisibleForTesting;
11 import com.google.common.base.Preconditions;
12 import com.google.common.collect.Iterators;
13 import com.google.common.util.concurrent.FutureCallback;
14 import com.google.common.util.concurrent.Futures;
15 import com.google.common.util.concurrent.ListenableFuture;
16 import io.netty.util.HashedWheelTimer;
17 import io.netty.util.TimerTask;
18 import java.util.Collections;
19 import java.util.Iterator;
20 import java.util.Objects;
21 import java.util.concurrent.ConcurrentHashMap;
22 import java.util.concurrent.ConcurrentMap;
23 import java.util.concurrent.ExecutionException;
24 import java.util.concurrent.ScheduledThreadPoolExecutor;
25 import java.util.concurrent.TimeUnit;
26 import javax.annotation.CheckForNull;
27 import javax.annotation.Nonnull;
28 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
29 import org.opendaylight.controller.md.sal.binding.api.NotificationPublishService;
30 import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
31 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
32 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceProvider;
33 import org.opendaylight.openflowjava.protocol.api.connection.ConnectionAdapter;
34 import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueueHandlerRegistration;
35 import org.opendaylight.openflowplugin.api.openflow.OFPContext;
36 import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionContext;
37 import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionStatus;
38 import org.opendaylight.openflowplugin.api.openflow.connection.OutboundQueueProvider;
39 import org.opendaylight.openflowplugin.api.openflow.device.DeviceContext;
40 import org.opendaylight.openflowplugin.api.openflow.device.DeviceInfo;
41 import org.opendaylight.openflowplugin.api.openflow.device.DeviceManager;
42 import org.opendaylight.openflowplugin.api.openflow.device.TranslatorLibrary;
43 import org.opendaylight.openflowplugin.api.openflow.device.handlers.DeviceInitializationPhaseHandler;
44 import org.opendaylight.openflowplugin.api.openflow.device.handlers.DeviceTerminationPhaseHandler;
45 import org.opendaylight.openflowplugin.api.openflow.lifecycle.LifecycleService;
46 import org.opendaylight.openflowplugin.api.openflow.statistics.ofpspecific.MessageSpy;
47 import org.opendaylight.openflowplugin.extension.api.ExtensionConverterProviderKeeper;
48 import org.opendaylight.openflowplugin.extension.api.core.extension.ExtensionConverterProvider;
49 import org.opendaylight.openflowplugin.impl.connection.OutboundQueueProviderImpl;
50 import org.opendaylight.openflowplugin.impl.device.listener.OpenflowProtocolListenerFullImpl;
51 import org.opendaylight.openflowplugin.impl.lifecycle.LifecycleServiceImpl;
52 import org.opendaylight.openflowplugin.openflow.md.core.sal.convertor.ConvertorExecutor;
53 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes;
54 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodesBuilder;
55 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
56 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
57 import org.slf4j.Logger;
58 import org.slf4j.LoggerFactory;
63 public class DeviceManagerImpl implements DeviceManager, ExtensionConverterProviderKeeper {
65 private static final Logger LOG = LoggerFactory.getLogger(DeviceManagerImpl.class);
67 private final long globalNotificationQuota;
68 private final boolean switchFeaturesMandatory;
69 private boolean isNotificationFlowRemovedOff;
70 private boolean skipTableFeatures;
71 private static final int SPY_RATE = 10;
73 private final DataBroker dataBroker;
74 private final ConvertorExecutor convertorExecutor;
75 private TranslatorLibrary translatorLibrary;
76 private DeviceInitializationPhaseHandler deviceInitPhaseHandler;
77 private DeviceTerminationPhaseHandler deviceTerminPhaseHandler;
79 private final ConcurrentMap<DeviceInfo, DeviceContext> deviceContexts = new ConcurrentHashMap<>();
80 private final ConcurrentMap<DeviceInfo, LifecycleService> lifecycleServices = new ConcurrentHashMap<>();
82 private final long barrierIntervalNanos;
83 private final int barrierCountLimit;
85 private ExtensionConverterProvider extensionConverterProvider;
86 private ScheduledThreadPoolExecutor spyPool;
87 private final ClusterSingletonServiceProvider singletonServiceProvider;
88 private final NotificationPublishService notificationPublishService;
89 private final MessageSpy messageSpy;
90 private final HashedWheelTimer hashedWheelTimer;
92 public DeviceManagerImpl(@Nonnull final DataBroker dataBroker,
93 final long globalNotificationQuota,
94 final boolean switchFeaturesMandatory,
95 final long barrierInterval,
96 final int barrierCountLimit,
97 final MessageSpy messageSpy,
98 final boolean isNotificationFlowRemovedOff,
99 final ClusterSingletonServiceProvider singletonServiceProvider,
100 final NotificationPublishService notificationPublishService,
101 final HashedWheelTimer hashedWheelTimer,
102 final ConvertorExecutor convertorExecutor,
103 final boolean skipTableFeatures) {
105 this.switchFeaturesMandatory = switchFeaturesMandatory;
106 this.globalNotificationQuota = globalNotificationQuota;
107 this.isNotificationFlowRemovedOff = isNotificationFlowRemovedOff;
108 this.skipTableFeatures = skipTableFeatures;
109 this.dataBroker = Preconditions.checkNotNull(dataBroker);
110 this.convertorExecutor = convertorExecutor;
111 this.hashedWheelTimer = hashedWheelTimer;
112 /* merge empty nodes to oper DS to predict any problems with missing parent for Node */
113 final WriteTransaction tx = dataBroker.newWriteOnlyTransaction();
115 final NodesBuilder nodesBuilder = new NodesBuilder();
116 nodesBuilder.setNode(Collections.<Node>emptyList());
117 tx.merge(LogicalDatastoreType.OPERATIONAL, InstanceIdentifier.create(Nodes.class), nodesBuilder.build());
120 } catch (ExecutionException | InterruptedException e) {
121 LOG.error("Creation of node failed.", e);
122 throw new IllegalStateException(e);
125 this.barrierIntervalNanos = TimeUnit.MILLISECONDS.toNanos(barrierInterval);
126 this.barrierCountLimit = barrierCountLimit;
128 spyPool = new ScheduledThreadPoolExecutor(1);
129 this.singletonServiceProvider = singletonServiceProvider;
130 this.notificationPublishService = notificationPublishService;
131 this.messageSpy = messageSpy;
136 public void setDeviceInitializationPhaseHandler(final DeviceInitializationPhaseHandler handler) {
137 this.deviceInitPhaseHandler = handler;
141 public void onDeviceContextLevelUp(@CheckForNull DeviceInfo deviceInfo, final LifecycleService lifecycleService) throws Exception {
142 // final phase - we have to add new Device to MD-SAL DataStore
143 LOG.debug("Final phase of DeviceContextLevelUp for Node: {} ", deviceInfo.getNodeId());
144 DeviceContext deviceContext = Preconditions.checkNotNull(deviceContexts.get(deviceInfo));
145 deviceContext.onPublished();
146 lifecycleService.registerService(this.singletonServiceProvider);
150 public ConnectionStatus deviceConnected(@CheckForNull final ConnectionContext connectionContext) throws Exception {
151 Preconditions.checkArgument(connectionContext != null);
153 DeviceInfo deviceInfo = connectionContext.getDeviceInfo();
155 * This part prevent destroy another device context. Throwing here an exception result to propagate close connection
156 * in {@link org.opendaylight.openflowplugin.impl.connection.org.opendaylight.openflowplugin.impl.connection.HandshakeContextImpl}
157 * If context already exist we are in state closing process (connection flapping) and we should not propagate connection close
159 if (deviceContexts.containsKey(deviceInfo)) {
160 DeviceContext deviceContext = deviceContexts.get(deviceInfo);
161 if (!deviceContext.getState().equals(OFPContext.CONTEXT_STATE.TERMINATION)) {
162 LOG.info("Node {} already connected but context state not in TERMINATION state, replacing connection context",
163 connectionContext.getDeviceInfo().getLOGValue());
164 deviceContext.replaceConnectionContext(connectionContext);
165 return ConnectionStatus.ALREADY_CONNECTED;
167 LOG.warn("Rejecting connection from node which is already connected and there exist deviceContext for it: {}",
168 connectionContext.getDeviceInfo().getLOGValue());
169 return ConnectionStatus.CLOSING;
173 LOG.info("ConnectionEvent: Device connected to controller, Device:{}, NodeId:{}",
174 connectionContext.getConnectionAdapter().getRemoteAddress(), deviceInfo.getNodeId());
176 // Add Disconnect handler
177 connectionContext.setDeviceDisconnectedHandler(DeviceManagerImpl.this);
178 // Cache this for clarity
179 final ConnectionAdapter connectionAdapter = connectionContext.getConnectionAdapter();
181 //FIXME: as soon as auxiliary connection are fully supported then this is needed only before device context published
182 connectionAdapter.setPacketInFiltering(true);
184 final OutboundQueueProvider outboundQueueProvider = new OutboundQueueProviderImpl(deviceInfo.getVersion());
186 connectionContext.setOutboundQueueProvider(outboundQueueProvider);
187 final OutboundQueueHandlerRegistration<OutboundQueueProvider> outboundQueueHandlerRegistration =
188 connectionAdapter.registerOutboundQueueHandler(outboundQueueProvider, barrierCountLimit, barrierIntervalNanos);
189 connectionContext.setOutboundQueueHandleRegistration(outboundQueueHandlerRegistration);
191 final DeviceContext deviceContext = new DeviceContextImpl(
200 deviceContexts.putIfAbsent(deviceInfo, deviceContext);
202 final LifecycleService lifecycleService = new LifecycleServiceImpl();
203 lifecycleService.setDeviceContext(deviceContext);
204 deviceContext.putLifecycleServiceIntoTxChainManager(lifecycleService);
206 lifecycleServices.putIfAbsent(deviceInfo, lifecycleService);
208 deviceContext.setSwitchFeaturesMandatory(switchFeaturesMandatory);
210 ((ExtensionConverterProviderKeeper) deviceContext).setExtensionConverterProvider(extensionConverterProvider);
211 deviceContext.setNotificationPublishService(notificationPublishService);
213 updatePacketInRateLimiters();
215 final OpenflowProtocolListenerFullImpl messageListener = new OpenflowProtocolListenerFullImpl(
216 connectionAdapter, deviceContext);
218 connectionAdapter.setMessageListener(messageListener);
219 deviceInitPhaseHandler.onDeviceContextLevelUp(connectionContext.getDeviceInfo(), lifecycleService);
220 return ConnectionStatus.MAY_CONTINUE;
223 private void updatePacketInRateLimiters() {
224 synchronized (deviceContexts) {
225 final int deviceContextsSize = deviceContexts.size();
226 if (deviceContextsSize > 0) {
227 long freshNotificationLimit = globalNotificationQuota / deviceContextsSize;
228 if (freshNotificationLimit < 100) {
229 freshNotificationLimit = 100;
231 LOG.debug("fresh notification limit = {}", freshNotificationLimit);
232 for (final DeviceContext deviceContext : deviceContexts.values()) {
233 deviceContext.updatePacketInRateLimit(freshNotificationLimit);
240 public TranslatorLibrary oook() {
241 return translatorLibrary;
245 public void setTranslatorLibrary(final TranslatorLibrary translatorLibrary) {
246 this.translatorLibrary = translatorLibrary;
250 public void close() {
251 for (final Iterator<DeviceContext> iterator = Iterators.consumingIterator(deviceContexts.values().iterator());
252 iterator.hasNext();) {
253 final DeviceContext deviceCtx = iterator.next();
254 deviceCtx.shutdownConnection();
255 deviceCtx.shuttingDownDataStoreTransactions();
258 if (spyPool != null) {
259 spyPool.shutdownNow();
265 public void onDeviceContextLevelDown(final DeviceInfo deviceInfo) {
267 LifecycleService lifecycleService = lifecycleServices.remove(deviceInfo);
268 if (LOG.isDebugEnabled()) {
269 LOG.debug("Lifecycle service removed for node {}", deviceInfo.getLOGValue());
272 updatePacketInRateLimiters();
273 if (Objects.nonNull(lifecycleService)) {
275 lifecycleService.close();
276 LOG.debug("Lifecycle service successfully closed for node {}", deviceInfo.getLOGValue());
277 } catch (Exception e) {
278 LOG.warn("Closing lifecycle service for node {} was unsuccessful ", deviceInfo.getLOGValue(), e);
282 deviceContexts.remove(deviceInfo);
283 if (LOG.isDebugEnabled()) {
284 LOG.debug("Device context removed for node {}", deviceInfo.getLOGValue());
290 public void initialize() {
291 spyPool.scheduleAtFixedRate(messageSpy, SPY_RATE, SPY_RATE, TimeUnit.SECONDS);
295 public void setExtensionConverterProvider(final ExtensionConverterProvider extensionConverterProvider) {
296 this.extensionConverterProvider = extensionConverterProvider;
300 public ExtensionConverterProvider getExtensionConverterProvider() {
301 return extensionConverterProvider;
305 public void setDeviceTerminationPhaseHandler(final DeviceTerminationPhaseHandler handler) {
306 this.deviceTerminPhaseHandler = handler;
310 public void onDeviceDisconnected(final ConnectionContext connectionContext) {
311 LOG.trace("onDeviceDisconnected method call for Node: {}", connectionContext.getNodeId());
312 final DeviceInfo deviceInfo = connectionContext.getDeviceInfo();
313 final DeviceContext deviceCtx = this.deviceContexts.get(deviceInfo);
315 if (null == deviceCtx) {
316 LOG.info("DeviceContext for Node {} was not found. Connection is terminated without OFP context suite.", deviceInfo.getLOGValue());
320 if (deviceCtx.getState().equals(OFPContext.CONTEXT_STATE.TERMINATION)) {
321 LOG.debug("Device context for node {} is already is termination state, waiting for close all context");
325 deviceCtx.setState(OFPContext.CONTEXT_STATE.TERMINATION);
327 if (!connectionContext.equals(deviceCtx.getPrimaryConnectionContext())) {
328 LOG.debug("Node {} disconnected, but not primary connection.", connectionContext.getDeviceInfo().getLOGValue());
329 /* Connection is not PrimaryConnection so try to remove from Auxiliary Connections */
330 deviceCtx.removeAuxiliaryConnectionContext(connectionContext);
332 //TODO: Auxiliary connections supported ?
334 /* Device is disconnected and so we need to close TxManager */
335 final ListenableFuture<Void> future = deviceCtx.shuttingDownDataStoreTransactions();
336 Futures.addCallback(future, new FutureCallback<Void>() {
339 public void onSuccess(final Void result) {
340 LOG.debug("TxChainManager for device {} is closed successful.", deviceInfo.getLOGValue());
341 deviceTerminPhaseHandler.onDeviceContextLevelDown(deviceInfo);
345 public void onFailure(final Throwable t) {
346 LOG.warn("TxChainManager for device {} failed by closing.", deviceInfo.getLOGValue());
347 LOG.trace("TxChainManager failed by closing. ", t);
348 deviceTerminPhaseHandler.onDeviceContextLevelDown(deviceInfo);
351 /* Add timer for Close TxManager because it could fain ind cluster without notification */
352 final TimerTask timerTask = timeout -> {
353 if (!future.isDone()) {
354 LOG.warn("Shutting down TxChain for node {} not completed during 10 sec. Continue anyway.", deviceInfo.getLOGValue());
355 future.cancel(false);
358 hashedWheelTimer.newTimeout(timerTask, 10, TimeUnit.SECONDS);
363 void addDeviceContextToMap(final DeviceInfo deviceInfo, final DeviceContext deviceContext){
364 deviceContexts.put(deviceInfo, deviceContext);
368 public <T extends OFPContext> T gainContext(final DeviceInfo deviceInfo) {
369 return (T) deviceContexts.get(deviceInfo);
373 public void setIsNotificationFlowRemovedOff(boolean isNotificationFlowRemovedOff) {
374 this.isNotificationFlowRemovedOff = isNotificationFlowRemovedOff;
378 public boolean getIsNotificationFlowRemovedOff() {
379 return this.isNotificationFlowRemovedOff;
384 public void setSkipTableFeatures(boolean skipTableFeaturesValue) {
385 skipTableFeatures = skipTableFeaturesValue;