2 * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.openflowplugin.impl.device;
10 import com.google.common.annotations.VisibleForTesting;
11 import com.google.common.base.Preconditions;
12 import com.google.common.base.Verify;
13 import com.google.common.collect.Iterators;
14 import com.google.common.util.concurrent.FutureCallback;
15 import com.google.common.util.concurrent.Futures;
16 import com.google.common.util.concurrent.ListenableFuture;
17 import io.netty.util.TimerTask;
18 import java.util.Collections;
19 import java.util.Iterator;
20 import java.util.concurrent.ConcurrentHashMap;
21 import java.util.concurrent.ConcurrentMap;
22 import java.util.concurrent.ExecutionException;
23 import java.util.concurrent.ScheduledThreadPoolExecutor;
24 import java.util.concurrent.TimeUnit;
25 import javax.annotation.CheckForNull;
26 import javax.annotation.Nonnull;
27 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
28 import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
29 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
30 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceProvider;
31 import org.opendaylight.openflowjava.protocol.api.connection.ConnectionAdapter;
32 import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueueHandlerRegistration;
33 import org.opendaylight.openflowplugin.api.openflow.OFPContext;
34 import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionContext;
35 import org.opendaylight.openflowplugin.api.openflow.connection.OutboundQueueProvider;
36 import org.opendaylight.openflowplugin.api.openflow.device.DeviceContext;
37 import org.opendaylight.openflowplugin.api.openflow.device.DeviceInfo;
38 import org.opendaylight.openflowplugin.api.openflow.device.DeviceManager;
39 import org.opendaylight.openflowplugin.api.openflow.device.TranslatorLibrary;
40 import org.opendaylight.openflowplugin.api.openflow.device.handlers.DeviceInitializationPhaseHandler;
41 import org.opendaylight.openflowplugin.api.openflow.device.handlers.DeviceTerminationPhaseHandler;
42 import org.opendaylight.openflowplugin.api.openflow.lifecycle.LifecycleConductor;
43 import org.opendaylight.openflowplugin.api.openflow.lifecycle.LifecycleService;
44 import org.opendaylight.openflowplugin.extension.api.ExtensionConverterProviderKeeper;
45 import org.opendaylight.openflowplugin.extension.api.core.extension.ExtensionConverterProvider;
46 import org.opendaylight.openflowplugin.impl.connection.OutboundQueueProviderImpl;
47 import org.opendaylight.openflowplugin.impl.device.listener.OpenflowProtocolListenerFullImpl;
48 import org.opendaylight.openflowplugin.impl.lifecycle.LifecycleServiceImpl;
49 import org.opendaylight.openflowplugin.impl.util.DeviceInitializationUtils;
50 import org.opendaylight.openflowplugin.openflow.md.core.sal.convertor.ConvertorExecutor;
51 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes;
52 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodesBuilder;
53 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
54 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
55 import org.slf4j.Logger;
56 import org.slf4j.LoggerFactory;
61 public class DeviceManagerImpl implements DeviceManager, ExtensionConverterProviderKeeper {
63 private static final Logger LOG = LoggerFactory.getLogger(DeviceManagerImpl.class);
65 private final long globalNotificationQuota;
66 private final boolean switchFeaturesMandatory;
67 private boolean isNotificationFlowRemovedOff;
69 private static final int SPY_RATE = 10;
71 private final DataBroker dataBroker;
72 private final ConvertorExecutor convertorExecutor;
73 private TranslatorLibrary translatorLibrary;
74 private DeviceInitializationPhaseHandler deviceInitPhaseHandler;
75 private DeviceTerminationPhaseHandler deviceTerminPhaseHandler;
77 private final ConcurrentMap<DeviceInfo, DeviceContext> deviceContexts = new ConcurrentHashMap<>();
78 private final ConcurrentMap<DeviceInfo, LifecycleService> lifecycleServices = new ConcurrentHashMap<>();
80 private final long barrierIntervalNanos;
81 private final int barrierCountLimit;
82 private ExtensionConverterProvider extensionConverterProvider;
83 private ScheduledThreadPoolExecutor spyPool;
84 private final ClusterSingletonServiceProvider singletonServiceProvider;
86 private final LifecycleConductor conductor;
/**
 * Creates the device manager, stores its configuration and collaborators and seeds the
 * operational datastore with an empty {@code Nodes} container.
 * <p>
 * NOTE(review): the statements guarded by the catch clause below are not visible in this
 * listing; presumably the write transaction is submitted and awaited there - confirm.
 * NOTE(review): InterruptedException is wrapped into IllegalStateException without restoring
 * the thread's interrupt flag; consider calling Thread.currentThread().interrupt() first.
 *
 * @param dataBroker broker used to open the write transaction; must not be null
 * @param globalNotificationQuota quota later divided among the registered device contexts
 * @param switchFeaturesMandatory flag passed on to every new device context
 * @param barrierInterval barrier interval in milliseconds (stored as nanoseconds)
 * @param barrierCountLimit barrier count limit used for outbound queue handler registration
 * @param lifecycleConductor conductor stored by this manager
 * @param isNotificationFlowRemovedOff initial value of the flag of the same name
 * @param convertorExecutor convertor executor stored by this manager
 * @param singletonServiceProvider provider that lifecycle services are registered with
 * @throws IllegalStateException when the guarded datastore step fails with an
 *         ExecutionException or InterruptedException
 */
88 public DeviceManagerImpl(@Nonnull final DataBroker dataBroker,
89 final long globalNotificationQuota,
90 final boolean switchFeaturesMandatory,
91 final long barrierInterval,
92 final int barrierCountLimit,
93 final LifecycleConductor lifecycleConductor,
94 boolean isNotificationFlowRemovedOff,
95 final ConvertorExecutor convertorExecutor,
96 final ClusterSingletonServiceProvider singletonServiceProvider) {
97 this.switchFeaturesMandatory = switchFeaturesMandatory;
98 this.globalNotificationQuota = globalNotificationQuota;
99 this.isNotificationFlowRemovedOff = isNotificationFlowRemovedOff;
// Fails with a NullPointerException when no data broker is supplied.
100 this.dataBroker = Preconditions.checkNotNull(dataBroker);
101 this.convertorExecutor = convertorExecutor;
102 /* Merge an empty Nodes container into the operational DS up front to avoid problems with a missing parent for Node. */
103 final WriteTransaction tx = dataBroker.newWriteOnlyTransaction();
105 final NodesBuilder nodesBuilder = new NodesBuilder();
106 nodesBuilder.setNode(Collections.<Node>emptyList());
107 tx.merge(LogicalDatastoreType.OPERATIONAL, InstanceIdentifier.create(Nodes.class), nodesBuilder.build());
// Any failure of the guarded step is logged and rethrown unchecked, so construction aborts.
110 } catch (ExecutionException | InterruptedException e) {
111 LOG.error("Creation of node failed.", e);
112 throw new IllegalStateException(e);
// The interval arrives in milliseconds and is kept in nanoseconds.
115 this.barrierIntervalNanos = TimeUnit.MILLISECONDS.toNanos(barrierInterval);
116 this.barrierCountLimit = barrierCountLimit;
118 this.conductor = lifecycleConductor;
// Scheduled executor with a core pool size of 1; initialize() schedules the spy task on it.
119 spyPool = new ScheduledThreadPoolExecutor(1);
120 this.singletonServiceProvider = singletonServiceProvider;
125 public void setDeviceInitializationPhaseHandler(final DeviceInitializationPhaseHandler handler) {
126 this.deviceInitPhaseHandler = handler;
130 public void onDeviceContextLevelUp(@CheckForNull DeviceInfo deviceInfo, final LifecycleService lifecycleService) throws Exception {
131 // final phase - we have to add new Device to MD-SAL DataStore
132 LOG.debug("Final phase of DeviceContextLevelUp for Node: {} ", deviceInfo.getNodeId());
133 DeviceContext deviceContext = Preconditions.checkNotNull(deviceContexts.get(deviceInfo));
134 deviceContext.onPublished();
135 lifecycleService.registerService(this.singletonServiceProvider);
/**
 * Accepts a new switch connection and builds the device-level context around it.
 * <p>
 * Visible flow: reject the connection when a device context is already registered for the same
 * DeviceInfo; otherwise register this manager as disconnect handler, enable packet-in
 * filtering, attach an outbound queue provider, create the DeviceContext and its
 * LifecycleService, record both in this manager's maps, rebalance the packet-in rate limits,
 * install the protocol message listener and hand over to the initialization phase handler.
 * <p>
 * NOTE(review): this listing skips lines inside the method (the exit of the rejection branch,
 * part of the DeviceContextImpl argument list and the final return are not shown), so the
 * exact return values cannot be confirmed from here.
 *
 * @param connectionContext context of the new connection; a null value fails the
 *        checkArgument precondition with an IllegalArgumentException
 * @return NOTE(review): the return statements are not visible in this listing
 * @throws Exception declared by the signature; presumably needed for the initialization phase
 *         handler call at the end - confirm
 */
139 public boolean deviceConnected(@CheckForNull final ConnectionContext connectionContext) throws Exception {
140 Preconditions.checkArgument(connectionContext != null);
142 DeviceInfo deviceInfo = connectionContext.getDeviceInfo();
144 * This part prevent destroy another device context. Throwing here an exception result to propagate close connection
145 * in {@link org.opendaylight.openflowplugin.impl.connection.org.opendaylight.openflowplugin.impl.connection.HandshakeContextImpl}
146 * If context already exist we are in state closing process (connection flapping) and we should not propagate connection close
148 if (deviceContexts.containsKey(deviceInfo)) {
149 LOG.warn("Rejecting connection from node which is already connected and there exist deviceContext for it: {}", connectionContext.getNodeId());
153 LOG.info("ConnectionEvent: Device connected to controller, Device:{}, NodeId:{}",
154 connectionContext.getConnectionAdapter().getRemoteAddress(), deviceInfo.getNodeId());
156 // Add Disconnect handler
157 connectionContext.setDeviceDisconnectedHandler(DeviceManagerImpl.this);
158 // Cache this for clarity
159 final ConnectionAdapter connectionAdapter = connectionContext.getConnectionAdapter();
161 //FIXME: as soon as auxiliary connection are fully supported then this is needed only before device context published
162 connectionAdapter.setPacketInFiltering(true);
// The queue provider is created for the device's protocol version and registered on the
// adapter together with the barrier count limit and the barrier interval (nanoseconds).
164 final OutboundQueueProvider outboundQueueProvider = new OutboundQueueProviderImpl(deviceInfo.getVersion());
166 connectionContext.setOutboundQueueProvider(outboundQueueProvider);
167 final OutboundQueueHandlerRegistration<OutboundQueueProvider> outboundQueueHandlerRegistration =
168 connectionAdapter.registerOutboundQueueHandler(outboundQueueProvider, barrierCountLimit, barrierIntervalNanos);
169 connectionContext.setOutboundQueueHandleRegistration(outboundQueueHandlerRegistration);
// NOTE(review): only two of the DeviceContextImpl constructor arguments are visible here.
171 final DeviceContext deviceContext = new DeviceContextImpl(connectionContext,
174 outboundQueueProvider,
179 final LifecycleService lifecycleService = new LifecycleServiceImpl();
180 lifecycleService.setDeviceContext(deviceContext);
// putIfAbsent must find no previous entry for this device; otherwise the verification fails
// with the message "DeviceCtx still not closed.".
182 Verify.verify(deviceContexts.putIfAbsent(deviceInfo, deviceContext) == null, "DeviceCtx still not closed.");
183 lifecycleServices.putIfAbsent(deviceInfo, lifecycleService);
185 deviceContext.setSwitchFeaturesMandatory(switchFeaturesMandatory);
// The cast relies on the created device context also being an ExtensionConverterProviderKeeper.
187 ((ExtensionConverterProviderKeeper) deviceContext).setExtensionConverterProvider(extensionConverterProvider);
188 deviceContext.setNotificationPublishService(conductor.getNotificationPublishService());
// The number of registered contexts changed, so the global notification quota is re-divided.
190 updatePacketInRateLimiters();
192 final OpenflowProtocolListenerFullImpl messageListener = new OpenflowProtocolListenerFullImpl(
193 connectionAdapter, deviceContext);
195 connectionAdapter.setMessageListener(messageListener);
// Re-reads the DeviceInfo through the same accessor used for the local deviceInfo above.
196 deviceInitPhaseHandler.onDeviceContextLevelUp(connectionContext.getDeviceInfo(), lifecycleService);
200 private void updatePacketInRateLimiters() {
201 synchronized (deviceContexts) {
202 final int deviceContextsSize = deviceContexts.size();
203 if (deviceContextsSize > 0) {
204 long freshNotificationLimit = globalNotificationQuota / deviceContextsSize;
205 if (freshNotificationLimit < 100) {
206 freshNotificationLimit = 100;
208 LOG.debug("fresh notification limit = {}", freshNotificationLimit);
209 for (final DeviceContext deviceContext : deviceContexts.values()) {
210 deviceContext.updatePacketInRateLimit(freshNotificationLimit);
217 public TranslatorLibrary oook() {
218 return translatorLibrary;
222 public void setTranslatorLibrary(final TranslatorLibrary translatorLibrary) {
223 this.translatorLibrary = translatorLibrary;
/**
 * Shuts this manager down.
 * <p>
 * Drains the device-context map through a consuming iterator (each entry is removed as it is
 * visited), shutting down the connection and starting the datastore-transaction shutdown of
 * every context, and then stops the spy executor when one exists.
 * <p>
 * NOTE(review): the embedded numbering of this listing jumps after the shutdownNow() call, so
 * a line of this method may be missing from view - confirm against the full file.
 * NOTE(review): the future returned by shuttingDownDataStoreTransactions() is not awaited here
 * and the visible code does not touch the lifecycleServices map.
 */
227 public void close() {
228 for (final Iterator<DeviceContext> iterator = Iterators.consumingIterator(deviceContexts.values().iterator());
229 iterator.hasNext();) {
230 final DeviceContext deviceCtx = iterator.next();
231 deviceCtx.shutdownConnection();
232 deviceCtx.shuttingDownDataStoreTransactions();
// shutdownNow() also attempts to stop a spy task that is currently running.
235 if (spyPool != null) {
236 spyPool.shutdownNow();
242 public void onDeviceContextLevelDown(final DeviceInfo deviceInfo) {
243 LOG.debug("onDeviceContextClosed for Node {}", deviceInfo.getNodeId());
244 deviceContexts.remove(deviceInfo);
245 updatePacketInRateLimiters();
246 LifecycleService lifecycleService = lifecycleServices.remove(deviceInfo);
248 lifecycleService.close();
249 } catch (Exception e) {
250 LOG.warn("Closing service for node {} was unsuccessful ", deviceInfo.getNodeId().getValue(), e);
255 public void initialize() {
256 spyPool.scheduleAtFixedRate(conductor.getMessageIntelligenceAgency(), SPY_RATE, SPY_RATE, TimeUnit.SECONDS);
260 public void setExtensionConverterProvider(final ExtensionConverterProvider extensionConverterProvider) {
261 this.extensionConverterProvider = extensionConverterProvider;
265 public ExtensionConverterProvider getExtensionConverterProvider() {
266 return extensionConverterProvider;
270 public void setDeviceTerminationPhaseHandler(final DeviceTerminationPhaseHandler handler) {
271 this.deviceTerminPhaseHandler = handler;
/**
 * Reacts to a closed connection.
 * <p>
 * When no device context is registered for the connection's DeviceInfo, the event is only
 * logged. A connection that is not the device's primary connection is removed from the
 * context's auxiliary connections. For the device itself, the datastore transactions are shut
 * down and the termination phase handler is notified once that shutdown future completes; a
 * 10 second timer cancels the future when it has not completed by then.
 * <p>
 * NOTE(review): short lines of this method (branch exits and block delimiters) are not visible
 * in this listing, so the exact branch structure should be confirmed against the full file.
 *
 * @param connectionContext context of the connection that went away
 */
275 public void onDeviceDisconnected(final ConnectionContext connectionContext) {
276 LOG.trace("onDeviceDisconnected method call for Node: {}", connectionContext.getNodeId());
277 final DeviceInfo deviceInfo = connectionContext.getDeviceInfo();
278 final DeviceContext deviceCtx = this.deviceContexts.get(deviceInfo);
280 if (null == deviceCtx) {
281 LOG.info("DeviceContext for Node {} was not found. Connection is terminated without OFP context suite.", deviceInfo.getNodeId());
285 if (!connectionContext.equals(deviceCtx.getPrimaryConnectionContext())) {
286 /* Connection is not PrimaryConnection so try to remove from Auxiliary Connections */
287 deviceCtx.removeAuxiliaryConnectionContext(connectionContext);
289 /* Device is disconnected and so we need to close TxManager */
290 final ListenableFuture<Void> future = deviceCtx.shuttingDownDataStoreTransactions();
// Both outcomes continue with the termination phase; they differ only in what is logged.
291 Futures.addCallback(future, new FutureCallback<Void>() {
294 public void onSuccess(final Void result) {
295 LOG.debug("TxChainManager for device {} is closed successful.", deviceInfo.getNodeId());
296 deviceTerminPhaseHandler.onDeviceContextLevelDown(deviceInfo);
300 public void onFailure(final Throwable t) {
301 LOG.warn("TxChainManager for device {} failed by closing.", deviceInfo.getNodeId(), t);
302 deviceTerminPhaseHandler.onDeviceContextLevelDown(deviceInfo);
305 /* Add a timer for closing the TxManager because it could fail in a cluster without notification */
// NOTE(review): cancelling presumably completes the future and thereby triggers onFailure
// above, which continues the termination - confirm against the Futures.addCallback contract.
306 final TimerTask timerTask = timeout -> {
307 if (!future.isDone()) {
308 LOG.info("Shutting down TxChain for node {} not completed during 10 sec. Continue anyway.", deviceInfo.getNodeId());
309 future.cancel(false);
312 conductor.newTimeout(timerTask, 10, TimeUnit.SECONDS);
317 void addDeviceContextToMap(final DeviceInfo deviceInfo, final DeviceContext deviceContext){
318 deviceContexts.put(deviceInfo, deviceContext);
322 public <T extends OFPContext> T gainContext(final DeviceInfo deviceInfo) {
323 return (T) deviceContexts.get(deviceInfo);
327 public void setIsNotificationFlowRemovedOff(boolean isNotificationFlowRemovedOff) {
328 this.isNotificationFlowRemovedOff = isNotificationFlowRemovedOff;
332 public boolean getIsNotificationFlowRemovedOff() {
333 return this.isNotificationFlowRemovedOff;