2 * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.openflowplugin.impl.device;
10 import com.google.common.annotations.VisibleForTesting;
11 import com.google.common.base.Preconditions;
12 import com.google.common.base.Verify;
13 import com.google.common.collect.Iterators;
14 import com.google.common.util.concurrent.CheckedFuture;
15 import com.google.common.util.concurrent.FutureCallback;
16 import com.google.common.util.concurrent.Futures;
17 import com.google.common.util.concurrent.ListenableFuture;
18 import io.netty.util.HashedWheelTimer;
19 import io.netty.util.TimerTask;
20 import java.util.Collections;
21 import java.util.Iterator;
22 import java.util.Objects;
23 import java.util.Optional;
24 import java.util.concurrent.ConcurrentHashMap;
25 import java.util.concurrent.ConcurrentMap;
26 import java.util.concurrent.ExecutionException;
27 import java.util.concurrent.ScheduledThreadPoolExecutor;
28 import java.util.concurrent.TimeUnit;
29 import javax.annotation.CheckForNull;
30 import javax.annotation.Nonnull;
31 import javax.annotation.Nullable;
32 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
33 import org.opendaylight.controller.md.sal.binding.api.NotificationPublishService;
34 import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
35 import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipChange;
36 import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipListenerRegistration;
37 import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipService;
38 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
39 import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
40 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceProvider;
41 import org.opendaylight.openflowjava.protocol.api.connection.ConnectionAdapter;
42 import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueueHandlerRegistration;
43 import org.opendaylight.openflowplugin.api.openflow.OFPContext;
44 import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionContext;
45 import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionStatus;
46 import org.opendaylight.openflowplugin.api.openflow.connection.OutboundQueueProvider;
47 import org.opendaylight.openflowplugin.api.openflow.device.DeviceContext;
48 import org.opendaylight.openflowplugin.api.openflow.device.DeviceInfo;
49 import org.opendaylight.openflowplugin.api.openflow.device.DeviceManager;
50 import org.opendaylight.openflowplugin.api.openflow.device.TranslatorLibrary;
51 import org.opendaylight.openflowplugin.api.openflow.device.handlers.DeviceInitializationPhaseHandler;
52 import org.opendaylight.openflowplugin.api.openflow.device.handlers.DeviceTerminationPhaseHandler;
53 import org.opendaylight.openflowplugin.api.openflow.lifecycle.LifecycleService;
54 import org.opendaylight.openflowplugin.api.openflow.statistics.ofpspecific.MessageSpy;
55 import org.opendaylight.openflowplugin.extension.api.ExtensionConverterProviderKeeper;
56 import org.opendaylight.openflowplugin.extension.api.core.extension.ExtensionConverterProvider;
57 import org.opendaylight.openflowplugin.impl.connection.OutboundQueueProviderImpl;
58 import org.opendaylight.openflowplugin.impl.device.listener.OpenflowProtocolListenerFullImpl;
59 import org.opendaylight.openflowplugin.impl.lifecycle.LifecycleServiceImpl;
60 import org.opendaylight.openflowplugin.impl.services.SalRoleServiceImpl;
61 import org.opendaylight.openflowplugin.impl.util.DeviceStateUtil;
62 import org.opendaylight.openflowplugin.openflow.md.core.sal.convertor.ConvertorExecutor;
63 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
64 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes;
65 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodesBuilder;
66 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
67 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey;
68 import org.opendaylight.yang.gen.v1.urn.opendaylight.role.service.rev150727.SetRoleOutput;
69 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
70 import org.opendaylight.yangtools.yang.binding.KeyedInstanceIdentifier;
71 import org.opendaylight.yangtools.yang.common.RpcResult;
72 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
73 import org.slf4j.Logger;
74 import org.slf4j.LoggerFactory;
79 public class DeviceManagerImpl implements DeviceManager, ExtensionConverterProviderKeeper {
81 private static final Logger LOG = LoggerFactory.getLogger(DeviceManagerImpl.class);
82 private static final String SERVICE_ENTITY_TYPE = "org.opendaylight.mdsal.ServiceEntityType";
84 private final long globalNotificationQuota;
85 private final boolean switchFeaturesMandatory;
86 private final EntityOwnershipListenerRegistration eosListenerRegistration;
87 private boolean isFlowRemovedNotificationOn;
88 private boolean skipTableFeatures;
89 private static final int SPY_RATE = 10;
91 private final DataBroker dataBroker;
92 private final ConvertorExecutor convertorExecutor;
93 private TranslatorLibrary translatorLibrary;
94 private DeviceInitializationPhaseHandler deviceInitPhaseHandler;
95 private DeviceTerminationPhaseHandler deviceTerminPhaseHandler;
97 private final ConcurrentMap<DeviceInfo, DeviceContext> deviceContexts = new ConcurrentHashMap<>();
98 private final ConcurrentMap<DeviceInfo, DeviceContext> removeddeviceContexts = new ConcurrentHashMap<>();
99 private final ConcurrentMap<DeviceInfo, LifecycleService> lifecycleServices = new ConcurrentHashMap<>();
101 private long barrierIntervalNanos;
102 private int barrierCountLimit;
104 private ExtensionConverterProvider extensionConverterProvider;
105 private ScheduledThreadPoolExecutor spyPool;
106 private final ClusterSingletonServiceProvider singletonServiceProvider;
107 private final EntityOwnershipService entityOwnershipService;
108 private final NotificationPublishService notificationPublishService;
109 private final MessageSpy messageSpy;
110 private final HashedWheelTimer hashedWheelTimer;
112 public DeviceManagerImpl(@Nonnull final DataBroker dataBroker,
113 final long globalNotificationQuota,
114 final boolean switchFeaturesMandatory,
115 final long barrierInterval,
116 final int barrierCountLimit,
117 final MessageSpy messageSpy,
118 final boolean isFlowRemovedNotificationOn,
119 final ClusterSingletonServiceProvider singletonServiceProvider,
120 final EntityOwnershipService entityOwnershipService,
121 final HashedWheelTimer hashedWheelTimer,
122 final ConvertorExecutor convertorExecutor,
123 final boolean skipTableFeatures,
124 final NotificationPublishService notificationPublishService) {
127 this.dataBroker = dataBroker;
128 this.entityOwnershipService = entityOwnershipService;
129 this.switchFeaturesMandatory = switchFeaturesMandatory;
130 this.globalNotificationQuota = globalNotificationQuota;
131 this.isFlowRemovedNotificationOn = isFlowRemovedNotificationOn;
132 this.skipTableFeatures = skipTableFeatures;
133 this.convertorExecutor = convertorExecutor;
134 this.hashedWheelTimer = hashedWheelTimer;
135 this.barrierIntervalNanos = TimeUnit.MILLISECONDS.toNanos(barrierInterval);
136 this.barrierCountLimit = barrierCountLimit;
137 this.spyPool = new ScheduledThreadPoolExecutor(1);
138 this.singletonServiceProvider = singletonServiceProvider;
139 this.notificationPublishService = notificationPublishService;
140 this.messageSpy = messageSpy;
142 this.eosListenerRegistration = Verify.verifyNotNull(entityOwnershipService.registerListener
143 (SERVICE_ENTITY_TYPE, this));
145 /* merge empty nodes to oper DS to predict any problems with missing parent for Node */
146 final WriteTransaction tx = dataBroker.newWriteOnlyTransaction();
147 final NodesBuilder nodesBuilder = new NodesBuilder();
148 nodesBuilder.setNode(Collections.<Node>emptyList());
149 tx.merge(LogicalDatastoreType.OPERATIONAL, InstanceIdentifier.create(Nodes.class), nodesBuilder.build());
152 } catch (ExecutionException | InterruptedException e) {
153 LOG.error("Creation of node failed.", e);
154 throw new IllegalStateException(e);
160 public void setDeviceInitializationPhaseHandler(final DeviceInitializationPhaseHandler handler) {
161 this.deviceInitPhaseHandler = handler;
165 public void onDeviceContextLevelUp(@CheckForNull DeviceInfo deviceInfo, final LifecycleService lifecycleService) throws Exception {
166 // final phase - we have to add new Device to MD-SAL DataStore
167 LOG.debug("Final phase of DeviceContextLevelUp for Node: {} ", deviceInfo.getNodeId());
168 DeviceContext deviceContext = Preconditions.checkNotNull(deviceContexts.get(deviceInfo));
169 deviceContext.onPublished();
170 lifecycleService.registerDeviceRemovedHandler(this);
171 lifecycleService.registerService(this.singletonServiceProvider);
/**
 * Handles a newly established switch connection. A connection for a device that still has
 * a registered context is rejected; otherwise the outbound queue, device context and
 * lifecycle service are created and the device is handed to the initialization phase handler.
 *
 * @param connectionContext context of the new connection; must not be null
 * @return {@code ALREADY_CONNECTED} when a context exists that is not in TERMINATION state,
 *         {@code CLOSING} when the existing context is terminating,
 *         {@code MAY_CONTINUE} after a new context has been set up
 * @throws IllegalArgumentException if {@code connectionContext} is null
 * @throws Exception declared by the signature; presumably propagated from the
 *         initialization phase handler - confirm
 */
175 public ConnectionStatus deviceConnected(@CheckForNull final ConnectionContext connectionContext) throws Exception {
176 Preconditions.checkArgument(connectionContext != null);
177 final DeviceInfo deviceInfo = connectionContext.getDeviceInfo();
180 * This part prevent destroy another device context. Throwing here an exception result to propagate close connection
181 * in {@link org.opendaylight.openflowplugin.impl.connection.org.opendaylight.openflowplugin.impl.connection.HandshakeContextImpl}
182 * If context already exist we are in state closing process (connection flapping) and we should not propagate connection close
// NOTE(review): containsKey followed by get is two separate map operations; if the context
// is removed in between (see onDeviceRemoved), deviceContext below would be null - confirm
// whether that interleaving can happen.
184 if (deviceContexts.containsKey(deviceInfo)) {
185 DeviceContext deviceContext = deviceContexts.get(deviceInfo);
186 LOG.warn("Node {} already connected disconnecting device. Rejecting connection", deviceInfo.getLOGValue());
// An existing context that is not terminating means the device is still considered connected.
187 if (!deviceContext.getState().equals(OFPContext.CONTEXT_STATE.TERMINATION)) {
188 LOG.warn("Node {} context state not in TERMINATION state.",
189 connectionContext.getDeviceInfo().getLOGValue());
190 return ConnectionStatus.ALREADY_CONNECTED;
// Existing context is in TERMINATION state: report that the old session is still closing.
192 return ConnectionStatus.CLOSING;
196 LOG.info("ConnectionEvent: Device connected to controller, Device:{}, NodeId:{}",
197 connectionContext.getConnectionAdapter().getRemoteAddress(), deviceInfo.getNodeId());
199 // Add Disconnect handler
200 connectionContext.setDeviceDisconnectedHandler(this);
202 // Cache this for clarity
203 final ConnectionAdapter connectionAdapter = connectionContext.getConnectionAdapter();
205 // FIXME: as soon as auxiliary connection are fully supported then this is needed only before device context published
206 connectionAdapter.setPacketInFiltering(true);
// Create the outbound queue for the device's protocol version and register it on the adapter
// using the currently configured barrier limits.
208 final OutboundQueueProvider outboundQueueProvider = new OutboundQueueProviderImpl(deviceInfo.getVersion());
210 connectionContext.setOutboundQueueProvider(outboundQueueProvider);
211 final OutboundQueueHandlerRegistration<OutboundQueueProvider> outboundQueueHandlerRegistration =
212 connectionAdapter.registerOutboundQueueHandler(outboundQueueProvider, barrierCountLimit, barrierIntervalNanos);
213 connectionContext.setOutboundQueueHandleRegistration(outboundQueueHandlerRegistration);
215 final LifecycleService lifecycleService = new LifecycleServiceImpl();
// NOTE(review): the argument list of this constructor call is not visible in this view of the file.
217 final DeviceContext deviceContext = new DeviceContextImpl(
// The device context is passed for both constructor arguments of the role service.
228 deviceContext.setSalRoleService(new SalRoleServiceImpl(deviceContext, deviceContext));
// From here on the device counts as registered; later connections for it are rejected above.
229 deviceContexts.put(deviceInfo, deviceContext);
231 lifecycleService.setDeviceContext(deviceContext);
232 deviceContext.putLifecycleServiceIntoTxChainManager(lifecycleService);
234 lifecycleServices.put(deviceInfo, lifecycleService);
// Requests the SLAVE role on the device; the callback closes the connection on failure.
236 addCallbackToDeviceInitializeToSlave(deviceInfo, deviceContext, lifecycleService);
238 deviceContext.setSwitchFeaturesMandatory(switchFeaturesMandatory);
// The device context is cast to ExtensionConverterProviderKeeper to receive the provider.
240 ((ExtensionConverterProviderKeeper) deviceContext).setExtensionConverterProvider(extensionConverterProvider);
241 deviceContext.setNotificationPublishService(notificationPublishService);
// Re-split the global notification quota now that one more device context is registered.
243 updatePacketInRateLimiters();
245 final OpenflowProtocolListenerFullImpl messageListener = new OpenflowProtocolListenerFullImpl(
246 connectionAdapter, deviceContext);
248 connectionAdapter.setMessageListener(messageListener);
249 deviceInitPhaseHandler.onDeviceContextLevelUp(connectionContext.getDeviceInfo(), lifecycleService);
250 return ConnectionStatus.MAY_CONTINUE;
254 public TranslatorLibrary oook() {
255 return translatorLibrary;
259 public void setTranslatorLibrary(final TranslatorLibrary translatorLibrary) {
260 this.translatorLibrary = translatorLibrary;
/**
 * Shuts the manager down: closes the connection and datastore transactions of every
 * registered device context, stops the spy scheduler and releases the entity ownership
 * listener registration.
 */
264 public void close() {
// Guava's consuming iterator removes each element from the underlying collection as it is
// returned, so the device context map is drained while it is walked.
265 for (final Iterator<DeviceContext> iterator = Iterators.consumingIterator(deviceContexts.values().iterator());
266 iterator.hasNext();) {
267 final DeviceContext deviceCtx = iterator.next();
268 deviceCtx.shutdownConnection();
// The returned future is not awaited here.
269 deviceCtx.shuttingDownDataStoreTransactions();
// NOTE(review): the null-tolerant lookup suggests spyPool may be null at this point
// (e.g. after an earlier close) - confirm.
272 Optional.ofNullable(spyPool).ifPresent(ScheduledThreadPoolExecutor::shutdownNow);
275 if (Objects.nonNull(eosListenerRegistration)) {
277 LOG.debug("Closing entity ownership listener");
278 eosListenerRegistration.close();
// Best effort: a failure to close the registration is only logged at debug level.
279 } catch (Exception e) {
280 LOG.debug("Failed to close entity ownership listener registration with exception",e);
287 public void onDeviceContextLevelDown(final DeviceInfo deviceInfo) {
288 updatePacketInRateLimiters();
289 Optional.ofNullable(lifecycleServices.get(deviceInfo)).ifPresent(OFPContext::close);
293 public void initialize() {
294 spyPool.scheduleAtFixedRate(messageSpy, SPY_RATE, SPY_RATE, TimeUnit.SECONDS);
298 public void setExtensionConverterProvider(final ExtensionConverterProvider extensionConverterProvider) {
299 this.extensionConverterProvider = extensionConverterProvider;
303 public ExtensionConverterProvider getExtensionConverterProvider() {
304 return extensionConverterProvider;
308 public void setDeviceTerminationPhaseHandler(final DeviceTerminationPhaseHandler handler) {
309 this.deviceTerminPhaseHandler = handler;
/**
 * Reacts to a closed connection: looks up the device context of the connection, shuts down
 * its datastore transactions and notifies the termination phase handler once that shutdown
 * has finished (successfully or not).
 *
 * @param connectionContext context of the connection that was closed
 */
313 public void onDeviceDisconnected(final ConnectionContext connectionContext) {
314 LOG.trace("onDeviceDisconnected method call for Node: {}", connectionContext.getNodeId());
315 final DeviceInfo deviceInfo = connectionContext.getDeviceInfo();
316 final DeviceContext deviceCtx = this.deviceContexts.get(deviceInfo);
// NOTE(review): the statements following the log call in each of the next two branches are
// not visible in this view; presumably each branch returns early - confirm.
318 if (Objects.isNull(deviceCtx)) {
319 LOG.info("DeviceContext for Node {} was not found. Connection is terminated without OFP context suite.", deviceInfo.getLOGValue());
323 if (deviceCtx.getState().equals(OFPContext.CONTEXT_STATE.TERMINATION)) {
324 LOG.info("Device context for node {} is already is termination state, waiting for close all context", deviceInfo.getLOGValue());
// A connection other than the context's primary one is dropped from the auxiliary connections.
330 if (!connectionContext.equals(deviceCtx.getPrimaryConnectionContext())) {
331 LOG.debug("Node {} disconnected, but not primary connection.", connectionContext.getDeviceInfo().getLOGValue());
332 // Connection is not PrimaryConnection so try to remove from Auxiliary Connections
333 deviceCtx.removeAuxiliaryConnectionContext(connectionContext);
336 // TODO: Auxiliary connections supported ?
337 // Device is disconnected and so we need to close TxManager
338 final ListenableFuture<Void> future = deviceCtx.shuttingDownDataStoreTransactions();
// Both outcomes notify the termination handler, so the device is always brought down.
339 Futures.addCallback(future, new FutureCallback<Void>() {
341 public void onSuccess(final Void result) {
342 LOG.debug("TxChainManager for device {} is closed successful.", deviceInfo.getLOGValue());
343 deviceTerminPhaseHandler.onDeviceContextLevelDown(deviceInfo);
347 public void onFailure(final Throwable t) {
348 LOG.warn("TxChainManager for device {} failed by closing.", deviceInfo.getLOGValue());
349 LOG.trace("TxChainManager failed by closing. ", t);
350 deviceTerminPhaseHandler.onDeviceContextLevelDown(deviceInfo);
354 // Add timer for Close TxManager because it could fail in cluster without notification
// If the shutdown future is still pending after 10 seconds it is cancelled; presumably the
// cancellation completes the future and triggers onFailure above - confirm.
355 final TimerTask timerTask = timeout -> {
356 if (!future.isDone()) {
357 LOG.warn("Shutting down TxChain for node {} not completed during 10 sec. Continue anyway.", deviceInfo.getLOGValue());
358 future.cancel(false);
362 hashedWheelTimer.newTimeout(timerTask, 10, TimeUnit.SECONDS);
366 void addDeviceContextToMap(final DeviceInfo deviceInfo, final DeviceContext deviceContext){
367 deviceContexts.put(deviceInfo, deviceContext);
371 public void setFlowRemovedNotificationOn(boolean isNotificationFlowRemovedOff) {
372 this.isFlowRemovedNotificationOn = isNotificationFlowRemovedOff;
376 public boolean isFlowRemovedNotificationOn() {
377 return this.isFlowRemovedNotificationOn;
382 public void setSkipTableFeatures(boolean skipTableFeaturesValue) {
383 skipTableFeatures = skipTableFeaturesValue;
387 public void setBarrierCountLimit(final int barrierCountLimit) {
388 this.barrierCountLimit = barrierCountLimit;
392 public void setBarrierInterval(final long barrierTimeoutLimit) {
393 this.barrierIntervalNanos = TimeUnit.MILLISECONDS.toNanos(barrierTimeoutLimit);
397 public CheckedFuture<Void, TransactionCommitFailedException> removeDeviceFromOperationalDS(final DeviceInfo deviceInfo) {
398 return removeDeviceFromOperationalDS(deviceInfo.getNodeInstanceIdentifier(), deviceInfo.getLOGValue());
401 private CheckedFuture<Void, TransactionCommitFailedException> removeDeviceFromOperationalDS(
402 final KeyedInstanceIdentifier<Node, NodeKey> nodeIid, final String nodeName) {
403 Preconditions.checkNotNull(nodeIid, "Node IID must not be null");
405 final WriteTransaction delWtx = dataBroker.newWriteOnlyTransaction();
406 delWtx.delete(LogicalDatastoreType.OPERATIONAL, nodeIid);
407 final CheckedFuture<Void, TransactionCommitFailedException> delFuture = delWtx.submit();
409 Futures.addCallback(delFuture, new FutureCallback<Void>() {
411 public void onSuccess(final Void result) {
412 if (LOG.isDebugEnabled()) {
413 LOG.debug("Delete Node {} was successful", nodeName);
418 public void onFailure(@Nonnull final Throwable t) {
419 LOG.warn("Delete node {} failed with exception {}", nodeName, t);
426 private void addCallbackToDeviceInitializeToSlave(final DeviceInfo deviceInfo, final DeviceContext deviceContext, final LifecycleService lifecycleService) {
427 Futures.addCallback(deviceContext.makeDeviceSlave(), new FutureCallback<RpcResult<SetRoleOutput>>() {
429 public void onSuccess(@Nullable RpcResult<SetRoleOutput> setRoleOutputRpcResult) {
430 if (LOG.isDebugEnabled()) {
431 LOG.debug("Role SLAVE was successfully propagated on device, node {}", deviceInfo.getLOGValue());
433 deviceContext.sendNodeAddedNotification();
437 public void onFailure(Throwable throwable) {
438 LOG.warn("Was not able to set role SLAVE to device on node {} ",deviceInfo.getLOGValue());
439 lifecycleService.closeConnection();
444 private void updatePacketInRateLimiters() {
445 synchronized (deviceContexts) {
446 final int deviceContextsSize = deviceContexts.size();
447 if (deviceContextsSize > 0) {
448 long freshNotificationLimit = globalNotificationQuota / deviceContextsSize;
449 if (freshNotificationLimit < 100) {
450 freshNotificationLimit = 100;
452 if (LOG.isDebugEnabled()) {
453 LOG.debug("fresh notification limit = {}", freshNotificationLimit);
455 for (final DeviceContext deviceContext : deviceContexts.values()) {
456 deviceContext.updatePacketInRateLimit(freshNotificationLimit);
462 public void onDeviceRemoved(DeviceInfo deviceInfo) {
463 DeviceContext deviceContext = deviceContexts.remove(deviceInfo);
464 removeddeviceContexts.putIfAbsent(deviceInfo, deviceContext);
465 LOG.debug("Device context removed for node {}", deviceInfo.getLOGValue());
467 lifecycleServices.remove(deviceInfo);
468 LOG.debug("Lifecycle service removed for node {}", deviceInfo.getLOGValue());
472 public void ownershipChanged(EntityOwnershipChange entityOwnershipChange) {
473 if (!entityOwnershipChange.hasOwner()) {
474 final YangInstanceIdentifier yii = entityOwnershipChange.getEntity().getId();
475 final YangInstanceIdentifier.NodeIdentifierWithPredicates niiwp =
476 (YangInstanceIdentifier.NodeIdentifierWithPredicates) yii.getLastPathArgument();
477 String entityName = niiwp.getKeyValues().values().iterator().next().toString();
478 LOG.info("Entity ownership changed for device : {} : {}", entityName, entityOwnershipChange);
480 if (entityName != null ){
481 if (!removeddeviceContexts.isEmpty()) {
482 for (DeviceInfo device : removeddeviceContexts.keySet()) {
483 if (device.getNodeId().getValue().equals(entityName)) {
484 LOG.info("Cleaning up operational data of the node : {}", entityName);
485 // No owner present for the entity, clean up the data and remove it from
487 removeddeviceContexts.remove(device).cleanupDeviceData();
492 removeDeviceFromOperationalDS(DeviceStateUtil.createNodeInstanceIdentifier(new NodeId(entityName)),