2 * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.openflowplugin.impl.device;
10 import com.google.common.annotations.VisibleForTesting;
11 import com.google.common.base.Preconditions;
12 import com.google.common.collect.Iterators;
13 import com.google.common.util.concurrent.FutureCallback;
14 import com.google.common.util.concurrent.Futures;
15 import com.google.common.util.concurrent.ListenableFuture;
16 import io.netty.util.HashedWheelTimer;
17 import io.netty.util.TimerTask;
18 import java.util.Collections;
19 import java.util.Iterator;
20 import java.util.Objects;
21 import java.util.Optional;
22 import java.util.concurrent.ConcurrentHashMap;
23 import java.util.concurrent.ConcurrentMap;
24 import java.util.concurrent.ExecutionException;
25 import java.util.concurrent.ScheduledThreadPoolExecutor;
26 import java.util.concurrent.TimeUnit;
27 import javax.annotation.CheckForNull;
28 import javax.annotation.Nonnull;
29 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
30 import org.opendaylight.controller.md.sal.binding.api.NotificationPublishService;
31 import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
32 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
33 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceProvider;
34 import org.opendaylight.openflowjava.protocol.api.connection.ConnectionAdapter;
35 import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueueHandlerRegistration;
36 import org.opendaylight.openflowplugin.api.openflow.OFPContext;
37 import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionContext;
38 import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionStatus;
39 import org.opendaylight.openflowplugin.api.openflow.connection.OutboundQueueProvider;
40 import org.opendaylight.openflowplugin.api.openflow.device.DeviceContext;
41 import org.opendaylight.openflowplugin.api.openflow.device.DeviceInfo;
42 import org.opendaylight.openflowplugin.api.openflow.device.DeviceManager;
43 import org.opendaylight.openflowplugin.api.openflow.device.TranslatorLibrary;
44 import org.opendaylight.openflowplugin.api.openflow.device.handlers.DeviceInitializationPhaseHandler;
45 import org.opendaylight.openflowplugin.api.openflow.device.handlers.DeviceTerminationPhaseHandler;
46 import org.opendaylight.openflowplugin.api.openflow.lifecycle.LifecycleService;
47 import org.opendaylight.openflowplugin.api.openflow.statistics.ofpspecific.MessageSpy;
48 import org.opendaylight.openflowplugin.extension.api.ExtensionConverterProviderKeeper;
49 import org.opendaylight.openflowplugin.extension.api.core.extension.ExtensionConverterProvider;
50 import org.opendaylight.openflowplugin.impl.connection.OutboundQueueProviderImpl;
51 import org.opendaylight.openflowplugin.impl.device.listener.OpenflowProtocolListenerFullImpl;
52 import org.opendaylight.openflowplugin.impl.lifecycle.LifecycleServiceImpl;
53 import org.opendaylight.openflowplugin.openflow.md.core.sal.convertor.ConvertorExecutor;
54 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes;
55 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodesBuilder;
56 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
57 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
58 import org.slf4j.Logger;
59 import org.slf4j.LoggerFactory;
64 public class DeviceManagerImpl implements DeviceManager, ExtensionConverterProviderKeeper {
66 private static final Logger LOG = LoggerFactory.getLogger(DeviceManagerImpl.class);
68 private final long globalNotificationQuota;
69 private final boolean switchFeaturesMandatory;
70 private boolean isNotificationFlowRemovedOff;
71 private boolean skipTableFeatures;
72 private static final int SPY_RATE = 10;
74 private final DataBroker dataBroker;
75 private final ConvertorExecutor convertorExecutor;
76 private TranslatorLibrary translatorLibrary;
77 private DeviceInitializationPhaseHandler deviceInitPhaseHandler;
78 private DeviceTerminationPhaseHandler deviceTerminPhaseHandler;
80 private final ConcurrentMap<DeviceInfo, DeviceContext> deviceContexts = new ConcurrentHashMap<>();
81 private final ConcurrentMap<DeviceInfo, LifecycleService> lifecycleServices = new ConcurrentHashMap<>();
83 private final long barrierIntervalNanos;
84 private final int barrierCountLimit;
86 private ExtensionConverterProvider extensionConverterProvider;
87 private ScheduledThreadPoolExecutor spyPool;
88 private final ClusterSingletonServiceProvider singletonServiceProvider;
89 private final NotificationPublishService notificationPublishService;
90 private final MessageSpy messageSpy;
91 private final HashedWheelTimer hashedWheelTimer;
/**
 * Creates the device manager: stores the supplied settings and collaborators, creates the
 * single-threaded spy pool, and merges an empty {@code Nodes} container into the operational
 * datastore.
 *
 * @param dataBroker broker used here to open the write-only transaction; also kept in a field
 * @param globalNotificationQuota quota that updatePacketInRateLimiters() divides among device contexts
 * @param switchFeaturesMandatory flag passed to every device context created in deviceConnected()
 * @param barrierInterval barrier interval in milliseconds; stored converted to nanoseconds
 * @param barrierCountLimit barrier count limit used when the outbound queue handler is registered
 * @param messageSpy task scheduled on the spy pool by initialize()
 * @param isNotificationFlowRemovedOff initial value of the flag of the same name
 * @param singletonServiceProvider provider that lifecycle services are registered with
 * @param notificationPublishService service handed to every device context created in deviceConnected()
 * @param hashedWheelTimer timer used for the transaction-chain shutdown timeout in onDeviceDisconnected()
 * @param convertorExecutor convertor executor kept in a field
 * @param skipTableFeatures initial value of the flag of the same name
 * @throws IllegalStateException when the catch block below handles an ExecutionException or
 *         InterruptedException
 */
93 public DeviceManagerImpl(@Nonnull final DataBroker dataBroker,
94 final long globalNotificationQuota,
95 final boolean switchFeaturesMandatory,
96 final long barrierInterval,
97 final int barrierCountLimit,
98 final MessageSpy messageSpy,
99 final boolean isNotificationFlowRemovedOff,
100 final ClusterSingletonServiceProvider singletonServiceProvider,
101 final NotificationPublishService notificationPublishService,
102 final HashedWheelTimer hashedWheelTimer,
103 final ConvertorExecutor convertorExecutor,
104 final boolean skipTableFeatures) {
106 this.dataBroker = dataBroker;
108 /* merge an empty Nodes container into the operational DS to prevent problems with a missing parent for Node */
109 final WriteTransaction tx = dataBroker.newWriteOnlyTransaction();
110 final NodesBuilder nodesBuilder = new NodesBuilder();
111 nodesBuilder.setNode(Collections.<Node>emptyList());
112 tx.merge(LogicalDatastoreType.OPERATIONAL, InstanceIdentifier.create(Nodes.class), nodesBuilder.build());
// NOTE(review): the statement(s) guarded by this catch are not visible in this extract;
// presumably the transaction is submitted and waited on here - confirm against the full file.
// NOTE(review): InterruptedException is wrapped without restoring the thread's interrupt flag.
115 } catch (ExecutionException | InterruptedException e) {
116 LOG.error("Creation of node failed.", e);
117 throw new IllegalStateException(e);
120 this.switchFeaturesMandatory = switchFeaturesMandatory;
121 this.globalNotificationQuota = globalNotificationQuota;
122 this.isNotificationFlowRemovedOff = isNotificationFlowRemovedOff;
123 this.skipTableFeatures = skipTableFeatures;
124 this.convertorExecutor = convertorExecutor;
125 this.hashedWheelTimer = hashedWheelTimer;
// The interval arrives in milliseconds; the outbound queue handler registration takes nanoseconds.
126 this.barrierIntervalNanos = TimeUnit.MILLISECONDS.toNanos(barrierInterval);
127 this.barrierCountLimit = barrierCountLimit;
// One worker thread; the message spy is scheduled on it in initialize().
128 this.spyPool = new ScheduledThreadPoolExecutor(1);
129 this.singletonServiceProvider = singletonServiceProvider;
130 this.notificationPublishService = notificationPublishService;
131 this.messageSpy = messageSpy;
136 public void setDeviceInitializationPhaseHandler(final DeviceInitializationPhaseHandler handler) {
137 this.deviceInitPhaseHandler = handler;
141 public void onDeviceContextLevelUp(@CheckForNull DeviceInfo deviceInfo, final LifecycleService lifecycleService) throws Exception {
142 // final phase - we have to add new Device to MD-SAL DataStore
143 LOG.debug("Final phase of DeviceContextLevelUp for Node: {} ", deviceInfo.getNodeId());
144 DeviceContext deviceContext = Preconditions.checkNotNull(deviceContexts.get(deviceInfo));
145 deviceContext.onPublished();
146 lifecycleService.registerService(this.singletonServiceProvider);
/**
 * Handles a newly connected device.
 *
 * If a device context is already stored for the connection's DeviceInfo, the connection is
 * rejected: ALREADY_CONNECTED is returned when that context is not in TERMINATION state,
 * CLOSING otherwise. For a new device the method installs this manager as disconnect handler,
 * enables packet-in filtering, registers an outbound queue provider, creates and stores the
 * device context and its lifecycle service, recomputes the packet-in rate limits, installs the
 * message listener and finally calls the device initialization phase handler.
 *
 * @param connectionContext context of the new connection; must not be null
 * @return ALREADY_CONNECTED or CLOSING when a context for the device already exists,
 *         MAY_CONTINUE otherwise
 * @throws IllegalArgumentException if connectionContext is null
 * @throws Exception whatever the invoked calls propagate
 */
150 public ConnectionStatus deviceConnected(@CheckForNull final ConnectionContext connectionContext) throws Exception {
151 Preconditions.checkArgument(connectionContext != null);
// NOTE(review): the explanatory text below repeats the package name inside its link target
// (…impl.connection.org.opendaylight…); the intended target is presumably HandshakeContextImpl.
153 DeviceInfo deviceInfo = connectionContext.getDeviceInfo();
155 * This part prevent destroy another device context. Throwing here an exception result to propagate close connection
156 * in {@link org.opendaylight.openflowplugin.impl.connection.org.opendaylight.openflowplugin.impl.connection.HandshakeContextImpl}
157 * If context already exist we are in state closing process (connection flapping) and we should not propagate connection close
// NOTE(review): containsKey() followed by get() is not atomic on a ConcurrentMap; if the entry
// is removed in between, deviceContext is null when getState() is called.
159 if (deviceContexts.containsKey(deviceInfo)) {
160 DeviceContext deviceContext = deviceContexts.get(deviceInfo);
161 LOG.warn("Node {} already connected disconnecting device. Rejecting connection", deviceInfo.getLOGValue());
162 if (!deviceContext.getState().equals(OFPContext.CONTEXT_STATE.TERMINATION)) {
163 LOG.warn("Node {} context state not in TERMINATION state.",
164 connectionContext.getDeviceInfo().getLOGValue());
165 return ConnectionStatus.ALREADY_CONNECTED;
// Reached when the existing context is already in TERMINATION state.
167 return ConnectionStatus.CLOSING;
171 LOG.info("ConnectionEvent: Device connected to controller, Device:{}, NodeId:{}",
172 connectionContext.getConnectionAdapter().getRemoteAddress(), deviceInfo.getNodeId());
174 // Add Disconnect handler
175 connectionContext.setDeviceDisconnectedHandler(DeviceManagerImpl.this);
176 // Cache this for clarity
177 final ConnectionAdapter connectionAdapter = connectionContext.getConnectionAdapter();
179 //FIXME: as soon as auxiliary connection are fully supported then this is needed only before device context published
180 connectionAdapter.setPacketInFiltering(true);
182 final OutboundQueueProvider outboundQueueProvider = new OutboundQueueProviderImpl(deviceInfo.getVersion());
184 connectionContext.setOutboundQueueProvider(outboundQueueProvider);
// The registration is kept on the connection context.
185 final OutboundQueueHandlerRegistration<OutboundQueueProvider> outboundQueueHandlerRegistration =
186 connectionAdapter.registerOutboundQueueHandler(outboundQueueProvider, barrierCountLimit, barrierIntervalNanos);
187 connectionContext.setOutboundQueueHandleRegistration(outboundQueueHandlerRegistration);
// NOTE(review): the constructor arguments of DeviceContextImpl are not visible in this extract.
189 final DeviceContext deviceContext = new DeviceContextImpl(
198 deviceContexts.put(deviceInfo, deviceContext);
200 final LifecycleService lifecycleService = new LifecycleServiceImpl();
201 lifecycleService.setDeviceContext(deviceContext);
202 deviceContext.putLifecycleServiceIntoTxChainManager(lifecycleService);
204 lifecycleServices.put(deviceInfo, lifecycleService);
206 deviceContext.setSwitchFeaturesMandatory(switchFeaturesMandatory);
// Unchecked cast: the context created above is assumed to implement ExtensionConverterProviderKeeper.
208 ((ExtensionConverterProviderKeeper) deviceContext).setExtensionConverterProvider(extensionConverterProvider);
209 deviceContext.setNotificationPublishService(notificationPublishService);
// The number of device contexts just changed, so the per-device quota is recomputed.
211 updatePacketInRateLimiters();
213 final OpenflowProtocolListenerFullImpl messageListener = new OpenflowProtocolListenerFullImpl(
214 connectionAdapter, deviceContext);
216 connectionAdapter.setMessageListener(messageListener);
217 deviceInitPhaseHandler.onDeviceContextLevelUp(connectionContext.getDeviceInfo(), lifecycleService);
218 return ConnectionStatus.MAY_CONTINUE;
221 private void updatePacketInRateLimiters() {
222 synchronized (deviceContexts) {
223 final int deviceContextsSize = deviceContexts.size();
224 if (deviceContextsSize > 0) {
225 long freshNotificationLimit = globalNotificationQuota / deviceContextsSize;
226 if (freshNotificationLimit < 100) {
227 freshNotificationLimit = 100;
229 if (LOG.isDebugEnabled()) {
230 LOG.debug("fresh notification limit = {}", freshNotificationLimit);
232 for (final DeviceContext deviceContext : deviceContexts.values()) {
233 deviceContext.updatePacketInRateLimit(freshNotificationLimit);
240 public TranslatorLibrary oook() {
241 return translatorLibrary;
245 public void setTranslatorLibrary(final TranslatorLibrary translatorLibrary) {
246 this.translatorLibrary = translatorLibrary;
250 public void close() {
251 for (final Iterator<DeviceContext> iterator = Iterators.consumingIterator(deviceContexts.values().iterator());
252 iterator.hasNext();) {
253 final DeviceContext deviceCtx = iterator.next();
254 deviceCtx.shutdownConnection();
255 deviceCtx.shuttingDownDataStoreTransactions();
258 Optional.ofNullable(spyPool).ifPresent(ScheduledThreadPoolExecutor::shutdownNow);
264 public void onDeviceContextLevelDown(final DeviceInfo deviceInfo) {
266 LifecycleService lifecycleService = lifecycleServices.remove(deviceInfo);
267 if (LOG.isDebugEnabled()) {
268 LOG.debug("Lifecycle service removed for node {}", deviceInfo.getLOGValue());
271 updatePacketInRateLimiters();
272 if (Objects.nonNull(lifecycleService)) {
274 lifecycleService.close();
275 LOG.debug("Lifecycle service successfully closed for node {}", deviceInfo.getLOGValue());
276 } catch (Exception e) {
277 LOG.warn("Closing lifecycle service for node {} was unsuccessful ", deviceInfo.getLOGValue(), e);
281 deviceContexts.remove(deviceInfo);
282 if (LOG.isDebugEnabled()) {
283 LOG.debug("Device context removed for node {}", deviceInfo.getLOGValue());
289 public void initialize() {
290 spyPool.scheduleAtFixedRate(messageSpy, SPY_RATE, SPY_RATE, TimeUnit.SECONDS);
294 public void setExtensionConverterProvider(final ExtensionConverterProvider extensionConverterProvider) {
295 this.extensionConverterProvider = extensionConverterProvider;
299 public ExtensionConverterProvider getExtensionConverterProvider() {
300 return extensionConverterProvider;
304 public void setDeviceTerminationPhaseHandler(final DeviceTerminationPhaseHandler handler) {
305 this.deviceTerminPhaseHandler = handler;
/**
 * Handles the loss of a connection that belongs to a device.
 *
 * Looks up the device context for the connection's DeviceInfo and logs when none is stored or
 * when it is already in TERMINATION state. Otherwise the context is moved to TERMINATION, a
 * non-primary connection is removed from the auxiliary connections, and the shutdown of the
 * datastore transactions is started. The termination phase handler is called from the
 * callback both when that shutdown succeeds and when it fails. A timer cancels the shutdown
 * future if it has not completed after 10 seconds.
 *
 * NOTE(review): several short lines are missing from this extract - whatever follows the two
 * LOG.info calls (presumably early returns) and the line closing the non-primary branch -
 * confirm the exact control flow against the full file.
 *
 * @param connectionContext context of the connection that was lost
 */
309 public void onDeviceDisconnected(final ConnectionContext connectionContext) {
310 LOG.trace("onDeviceDisconnected method call for Node: {}", connectionContext.getNodeId());
311 final DeviceInfo deviceInfo = connectionContext.getDeviceInfo();
312 final DeviceContext deviceCtx = this.deviceContexts.get(deviceInfo);
// No context stored for this device.
314 if (null == deviceCtx) {
315 LOG.info("DeviceContext for Node {} was not found. Connection is terminated without OFP context suite.", deviceInfo.getLOGValue());
// Termination was already started for this device.
319 if (deviceCtx.getState().equals(OFPContext.CONTEXT_STATE.TERMINATION)) {
320 LOG.info("Device context for node {} is already is termination state, waiting for close all context", deviceInfo.getLOGValue());
// NOTE(review): the state is switched to TERMINATION before the primary/auxiliary check below.
324 deviceCtx.setState(OFPContext.CONTEXT_STATE.TERMINATION);
326 if (!connectionContext.equals(deviceCtx.getPrimaryConnectionContext())) {
327 LOG.debug("Node {} disconnected, but not primary connection.", connectionContext.getDeviceInfo().getLOGValue());
328 /* Connection is not PrimaryConnection so try to remove from Auxiliary Connections */
329 deviceCtx.removeAuxiliaryConnectionContext(connectionContext);
331 //TODO: Auxiliary connections supported ?
332 /* Device is disconnected and so we need to close TxManager */
333 final ListenableFuture<Void> future = deviceCtx.shuttingDownDataStoreTransactions();
// Success and failure both continue with the termination phase handler.
334 Futures.addCallback(future, new FutureCallback<Void>() {
337 public void onSuccess(final Void result) {
338 LOG.debug("TxChainManager for device {} is closed successful.", deviceInfo.getLOGValue());
339 deviceTerminPhaseHandler.onDeviceContextLevelDown(deviceInfo);
343 public void onFailure(final Throwable t) {
344 LOG.warn("TxChainManager for device {} failed by closing.", deviceInfo.getLOGValue());
345 LOG.trace("TxChainManager failed by closing. ", t);
346 deviceTerminPhaseHandler.onDeviceContextLevelDown(deviceInfo);
349 /* Add a timer for closing the TxManager because it could fail in a cluster without notification */
350 final TimerTask timerTask = timeout -> {
351 if (!future.isDone()) {
352 LOG.warn("Shutting down TxChain for node {} not completed during 10 sec. Continue anyway.", deviceInfo.getLOGValue());
// Cancel without interrupting.
353 future.cancel(false);
356 hashedWheelTimer.newTimeout(timerTask, 10, TimeUnit.SECONDS);
360 void addDeviceContextToMap(final DeviceInfo deviceInfo, final DeviceContext deviceContext){
361 deviceContexts.put(deviceInfo, deviceContext);
365 public void setIsNotificationFlowRemovedOff(boolean isNotificationFlowRemovedOff) {
366 this.isNotificationFlowRemovedOff = isNotificationFlowRemovedOff;
370 public boolean getIsNotificationFlowRemovedOff() {
371 return this.isNotificationFlowRemovedOff;
376 public void setSkipTableFeatures(boolean skipTableFeaturesValue) {
377 skipTableFeatures = skipTableFeaturesValue;