2 * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.openflowplugin.impl.device;
10 import com.google.common.annotations.VisibleForTesting;
11 import com.google.common.base.Preconditions;
12 import com.google.common.collect.Iterators;
13 import com.google.common.util.concurrent.CheckedFuture;
14 import com.google.common.util.concurrent.FutureCallback;
15 import com.google.common.util.concurrent.Futures;
16 import com.google.common.util.concurrent.ListenableFuture;
17 import io.netty.util.HashedWheelTimer;
18 import io.netty.util.TimerTask;
19 import java.util.Collections;
20 import java.util.Iterator;
21 import java.util.Objects;
22 import java.util.Optional;
23 import java.util.concurrent.ConcurrentHashMap;
24 import java.util.concurrent.ConcurrentMap;
25 import java.util.concurrent.ExecutionException;
26 import java.util.concurrent.ScheduledThreadPoolExecutor;
27 import java.util.concurrent.TimeUnit;
28 import javax.annotation.CheckForNull;
29 import javax.annotation.Nonnull;
30 import javax.annotation.Nullable;
31 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
32 import org.opendaylight.controller.md.sal.binding.api.NotificationPublishService;
33 import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
34 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
35 import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
36 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceProvider;
37 import org.opendaylight.openflowjava.protocol.api.connection.ConnectionAdapter;
38 import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueueHandlerRegistration;
39 import org.opendaylight.openflowplugin.api.openflow.OFPContext;
40 import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionContext;
41 import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionStatus;
42 import org.opendaylight.openflowplugin.api.openflow.connection.OutboundQueueProvider;
43 import org.opendaylight.openflowplugin.api.openflow.device.DeviceContext;
44 import org.opendaylight.openflowplugin.api.openflow.device.DeviceInfo;
45 import org.opendaylight.openflowplugin.api.openflow.device.DeviceManager;
46 import org.opendaylight.openflowplugin.api.openflow.device.TranslatorLibrary;
47 import org.opendaylight.openflowplugin.api.openflow.device.handlers.DeviceInitializationPhaseHandler;
48 import org.opendaylight.openflowplugin.api.openflow.device.handlers.DeviceTerminationPhaseHandler;
49 import org.opendaylight.openflowplugin.api.openflow.lifecycle.LifecycleService;
50 import org.opendaylight.openflowplugin.api.openflow.statistics.ofpspecific.MessageSpy;
51 import org.opendaylight.openflowplugin.extension.api.ExtensionConverterProviderKeeper;
52 import org.opendaylight.openflowplugin.extension.api.core.extension.ExtensionConverterProvider;
53 import org.opendaylight.openflowplugin.impl.connection.OutboundQueueProviderImpl;
54 import org.opendaylight.openflowplugin.impl.device.listener.OpenflowProtocolListenerFullImpl;
55 import org.opendaylight.openflowplugin.impl.lifecycle.LifecycleServiceImpl;
56 import org.opendaylight.openflowplugin.impl.services.SalRoleServiceImpl;
57 import org.opendaylight.openflowplugin.openflow.md.core.sal.convertor.ConvertorExecutor;
58 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes;
59 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodesBuilder;
60 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
61 import org.opendaylight.yang.gen.v1.urn.opendaylight.role.service.rev150727.SetRoleOutput;
62 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
63 import org.opendaylight.yangtools.yang.common.RpcResult;
64 import org.slf4j.Logger;
65 import org.slf4j.LoggerFactory;
70 public class DeviceManagerImpl implements DeviceManager, ExtensionConverterProviderKeeper {
72 private static final Logger LOG = LoggerFactory.getLogger(DeviceManagerImpl.class);
74 private final long globalNotificationQuota;
75 private final boolean switchFeaturesMandatory;
76 private boolean isNotificationFlowRemovedOff;
77 private boolean skipTableFeatures;
78 private static final int SPY_RATE = 10;
80 private final DataBroker dataBroker;
81 private final ConvertorExecutor convertorExecutor;
82 private TranslatorLibrary translatorLibrary;
83 private DeviceInitializationPhaseHandler deviceInitPhaseHandler;
84 private DeviceTerminationPhaseHandler deviceTerminPhaseHandler;
86 private final ConcurrentMap<DeviceInfo, DeviceContext> deviceContexts = new ConcurrentHashMap<>();
87 private final ConcurrentMap<DeviceInfo, LifecycleService> lifecycleServices = new ConcurrentHashMap<>();
89 private long barrierIntervalNanos;
90 private int barrierCountLimit;
92 private ExtensionConverterProvider extensionConverterProvider;
93 private ScheduledThreadPoolExecutor spyPool;
94 private final ClusterSingletonServiceProvider singletonServiceProvider;
95 private final NotificationPublishService notificationPublishService;
96 private final MessageSpy messageSpy;
97 private final HashedWheelTimer hashedWheelTimer;
99 public DeviceManagerImpl(@Nonnull final DataBroker dataBroker,
100 final long globalNotificationQuota,
101 final boolean switchFeaturesMandatory,
102 final long barrierInterval,
103 final int barrierCountLimit,
104 final MessageSpy messageSpy,
105 final boolean isNotificationFlowRemovedOff,
106 final ClusterSingletonServiceProvider singletonServiceProvider,
107 final NotificationPublishService notificationPublishService,
108 final HashedWheelTimer hashedWheelTimer,
109 final ConvertorExecutor convertorExecutor,
110 final boolean skipTableFeatures) {
112 this.dataBroker = dataBroker;
114 /* merge empty nodes to oper DS to predict any problems with missing parent for Node */
115 final WriteTransaction tx = dataBroker.newWriteOnlyTransaction();
116 final NodesBuilder nodesBuilder = new NodesBuilder();
117 nodesBuilder.setNode(Collections.<Node>emptyList());
118 tx.merge(LogicalDatastoreType.OPERATIONAL, InstanceIdentifier.create(Nodes.class), nodesBuilder.build());
121 } catch (ExecutionException | InterruptedException e) {
122 LOG.error("Creation of node failed.", e);
123 throw new IllegalStateException(e);
126 this.switchFeaturesMandatory = switchFeaturesMandatory;
127 this.globalNotificationQuota = globalNotificationQuota;
128 this.isNotificationFlowRemovedOff = isNotificationFlowRemovedOff;
129 this.skipTableFeatures = skipTableFeatures;
130 this.convertorExecutor = convertorExecutor;
131 this.hashedWheelTimer = hashedWheelTimer;
132 this.barrierIntervalNanos = TimeUnit.MILLISECONDS.toNanos(barrierInterval);
133 this.barrierCountLimit = barrierCountLimit;
134 this.spyPool = new ScheduledThreadPoolExecutor(1);
135 this.singletonServiceProvider = singletonServiceProvider;
136 this.notificationPublishService = notificationPublishService;
137 this.messageSpy = messageSpy;
142 public void setDeviceInitializationPhaseHandler(final DeviceInitializationPhaseHandler handler) {
143 this.deviceInitPhaseHandler = handler;
147 public void onDeviceContextLevelUp(@CheckForNull DeviceInfo deviceInfo, final LifecycleService lifecycleService) throws Exception {
148 // final phase - we have to add new Device to MD-SAL DataStore
149 LOG.debug("Final phase of DeviceContextLevelUp for Node: {} ", deviceInfo.getNodeId());
150 DeviceContext deviceContext = Preconditions.checkNotNull(deviceContexts.get(deviceInfo));
151 deviceContext.onPublished();
152 lifecycleService.registerService(this.singletonServiceProvider);
156 public ConnectionStatus deviceConnected(@CheckForNull final ConnectionContext connectionContext) throws Exception {
157 Preconditions.checkArgument(connectionContext != null);
159 DeviceInfo deviceInfo = connectionContext.getDeviceInfo();
161 * This part prevent destroy another device context. Throwing here an exception result to propagate close connection
162 * in {@link org.opendaylight.openflowplugin.impl.connection.org.opendaylight.openflowplugin.impl.connection.HandshakeContextImpl}
163 * If context already exist we are in state closing process (connection flapping) and we should not propagate connection close
165 if (deviceContexts.containsKey(deviceInfo)) {
166 DeviceContext deviceContext = deviceContexts.get(deviceInfo);
167 LOG.warn("Node {} already connected disconnecting device. Rejecting connection", deviceInfo.getLOGValue());
168 if (!deviceContext.getState().equals(OFPContext.CONTEXT_STATE.TERMINATION)) {
169 LOG.warn("Node {} context state not in TERMINATION state.",
170 connectionContext.getDeviceInfo().getLOGValue());
171 return ConnectionStatus.ALREADY_CONNECTED;
173 return ConnectionStatus.CLOSING;
177 LOG.info("ConnectionEvent: Device connected to controller, Device:{}, NodeId:{}",
178 connectionContext.getConnectionAdapter().getRemoteAddress(), deviceInfo.getNodeId());
180 // Add Disconnect handler
181 connectionContext.setDeviceDisconnectedHandler(DeviceManagerImpl.this);
182 // Cache this for clarity
183 final ConnectionAdapter connectionAdapter = connectionContext.getConnectionAdapter();
185 //FIXME: as soon as auxiliary connection are fully supported then this is needed only before device context published
186 connectionAdapter.setPacketInFiltering(true);
188 final OutboundQueueProvider outboundQueueProvider = new OutboundQueueProviderImpl(deviceInfo.getVersion());
190 connectionContext.setOutboundQueueProvider(outboundQueueProvider);
191 final OutboundQueueHandlerRegistration<OutboundQueueProvider> outboundQueueHandlerRegistration =
192 connectionAdapter.registerOutboundQueueHandler(outboundQueueProvider, barrierCountLimit, barrierIntervalNanos);
193 connectionContext.setOutboundQueueHandleRegistration(outboundQueueHandlerRegistration);
195 final LifecycleService lifecycleService = new LifecycleServiceImpl();
197 final DeviceContext deviceContext = new DeviceContextImpl(
208 deviceContext.setSalRoleService(new SalRoleServiceImpl(deviceContext, deviceContext));
209 deviceContexts.put(deviceInfo, deviceContext);
211 lifecycleService.setDeviceContext(deviceContext);
212 deviceContext.putLifecycleServiceIntoTxChainManager(lifecycleService);
214 lifecycleServices.put(deviceInfo, lifecycleService);
216 addCallbackToDeviceInitializeToSlave(deviceInfo, deviceContext, lifecycleService);
218 deviceContext.setSwitchFeaturesMandatory(switchFeaturesMandatory);
220 ((ExtensionConverterProviderKeeper) deviceContext).setExtensionConverterProvider(extensionConverterProvider);
221 deviceContext.setNotificationPublishService(notificationPublishService);
223 updatePacketInRateLimiters();
225 final OpenflowProtocolListenerFullImpl messageListener = new OpenflowProtocolListenerFullImpl(
226 connectionAdapter, deviceContext);
228 connectionAdapter.setMessageListener(messageListener);
229 deviceInitPhaseHandler.onDeviceContextLevelUp(connectionContext.getDeviceInfo(), lifecycleService);
230 return ConnectionStatus.MAY_CONTINUE;
234 public TranslatorLibrary oook() {
235 return translatorLibrary;
239 public void setTranslatorLibrary(final TranslatorLibrary translatorLibrary) {
240 this.translatorLibrary = translatorLibrary;
244 public void close() {
245 for (final Iterator<DeviceContext> iterator = Iterators.consumingIterator(deviceContexts.values().iterator());
246 iterator.hasNext();) {
247 final DeviceContext deviceCtx = iterator.next();
248 deviceCtx.shutdownConnection();
249 deviceCtx.shuttingDownDataStoreTransactions();
252 Optional.ofNullable(spyPool).ifPresent(ScheduledThreadPoolExecutor::shutdownNow);
258 public void onDeviceContextLevelDown(final DeviceInfo deviceInfo) {
260 LifecycleService lifecycleService = lifecycleServices.remove(deviceInfo);
261 if (LOG.isDebugEnabled()) {
262 LOG.debug("Lifecycle service removed for node {}", deviceInfo.getLOGValue());
265 updatePacketInRateLimiters();
266 if (Objects.nonNull(lifecycleService)) {
268 lifecycleService.close();
269 LOG.debug("Lifecycle service successfully closed for node {}", deviceInfo.getLOGValue());
270 } catch (Exception e) {
271 LOG.warn("Closing lifecycle service for node {} was unsuccessful ", deviceInfo.getLOGValue(), e);
275 deviceContexts.remove(deviceInfo);
276 if (LOG.isDebugEnabled()) {
277 LOG.debug("Device context removed for node {}", deviceInfo.getLOGValue());
283 public void initialize() {
284 spyPool.scheduleAtFixedRate(messageSpy, SPY_RATE, SPY_RATE, TimeUnit.SECONDS);
288 public void setExtensionConverterProvider(final ExtensionConverterProvider extensionConverterProvider) {
289 this.extensionConverterProvider = extensionConverterProvider;
293 public ExtensionConverterProvider getExtensionConverterProvider() {
294 return extensionConverterProvider;
298 public void setDeviceTerminationPhaseHandler(final DeviceTerminationPhaseHandler handler) {
299 this.deviceTerminPhaseHandler = handler;
303 public void onDeviceDisconnected(final ConnectionContext connectionContext) {
304 LOG.trace("onDeviceDisconnected method call for Node: {}", connectionContext.getNodeId());
305 final DeviceInfo deviceInfo = connectionContext.getDeviceInfo();
306 final DeviceContext deviceCtx = this.deviceContexts.get(deviceInfo);
308 if (null == deviceCtx) {
309 LOG.info("DeviceContext for Node {} was not found. Connection is terminated without OFP context suite.", deviceInfo.getLOGValue());
313 if (deviceCtx.getState().equals(OFPContext.CONTEXT_STATE.TERMINATION)) {
314 LOG.info("Device context for node {} is already is termination state, waiting for close all context", deviceInfo.getLOGValue());
318 deviceCtx.setState(OFPContext.CONTEXT_STATE.TERMINATION);
320 if (!connectionContext.equals(deviceCtx.getPrimaryConnectionContext())) {
321 LOG.debug("Node {} disconnected, but not primary connection.", connectionContext.getDeviceInfo().getLOGValue());
322 /* Connection is not PrimaryConnection so try to remove from Auxiliary Connections */
323 deviceCtx.removeAuxiliaryConnectionContext(connectionContext);
325 //TODO: Auxiliary connections supported ?
326 /* Device is disconnected and so we need to close TxManager */
327 final ListenableFuture<Void> future = deviceCtx.shuttingDownDataStoreTransactions();
328 Futures.addCallback(future, new FutureCallback<Void>() {
331 public void onSuccess(final Void result) {
332 LOG.debug("TxChainManager for device {} is closed successful.", deviceInfo.getLOGValue());
333 deviceTerminPhaseHandler.onDeviceContextLevelDown(deviceInfo);
337 public void onFailure(final Throwable t) {
338 LOG.warn("TxChainManager for device {} failed by closing.", deviceInfo.getLOGValue());
339 LOG.trace("TxChainManager failed by closing. ", t);
340 deviceTerminPhaseHandler.onDeviceContextLevelDown(deviceInfo);
343 /* Add timer for Close TxManager because it could fain ind cluster without notification */
344 final TimerTask timerTask = timeout -> {
345 if (!future.isDone()) {
346 LOG.warn("Shutting down TxChain for node {} not completed during 10 sec. Continue anyway.", deviceInfo.getLOGValue());
347 future.cancel(false);
350 hashedWheelTimer.newTimeout(timerTask, 10, TimeUnit.SECONDS);
354 void addDeviceContextToMap(final DeviceInfo deviceInfo, final DeviceContext deviceContext){
355 deviceContexts.put(deviceInfo, deviceContext);
359 public void setIsNotificationFlowRemovedOff(boolean isNotificationFlowRemovedOff) {
360 this.isNotificationFlowRemovedOff = isNotificationFlowRemovedOff;
364 public boolean getIsNotificationFlowRemovedOff() {
365 return this.isNotificationFlowRemovedOff;
370 public void setSkipTableFeatures(boolean skipTableFeaturesValue) {
371 skipTableFeatures = skipTableFeaturesValue;
375 public void setBarrierCountLimit(final int barrierCountLimit) {
376 this.barrierCountLimit = barrierCountLimit;
380 public void setBarrierInterval(final long barrierTimeoutLimit) {
381 this.barrierIntervalNanos = TimeUnit.MILLISECONDS.toNanos(barrierTimeoutLimit);
385 public CheckedFuture<Void, TransactionCommitFailedException> removeDeviceFromOperationalDS(final DeviceInfo deviceInfo) {
386 final WriteTransaction delWtx = dataBroker.newWriteOnlyTransaction();
387 delWtx.delete(LogicalDatastoreType.OPERATIONAL, deviceInfo.getNodeInstanceIdentifier());
388 final CheckedFuture<Void, TransactionCommitFailedException> delFuture = delWtx.submit();
390 Futures.addCallback(delFuture, new FutureCallback<Void>() {
392 public void onSuccess(final Void result) {
393 if (LOG.isDebugEnabled()) {
394 LOG.debug("Delete Node {} was successful", deviceInfo.getLOGValue());
399 public void onFailure(@Nonnull final Throwable t) {
400 LOG.warn("Delete node {} failed with exception {}", deviceInfo.getLOGValue(), t);
408 private void addCallbackToDeviceInitializeToSlave(final DeviceInfo deviceInfo, final DeviceContext deviceContext, final LifecycleService lifecycleService) {
409 Futures.addCallback(deviceContext.makeDeviceSlave(), new FutureCallback<RpcResult<SetRoleOutput>>() {
411 public void onSuccess(@Nullable RpcResult<SetRoleOutput> setRoleOutputRpcResult) {
412 if (LOG.isDebugEnabled()) {
413 LOG.debug("Role SLAVE was successfully propagated on device, node {}", deviceInfo.getLOGValue());
418 public void onFailure(Throwable throwable) {
419 LOG.warn("Was not able to set role SLAVE to device on node {} ",deviceInfo.getLOGValue());
420 lifecycleService.closeConnection();
425 private void updatePacketInRateLimiters() {
426 synchronized (deviceContexts) {
427 final int deviceContextsSize = deviceContexts.size();
428 if (deviceContextsSize > 0) {
429 long freshNotificationLimit = globalNotificationQuota / deviceContextsSize;
430 if (freshNotificationLimit < 100) {
431 freshNotificationLimit = 100;
433 if (LOG.isDebugEnabled()) {
434 LOG.debug("fresh notification limit = {}", freshNotificationLimit);
436 for (final DeviceContext deviceContext : deviceContexts.values()) {
437 deviceContext.updatePacketInRateLimit(freshNotificationLimit);
444 void setDeviceContext(final DeviceInfo deviceInfo, final DeviceContext deviceContext) {
445 this.deviceContexts.putIfAbsent(deviceInfo, deviceContext);
449 int getDeviceContextCount() {
450 return this.deviceContexts.size();