2 * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.openflowplugin.impl.device;
10 import com.google.common.annotations.VisibleForTesting;
11 import com.google.common.base.Preconditions;
12 import com.google.common.collect.Iterators;
13 import com.google.common.util.concurrent.CheckedFuture;
14 import com.google.common.util.concurrent.FutureCallback;
15 import com.google.common.util.concurrent.Futures;
16 import com.google.common.util.concurrent.ListenableFuture;
17 import io.netty.util.HashedWheelTimer;
18 import io.netty.util.TimerTask;
19 import java.util.Collections;
20 import java.util.Iterator;
21 import java.util.Objects;
22 import java.util.Optional;
23 import java.util.concurrent.ConcurrentHashMap;
24 import java.util.concurrent.ConcurrentMap;
25 import java.util.concurrent.ExecutionException;
26 import java.util.concurrent.ScheduledThreadPoolExecutor;
27 import java.util.concurrent.TimeUnit;
28 import javax.annotation.CheckForNull;
29 import javax.annotation.Nonnull;
30 import javax.annotation.Nullable;
31 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
32 import org.opendaylight.controller.md.sal.binding.api.NotificationPublishService;
33 import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
34 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
35 import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
36 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceProvider;
37 import org.opendaylight.openflowjava.protocol.api.connection.ConnectionAdapter;
38 import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueueHandlerRegistration;
39 import org.opendaylight.openflowplugin.api.openflow.OFPContext;
40 import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionContext;
41 import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionStatus;
42 import org.opendaylight.openflowplugin.api.openflow.connection.OutboundQueueProvider;
43 import org.opendaylight.openflowplugin.api.openflow.device.DeviceContext;
44 import org.opendaylight.openflowplugin.api.openflow.device.DeviceInfo;
45 import org.opendaylight.openflowplugin.api.openflow.device.DeviceManager;
46 import org.opendaylight.openflowplugin.api.openflow.device.TranslatorLibrary;
47 import org.opendaylight.openflowplugin.api.openflow.device.handlers.DeviceInitializationPhaseHandler;
48 import org.opendaylight.openflowplugin.api.openflow.device.handlers.DeviceTerminationPhaseHandler;
49 import org.opendaylight.openflowplugin.api.openflow.lifecycle.LifecycleService;
50 import org.opendaylight.openflowplugin.api.openflow.statistics.ofpspecific.MessageSpy;
51 import org.opendaylight.openflowplugin.extension.api.ExtensionConverterProviderKeeper;
52 import org.opendaylight.openflowplugin.extension.api.core.extension.ExtensionConverterProvider;
53 import org.opendaylight.openflowplugin.impl.connection.OutboundQueueProviderImpl;
54 import org.opendaylight.openflowplugin.impl.device.listener.OpenflowProtocolListenerFullImpl;
55 import org.opendaylight.openflowplugin.impl.lifecycle.LifecycleServiceImpl;
56 import org.opendaylight.openflowplugin.impl.services.SalRoleServiceImpl;
57 import org.opendaylight.openflowplugin.openflow.md.core.sal.convertor.ConvertorExecutor;
58 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes;
59 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodesBuilder;
60 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
61 import org.opendaylight.yang.gen.v1.urn.opendaylight.role.service.rev150727.SetRoleOutput;
62 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
63 import org.opendaylight.yangtools.yang.common.RpcResult;
64 import org.slf4j.Logger;
65 import org.slf4j.LoggerFactory;
/**
 * {@link DeviceManager} implementation that tracks one {@link DeviceContext} and one
 * {@link LifecycleService} per device, keyed by {@link DeviceInfo}, and reacts to device
 * connect, disconnect and removal events.
 */
70 public class DeviceManagerImpl implements DeviceManager, ExtensionConverterProviderKeeper {
72 private static final Logger LOG = LoggerFactory.getLogger(DeviceManagerImpl.class);
// Overall notification quota; it is divided by the number of device contexts to obtain the
// per-device packet-in rate limit.
74 private final long globalNotificationQuota;
// Propagated to every newly created device context.
75 private final boolean switchFeaturesMandatory;
// Flags that can be changed after construction through their setters.
76 private boolean isNotificationFlowRemovedOff;
77 private boolean skipTableFeatures;
// Initial delay and period, in seconds, of the scheduled message spy task.
78 private static final int SPY_RATE = 10;
80 private final DataBroker dataBroker;
81 private final ConvertorExecutor convertorExecutor;
// Not set by the constructor; stays null until the matching setter is called.
82 private TranslatorLibrary translatorLibrary;
83 private DeviceInitializationPhaseHandler deviceInitPhaseHandler;
84 private DeviceTerminationPhaseHandler deviceTerminPhaseHandler;
// Known device contexts and their lifecycle services, both keyed by the owning device.
86 private final ConcurrentMap<DeviceInfo, DeviceContext> deviceContexts = new ConcurrentHashMap<>();
87 private final ConcurrentMap<DeviceInfo, LifecycleService> lifecycleServices = new ConcurrentHashMap<>();
// Barrier settings handed over when the outbound queue handler of a new connection is registered.
// The interval is kept in nanoseconds although it is configured in milliseconds.
89 private long barrierIntervalNanos;
90 private int barrierCountLimit;
92 private ExtensionConverterProvider extensionConverterProvider;
// Single-threaded scheduler that periodically runs messageSpy.
93 private ScheduledThreadPoolExecutor spyPool;
94 private final ClusterSingletonServiceProvider singletonServiceProvider;
95 private final NotificationPublishService notificationPublishService;
96 private final MessageSpy messageSpy;
// Timer used to bound how long the transaction shutdown after a disconnect may take.
97 private final HashedWheelTimer hashedWheelTimer;
99 public DeviceManagerImpl(@Nonnull final DataBroker dataBroker,
100 final long globalNotificationQuota,
101 final boolean switchFeaturesMandatory,
102 final long barrierInterval,
103 final int barrierCountLimit,
104 final MessageSpy messageSpy,
105 final boolean isNotificationFlowRemovedOff,
106 final ClusterSingletonServiceProvider singletonServiceProvider,
107 final NotificationPublishService notificationPublishService,
108 final HashedWheelTimer hashedWheelTimer,
109 final ConvertorExecutor convertorExecutor,
110 final boolean skipTableFeatures) {
112 this.dataBroker = dataBroker;
114 /* merge empty nodes to oper DS to predict any problems with missing parent for Node */
115 final WriteTransaction tx = dataBroker.newWriteOnlyTransaction();
116 final NodesBuilder nodesBuilder = new NodesBuilder();
117 nodesBuilder.setNode(Collections.<Node>emptyList());
118 tx.merge(LogicalDatastoreType.OPERATIONAL, InstanceIdentifier.create(Nodes.class), nodesBuilder.build());
121 } catch (ExecutionException | InterruptedException e) {
122 LOG.error("Creation of node failed.", e);
123 throw new IllegalStateException(e);
126 this.switchFeaturesMandatory = switchFeaturesMandatory;
127 this.globalNotificationQuota = globalNotificationQuota;
128 this.isNotificationFlowRemovedOff = isNotificationFlowRemovedOff;
129 this.skipTableFeatures = skipTableFeatures;
130 this.convertorExecutor = convertorExecutor;
131 this.hashedWheelTimer = hashedWheelTimer;
132 this.barrierIntervalNanos = TimeUnit.MILLISECONDS.toNanos(barrierInterval);
133 this.barrierCountLimit = barrierCountLimit;
134 this.spyPool = new ScheduledThreadPoolExecutor(1);
135 this.singletonServiceProvider = singletonServiceProvider;
136 this.notificationPublishService = notificationPublishService;
137 this.messageSpy = messageSpy;
142 public void setDeviceInitializationPhaseHandler(final DeviceInitializationPhaseHandler handler) {
143 this.deviceInitPhaseHandler = handler;
147 public void onDeviceContextLevelUp(@CheckForNull DeviceInfo deviceInfo, final LifecycleService lifecycleService) throws Exception {
148 // final phase - we have to add new Device to MD-SAL DataStore
149 LOG.debug("Final phase of DeviceContextLevelUp for Node: {} ", deviceInfo.getNodeId());
150 DeviceContext deviceContext = Preconditions.checkNotNull(deviceContexts.get(deviceInfo));
151 deviceContext.onPublished();
152 lifecycleService.registerDeviceRemovedHandler(this);
153 lifecycleService.registerService(this.singletonServiceProvider);
/**
 * Handles a new connection from a device. If a context for the same device is already known,
 * the connection is rejected; otherwise the device context and lifecycle service are created,
 * wired to the connection and handed over to the device initialization phase handler.
 *
 * @param connectionContext context of the new connection; must not be null
 * @return {@link ConnectionStatus#ALREADY_CONNECTED} when a context for the device exists and is
 *         not in TERMINATION state, {@link ConnectionStatus#CLOSING} when the existing context is
 *         in TERMINATION state, {@link ConnectionStatus#MAY_CONTINUE} when the device was accepted
 * @throws IllegalArgumentException if {@code connectionContext} is null
 * @throws Exception propagated from the device initialization phase handler
 */
157 public ConnectionStatus deviceConnected(@CheckForNull final ConnectionContext connectionContext) throws Exception {
158 Preconditions.checkArgument(connectionContext != null);
159 final DeviceInfo deviceInfo = connectionContext.getDeviceInfo();
162 * This part prevent destroy another device context. Throwing here an exception result to propagate close connection
163 * in {@link org.opendaylight.openflowplugin.impl.connection.org.opendaylight.openflowplugin.impl.connection.HandshakeContextImpl}
164 * If context already exist we are in state closing process (connection flapping) and we should not propagate connection close
166 if (deviceContexts.containsKey(deviceInfo)) {
167 DeviceContext deviceContext = deviceContexts.get(deviceInfo);
168 LOG.warn("Node {} already connected disconnecting device. Rejecting connection", deviceInfo.getLOGValue());
169 if (!deviceContext.getState().equals(OFPContext.CONTEXT_STATE.TERMINATION)) {
170 LOG.warn("Node {} context state not in TERMINATION state.",
171 connectionContext.getDeviceInfo().getLOGValue());
172 return ConnectionStatus.ALREADY_CONNECTED;
// Existing context is already terminating: report CLOSING instead of ALREADY_CONNECTED.
174 return ConnectionStatus.CLOSING;
178 LOG.info("ConnectionEvent: Device connected to controller, Device:{}, NodeId:{}",
179 connectionContext.getConnectionAdapter().getRemoteAddress(), deviceInfo.getNodeId());
181 // Add Disconnect handler
182 connectionContext.setDeviceDisconnectedHandler(this);
184 // Cache this for clarity
185 final ConnectionAdapter connectionAdapter = connectionContext.getConnectionAdapter();
187 // FIXME: as soon as auxiliary connection are fully supported then this is needed only before device context published
188 connectionAdapter.setPacketInFiltering(true);
// Create the outbound queue for the device's protocol version and register it with the
// connection using the currently configured barrier limits.
190 final OutboundQueueProvider outboundQueueProvider = new OutboundQueueProviderImpl(deviceInfo.getVersion());
192 connectionContext.setOutboundQueueProvider(outboundQueueProvider);
193 final OutboundQueueHandlerRegistration<OutboundQueueProvider> outboundQueueHandlerRegistration =
194 connectionAdapter.registerOutboundQueueHandler(outboundQueueProvider, barrierCountLimit, barrierIntervalNanos);
195 connectionContext.setOutboundQueueHandleRegistration(outboundQueueHandlerRegistration);
197 final LifecycleService lifecycleService = new LifecycleServiceImpl();
// NOTE(review): the DeviceContextImpl constructor arguments are not visible in this view.
199 final DeviceContext deviceContext = new DeviceContextImpl(
210 deviceContext.setSalRoleService(new SalRoleServiceImpl(deviceContext, deviceContext));
// From here on another connection for the same DeviceInfo takes the containsKey branch above.
211 deviceContexts.put(deviceInfo, deviceContext);
213 lifecycleService.setDeviceContext(deviceContext);
214 deviceContext.putLifecycleServiceIntoTxChainManager(lifecycleService);
216 lifecycleServices.put(deviceInfo, lifecycleService);
// Ask the device to take the SLAVE role; the connection is closed if that request fails.
218 addCallbackToDeviceInitializeToSlave(deviceInfo, deviceContext, lifecycleService);
220 deviceContext.setSwitchFeaturesMandatory(switchFeaturesMandatory);
// NOTE(review): this cast requires the concrete device context to implement
// ExtensionConverterProviderKeeper; it throws ClassCastException otherwise.
222 ((ExtensionConverterProviderKeeper) deviceContext).setExtensionConverterProvider(extensionConverterProvider);
223 deviceContext.setNotificationPublishService(notificationPublishService);
// The number of device contexts changed, so the per-device share of the quota is recomputed.
225 updatePacketInRateLimiters();
227 final OpenflowProtocolListenerFullImpl messageListener = new OpenflowProtocolListenerFullImpl(
228 connectionAdapter, deviceContext);
230 connectionAdapter.setMessageListener(messageListener);
// Hand over to the next handler of the initialization phase.
231 deviceInitPhaseHandler.onDeviceContextLevelUp(connectionContext.getDeviceInfo(), lifecycleService);
232 return ConnectionStatus.MAY_CONTINUE;
236 public TranslatorLibrary oook() {
237 return translatorLibrary;
241 public void setTranslatorLibrary(final TranslatorLibrary translatorLibrary) {
242 this.translatorLibrary = translatorLibrary;
/**
 * Shuts down every known device context and stops the message spy scheduler.
 * NOTE(review): lines following the last visible statement are missing from this view, so
 * further cleanup may exist that is not documented here.
 */
246 public void close() {
// Guava's consuming iterator removes each element from the backing collection as it is
// returned, so deviceContexts is emptied while the contexts are shut down.
247 for (final Iterator<DeviceContext> iterator = Iterators.consumingIterator(deviceContexts.values().iterator());
248 iterator.hasNext();) {
249 final DeviceContext deviceCtx = iterator.next();
250 deviceCtx.shutdownConnection();
// The future returned here is not awaited; shutdown of the transactions is fire-and-forget.
251 deviceCtx.shuttingDownDataStoreTransactions();
// Null-safe stop of the scheduler; shutdownNow also cancels the periodic message spy task.
254 Optional.ofNullable(spyPool).ifPresent(ScheduledThreadPoolExecutor::shutdownNow);
260 public void onDeviceContextLevelDown(final DeviceInfo deviceInfo) {
261 updatePacketInRateLimiters();
262 Optional.ofNullable(lifecycleServices.get(deviceInfo)).ifPresent(OFPContext::close);
266 public void initialize() {
267 spyPool.scheduleAtFixedRate(messageSpy, SPY_RATE, SPY_RATE, TimeUnit.SECONDS);
271 public void setExtensionConverterProvider(final ExtensionConverterProvider extensionConverterProvider) {
272 this.extensionConverterProvider = extensionConverterProvider;
276 public ExtensionConverterProvider getExtensionConverterProvider() {
277 return extensionConverterProvider;
281 public void setDeviceTerminationPhaseHandler(final DeviceTerminationPhaseHandler handler) {
282 this.deviceTerminPhaseHandler = handler;
/**
 * Reacts to a closed connection: shuts down the datastore transactions of the affected device
 * context and afterwards notifies the device termination phase handler.
 * NOTE(review): several lines of this method are missing from this view; the branches that
 * only log (unknown context, context already in TERMINATION) presumably return early, and
 * further statements may exist between the visible ones - confirm against the full file.
 *
 * @param connectionContext context of the connection that was closed
 */
286 public void onDeviceDisconnected(final ConnectionContext connectionContext) {
287 LOG.trace("onDeviceDisconnected method call for Node: {}", connectionContext.getNodeId());
288 final DeviceInfo deviceInfo = connectionContext.getDeviceInfo();
289 final DeviceContext deviceCtx = this.deviceContexts.get(deviceInfo);
// No context is stored for this device.
291 if (Objects.isNull(deviceCtx)) {
292 LOG.info("DeviceContext for Node {} was not found. Connection is terminated without OFP context suite.", deviceInfo.getLOGValue());
// The context is already being torn down.
296 if (deviceCtx.getState().equals(OFPContext.CONTEXT_STATE.TERMINATION)) {
297 LOG.info("Device context for node {} is already is termination state, waiting for close all context", deviceInfo.getLOGValue());
303 if (!connectionContext.equals(deviceCtx.getPrimaryConnectionContext())) {
304 LOG.debug("Node {} disconnected, but not primary connection.", connectionContext.getDeviceInfo().getLOGValue());
305 // Connection is not PrimaryConnection so try to remove from Auxiliary Connections
306 deviceCtx.removeAuxiliaryConnectionContext(connectionContext);
309 // TODO: Auxiliary connections supported ?
310 // Device is disconnected and so we need to close TxManager
311 final ListenableFuture<Void> future = deviceCtx.shuttingDownDataStoreTransactions();
// Success and failure both continue with the termination phase; failure is only logged.
312 Futures.addCallback(future, new FutureCallback<Void>() {
314 public void onSuccess(final Void result) {
315 LOG.debug("TxChainManager for device {} is closed successful.", deviceInfo.getLOGValue());
316 deviceTerminPhaseHandler.onDeviceContextLevelDown(deviceInfo);
320 public void onFailure(final Throwable t) {
321 LOG.warn("TxChainManager for device {} failed by closing.", deviceInfo.getLOGValue());
322 LOG.trace("TxChainManager failed by closing. ", t);
323 deviceTerminPhaseHandler.onDeviceContextLevelDown(deviceInfo);
327 // Add timer for Close TxManager because it could fail in cluster without notification
// If the shutdown future is still pending when the timer fires it is cancelled; a cancelled
// Guava future completes its callbacks through onFailure, so the termination phase goes on.
328 final TimerTask timerTask = timeout -> {
329 if (!future.isDone()) {
330 LOG.warn("Shutting down TxChain for node {} not completed during 10 sec. Continue anyway.", deviceInfo.getLOGValue());
331 future.cancel(false);
// The delay must stay in sync with the "10 sec" wording of the warning above.
335 hashedWheelTimer.newTimeout(timerTask, 10, TimeUnit.SECONDS);
339 void addDeviceContextToMap(final DeviceInfo deviceInfo, final DeviceContext deviceContext){
340 deviceContexts.put(deviceInfo, deviceContext);
344 public void setIsNotificationFlowRemovedOff(boolean isNotificationFlowRemovedOff) {
345 this.isNotificationFlowRemovedOff = isNotificationFlowRemovedOff;
349 public boolean getIsNotificationFlowRemovedOff() {
350 return this.isNotificationFlowRemovedOff;
355 public void setSkipTableFeatures(boolean skipTableFeaturesValue) {
356 skipTableFeatures = skipTableFeaturesValue;
360 public void setBarrierCountLimit(final int barrierCountLimit) {
361 this.barrierCountLimit = barrierCountLimit;
365 public void setBarrierInterval(final long barrierTimeoutLimit) {
366 this.barrierIntervalNanos = TimeUnit.MILLISECONDS.toNanos(barrierTimeoutLimit);
370 public CheckedFuture<Void, TransactionCommitFailedException> removeDeviceFromOperationalDS(final DeviceInfo deviceInfo) {
371 final WriteTransaction delWtx = dataBroker.newWriteOnlyTransaction();
372 delWtx.delete(LogicalDatastoreType.OPERATIONAL, deviceInfo.getNodeInstanceIdentifier());
373 final CheckedFuture<Void, TransactionCommitFailedException> delFuture = delWtx.submit();
375 Futures.addCallback(delFuture, new FutureCallback<Void>() {
377 public void onSuccess(final Void result) {
378 if (LOG.isDebugEnabled()) {
379 LOG.debug("Delete Node {} was successful", deviceInfo.getLOGValue());
384 public void onFailure(@Nonnull final Throwable t) {
385 LOG.warn("Delete node {} failed with exception {}", deviceInfo.getLOGValue(), t);
393 private void addCallbackToDeviceInitializeToSlave(final DeviceInfo deviceInfo, final DeviceContext deviceContext, final LifecycleService lifecycleService) {
394 Futures.addCallback(deviceContext.makeDeviceSlave(), new FutureCallback<RpcResult<SetRoleOutput>>() {
396 public void onSuccess(@Nullable RpcResult<SetRoleOutput> setRoleOutputRpcResult) {
397 if (LOG.isDebugEnabled()) {
398 LOG.debug("Role SLAVE was successfully propagated on device, node {}", deviceInfo.getLOGValue());
403 public void onFailure(Throwable throwable) {
404 LOG.warn("Was not able to set role SLAVE to device on node {} ",deviceInfo.getLOGValue());
405 lifecycleService.closeConnection();
410 private void updatePacketInRateLimiters() {
411 synchronized (deviceContexts) {
412 final int deviceContextsSize = deviceContexts.size();
413 if (deviceContextsSize > 0) {
414 long freshNotificationLimit = globalNotificationQuota / deviceContextsSize;
415 if (freshNotificationLimit < 100) {
416 freshNotificationLimit = 100;
418 if (LOG.isDebugEnabled()) {
419 LOG.debug("fresh notification limit = {}", freshNotificationLimit);
421 for (final DeviceContext deviceContext : deviceContexts.values()) {
422 deviceContext.updatePacketInRateLimit(freshNotificationLimit);
428 public void onDeviceRemoved(DeviceInfo deviceInfo) {
429 deviceContexts.remove(deviceInfo);
430 LOG.debug("Device context removed for node {}", deviceInfo.getLOGValue());
432 lifecycleServices.remove(deviceInfo);
433 LOG.debug("Lifecycle service removed for node {}", deviceInfo.getLOGValue());