2 * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.openflowplugin.impl.device;
10 import com.google.common.annotations.VisibleForTesting;
11 import com.google.common.base.Preconditions;
12 import com.google.common.collect.Iterators;
13 import com.google.common.util.concurrent.CheckedFuture;
14 import com.google.common.util.concurrent.FutureCallback;
15 import com.google.common.util.concurrent.Futures;
16 import com.google.common.util.concurrent.ListenableFuture;
17 import io.netty.util.HashedWheelTimer;
18 import io.netty.util.TimerTask;
19 import java.util.Collections;
20 import java.util.Iterator;
21 import java.util.Objects;
22 import java.util.Optional;
23 import java.util.concurrent.ConcurrentHashMap;
24 import java.util.concurrent.ConcurrentMap;
25 import java.util.concurrent.ExecutionException;
26 import java.util.concurrent.ScheduledThreadPoolExecutor;
27 import java.util.concurrent.TimeUnit;
28 import javax.annotation.CheckForNull;
29 import javax.annotation.Nonnull;
30 import javax.annotation.Nullable;
31 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
32 import org.opendaylight.controller.md.sal.binding.api.NotificationPublishService;
33 import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
34 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
35 import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
36 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceProvider;
37 import org.opendaylight.openflowjava.protocol.api.connection.ConnectionAdapter;
38 import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueueHandlerRegistration;
39 import org.opendaylight.openflowplugin.api.openflow.OFPContext;
40 import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionContext;
41 import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionStatus;
42 import org.opendaylight.openflowplugin.api.openflow.connection.OutboundQueueProvider;
43 import org.opendaylight.openflowplugin.api.openflow.device.DeviceContext;
44 import org.opendaylight.openflowplugin.api.openflow.device.DeviceInfo;
45 import org.opendaylight.openflowplugin.api.openflow.device.DeviceManager;
46 import org.opendaylight.openflowplugin.api.openflow.device.TranslatorLibrary;
47 import org.opendaylight.openflowplugin.api.openflow.device.handlers.DeviceInitializationPhaseHandler;
48 import org.opendaylight.openflowplugin.api.openflow.device.handlers.DeviceTerminationPhaseHandler;
49 import org.opendaylight.openflowplugin.api.openflow.lifecycle.LifecycleService;
50 import org.opendaylight.openflowplugin.api.openflow.statistics.ofpspecific.MessageSpy;
51 import org.opendaylight.openflowplugin.extension.api.ExtensionConverterProviderKeeper;
52 import org.opendaylight.openflowplugin.extension.api.core.extension.ExtensionConverterProvider;
53 import org.opendaylight.openflowplugin.impl.connection.OutboundQueueProviderImpl;
54 import org.opendaylight.openflowplugin.impl.device.listener.OpenflowProtocolListenerFullImpl;
55 import org.opendaylight.openflowplugin.impl.lifecycle.LifecycleServiceImpl;
56 import org.opendaylight.openflowplugin.impl.services.SalRoleServiceImpl;
57 import org.opendaylight.openflowplugin.openflow.md.core.sal.convertor.ConvertorExecutor;
58 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes;
59 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodesBuilder;
60 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
61 import org.opendaylight.yang.gen.v1.urn.opendaylight.role.service.rev150727.SetRoleOutput;
62 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
63 import org.opendaylight.yangtools.yang.common.RpcResult;
64 import org.slf4j.Logger;
65 import org.slf4j.LoggerFactory;
70 public class DeviceManagerImpl implements DeviceManager, ExtensionConverterProviderKeeper {
72 private static final Logger LOG = LoggerFactory.getLogger(DeviceManagerImpl.class);
74 private final long globalNotificationQuota;
75 private final boolean switchFeaturesMandatory;
76 private boolean isFlowRemovedNotificationOn;
77 private boolean skipTableFeatures;
78 private static final int SPY_RATE = 10;
80 private final DataBroker dataBroker;
81 private final ConvertorExecutor convertorExecutor;
82 private TranslatorLibrary translatorLibrary;
83 private DeviceInitializationPhaseHandler deviceInitPhaseHandler;
84 private DeviceTerminationPhaseHandler deviceTerminPhaseHandler;
86 private final ConcurrentMap<DeviceInfo, DeviceContext> deviceContexts = new ConcurrentHashMap<>();
87 private final ConcurrentMap<DeviceInfo, LifecycleService> lifecycleServices = new ConcurrentHashMap<>();
89 private long barrierIntervalNanos;
90 private int barrierCountLimit;
92 private ExtensionConverterProvider extensionConverterProvider;
93 private ScheduledThreadPoolExecutor spyPool;
94 private final ClusterSingletonServiceProvider singletonServiceProvider;
95 private final NotificationPublishService notificationPublishService;
96 private final MessageSpy messageSpy;
97 private final HashedWheelTimer hashedWheelTimer;
98 private final boolean useSingleLayerSerialization;
100 public DeviceManagerImpl(@Nonnull final DataBroker dataBroker,
101 final long globalNotificationQuota,
102 final boolean switchFeaturesMandatory,
103 final long barrierInterval,
104 final int barrierCountLimit,
105 final MessageSpy messageSpy,
106 final boolean isFlowRemovedNotificationOn,
107 final ClusterSingletonServiceProvider singletonServiceProvider,
108 final NotificationPublishService notificationPublishService,
109 final HashedWheelTimer hashedWheelTimer,
110 final ConvertorExecutor convertorExecutor,
111 final boolean skipTableFeatures,
112 final boolean useSingleLayerSerialization) {
114 this.dataBroker = dataBroker;
116 /* merge empty nodes to oper DS to predict any problems with missing parent for Node */
117 final WriteTransaction tx = dataBroker.newWriteOnlyTransaction();
118 final NodesBuilder nodesBuilder = new NodesBuilder();
119 nodesBuilder.setNode(Collections.<Node>emptyList());
120 tx.merge(LogicalDatastoreType.OPERATIONAL, InstanceIdentifier.create(Nodes.class), nodesBuilder.build());
123 } catch (ExecutionException | InterruptedException e) {
124 LOG.error("Creation of node failed.", e);
125 throw new IllegalStateException(e);
128 this.switchFeaturesMandatory = switchFeaturesMandatory;
129 this.globalNotificationQuota = globalNotificationQuota;
130 this.isFlowRemovedNotificationOn = isFlowRemovedNotificationOn;
131 this.skipTableFeatures = skipTableFeatures;
132 this.convertorExecutor = convertorExecutor;
133 this.hashedWheelTimer = hashedWheelTimer;
134 this.barrierIntervalNanos = TimeUnit.MILLISECONDS.toNanos(barrierInterval);
135 this.barrierCountLimit = barrierCountLimit;
136 this.spyPool = new ScheduledThreadPoolExecutor(1);
137 this.singletonServiceProvider = singletonServiceProvider;
138 this.notificationPublishService = notificationPublishService;
139 this.messageSpy = messageSpy;
140 this.useSingleLayerSerialization = useSingleLayerSerialization;
145 public void setDeviceInitializationPhaseHandler(final DeviceInitializationPhaseHandler handler) {
146 this.deviceInitPhaseHandler = handler;
150 public void onDeviceContextLevelUp(@CheckForNull DeviceInfo deviceInfo, final LifecycleService lifecycleService) throws Exception {
151 // final phase - we have to add new Device to MD-SAL DataStore
152 LOG.debug("Final phase of DeviceContextLevelUp for Node: {} ", deviceInfo.getNodeId());
153 DeviceContext deviceContext = Preconditions.checkNotNull(deviceContexts.get(deviceInfo));
154 deviceContext.onPublished();
155 lifecycleService.registerDeviceRemovedHandler(this);
156 lifecycleService.registerService(this.singletonServiceProvider);
160 public ConnectionStatus deviceConnected(@CheckForNull final ConnectionContext connectionContext) throws Exception {
161 Preconditions.checkArgument(connectionContext != null);
162 final DeviceInfo deviceInfo = connectionContext.getDeviceInfo();
165 * This part prevent destroy another device context. Throwing here an exception result to propagate close connection
166 * in {@link org.opendaylight.openflowplugin.impl.connection.org.opendaylight.openflowplugin.impl.connection.HandshakeContextImpl}
167 * If context already exist we are in state closing process (connection flapping) and we should not propagate connection close
169 if (deviceContexts.containsKey(deviceInfo)) {
170 DeviceContext deviceContext = deviceContexts.get(deviceInfo);
171 LOG.warn("Node {} already connected disconnecting device. Rejecting connection", deviceInfo.getLOGValue());
172 if (!deviceContext.getState().equals(OFPContext.CONTEXT_STATE.TERMINATION)) {
173 LOG.warn("Node {} context state not in TERMINATION state.",
174 connectionContext.getDeviceInfo().getLOGValue());
175 return ConnectionStatus.ALREADY_CONNECTED;
177 return ConnectionStatus.CLOSING;
181 LOG.info("ConnectionEvent: Device connected to controller, Device:{}, NodeId:{}",
182 connectionContext.getConnectionAdapter().getRemoteAddress(), deviceInfo.getNodeId());
184 // Add Disconnect handler
185 connectionContext.setDeviceDisconnectedHandler(this);
187 // Cache this for clarity
188 final ConnectionAdapter connectionAdapter = connectionContext.getConnectionAdapter();
190 // FIXME: as soon as auxiliary connection are fully supported then this is needed only before device context published
191 connectionAdapter.setPacketInFiltering(true);
193 final OutboundQueueProvider outboundQueueProvider = new OutboundQueueProviderImpl(deviceInfo.getVersion());
195 connectionContext.setOutboundQueueProvider(outboundQueueProvider);
196 final OutboundQueueHandlerRegistration<OutboundQueueProvider> outboundQueueHandlerRegistration =
197 connectionAdapter.registerOutboundQueueHandler(outboundQueueProvider, barrierCountLimit, barrierIntervalNanos);
198 connectionContext.setOutboundQueueHandleRegistration(outboundQueueHandlerRegistration);
200 final LifecycleService lifecycleService = new LifecycleServiceImpl();
202 final DeviceContext deviceContext = new DeviceContextImpl(
212 useSingleLayerSerialization);
214 deviceContext.setSalRoleService(new SalRoleServiceImpl(deviceContext, deviceContext));
215 deviceContexts.put(deviceInfo, deviceContext);
217 lifecycleService.setDeviceContext(deviceContext);
218 deviceContext.putLifecycleServiceIntoTxChainManager(lifecycleService);
220 lifecycleServices.put(deviceInfo, lifecycleService);
222 addCallbackToDeviceInitializeToSlave(deviceInfo, deviceContext, lifecycleService);
224 deviceContext.setSwitchFeaturesMandatory(switchFeaturesMandatory);
226 ((ExtensionConverterProviderKeeper) deviceContext).setExtensionConverterProvider(extensionConverterProvider);
227 deviceContext.setNotificationPublishService(notificationPublishService);
229 updatePacketInRateLimiters();
231 final OpenflowProtocolListenerFullImpl messageListener = new OpenflowProtocolListenerFullImpl(
232 connectionAdapter, deviceContext);
234 connectionAdapter.setMessageListener(messageListener);
235 deviceInitPhaseHandler.onDeviceContextLevelUp(connectionContext.getDeviceInfo(), lifecycleService);
236 return ConnectionStatus.MAY_CONTINUE;
240 public TranslatorLibrary oook() {
241 return translatorLibrary;
245 public void setTranslatorLibrary(final TranslatorLibrary translatorLibrary) {
246 this.translatorLibrary = translatorLibrary;
250 public void close() {
251 for (final Iterator<DeviceContext> iterator = Iterators.consumingIterator(deviceContexts.values().iterator());
252 iterator.hasNext();) {
253 final DeviceContext deviceCtx = iterator.next();
254 deviceCtx.shutdownConnection();
255 deviceCtx.shuttingDownDataStoreTransactions();
258 Optional.ofNullable(spyPool).ifPresent(ScheduledThreadPoolExecutor::shutdownNow);
264 public void onDeviceContextLevelDown(final DeviceInfo deviceInfo) {
265 updatePacketInRateLimiters();
266 Optional.ofNullable(lifecycleServices.get(deviceInfo)).ifPresent(OFPContext::close);
270 public void initialize() {
271 spyPool.scheduleAtFixedRate(messageSpy, SPY_RATE, SPY_RATE, TimeUnit.SECONDS);
275 public void setExtensionConverterProvider(final ExtensionConverterProvider extensionConverterProvider) {
276 this.extensionConverterProvider = extensionConverterProvider;
280 public ExtensionConverterProvider getExtensionConverterProvider() {
281 return extensionConverterProvider;
285 public void setDeviceTerminationPhaseHandler(final DeviceTerminationPhaseHandler handler) {
286 this.deviceTerminPhaseHandler = handler;
290 public void onDeviceDisconnected(final ConnectionContext connectionContext) {
291 LOG.trace("onDeviceDisconnected method call for Node: {}", connectionContext.getNodeId());
292 final DeviceInfo deviceInfo = connectionContext.getDeviceInfo();
293 final DeviceContext deviceCtx = this.deviceContexts.get(deviceInfo);
295 if (Objects.isNull(deviceCtx)) {
296 LOG.info("DeviceContext for Node {} was not found. Connection is terminated without OFP context suite.", deviceInfo.getLOGValue());
300 if (deviceCtx.getState().equals(OFPContext.CONTEXT_STATE.TERMINATION)) {
301 LOG.info("Device context for node {} is already is termination state, waiting for close all context", deviceInfo.getLOGValue());
307 if (!connectionContext.equals(deviceCtx.getPrimaryConnectionContext())) {
308 LOG.debug("Node {} disconnected, but not primary connection.", connectionContext.getDeviceInfo().getLOGValue());
309 // Connection is not PrimaryConnection so try to remove from Auxiliary Connections
310 deviceCtx.removeAuxiliaryConnectionContext(connectionContext);
313 // TODO: Auxiliary connections supported ?
314 // Device is disconnected and so we need to close TxManager
315 final ListenableFuture<Void> future = deviceCtx.shuttingDownDataStoreTransactions();
316 Futures.addCallback(future, new FutureCallback<Void>() {
318 public void onSuccess(final Void result) {
319 LOG.debug("TxChainManager for device {} is closed successful.", deviceInfo.getLOGValue());
320 deviceTerminPhaseHandler.onDeviceContextLevelDown(deviceInfo);
324 public void onFailure(final Throwable t) {
325 LOG.warn("TxChainManager for device {} failed by closing.", deviceInfo.getLOGValue());
326 LOG.trace("TxChainManager failed by closing. ", t);
327 deviceTerminPhaseHandler.onDeviceContextLevelDown(deviceInfo);
331 // Add timer for Close TxManager because it could fail in cluster without notification
332 final TimerTask timerTask = timeout -> {
333 if (!future.isDone()) {
334 LOG.warn("Shutting down TxChain for node {} not completed during 10 sec. Continue anyway.", deviceInfo.getLOGValue());
335 future.cancel(false);
339 hashedWheelTimer.newTimeout(timerTask, 10, TimeUnit.SECONDS);
343 void addDeviceContextToMap(final DeviceInfo deviceInfo, final DeviceContext deviceContext){
344 deviceContexts.put(deviceInfo, deviceContext);
348 public void setFlowRemovedNotificationOn(boolean isNotificationFlowRemovedOff) {
349 this.isFlowRemovedNotificationOn = isNotificationFlowRemovedOff;
353 public boolean isFlowRemovedNotificationOn() {
354 return this.isFlowRemovedNotificationOn;
359 public void setSkipTableFeatures(boolean skipTableFeaturesValue) {
360 skipTableFeatures = skipTableFeaturesValue;
364 public void setBarrierCountLimit(final int barrierCountLimit) {
365 this.barrierCountLimit = barrierCountLimit;
369 public void setBarrierInterval(final long barrierTimeoutLimit) {
370 this.barrierIntervalNanos = TimeUnit.MILLISECONDS.toNanos(barrierTimeoutLimit);
374 public CheckedFuture<Void, TransactionCommitFailedException> removeDeviceFromOperationalDS(final DeviceInfo deviceInfo) {
375 final WriteTransaction delWtx = dataBroker.newWriteOnlyTransaction();
376 delWtx.delete(LogicalDatastoreType.OPERATIONAL, deviceInfo.getNodeInstanceIdentifier());
377 final CheckedFuture<Void, TransactionCommitFailedException> delFuture = delWtx.submit();
379 Futures.addCallback(delFuture, new FutureCallback<Void>() {
381 public void onSuccess(final Void result) {
382 if (LOG.isDebugEnabled()) {
383 LOG.debug("Delete Node {} was successful", deviceInfo.getLOGValue());
388 public void onFailure(@Nonnull final Throwable t) {
389 LOG.warn("Delete node {} failed with exception {}", deviceInfo.getLOGValue(), t);
397 private void addCallbackToDeviceInitializeToSlave(final DeviceInfo deviceInfo, final DeviceContext deviceContext, final LifecycleService lifecycleService) {
398 Futures.addCallback(deviceContext.makeDeviceSlave(), new FutureCallback<RpcResult<SetRoleOutput>>() {
400 public void onSuccess(@Nullable RpcResult<SetRoleOutput> setRoleOutputRpcResult) {
401 if (LOG.isDebugEnabled()) {
402 LOG.debug("Role SLAVE was successfully propagated on device, node {}", deviceInfo.getLOGValue());
407 public void onFailure(Throwable throwable) {
408 LOG.warn("Was not able to set role SLAVE to device on node {} ",deviceInfo.getLOGValue());
409 lifecycleService.closeConnection();
414 private void updatePacketInRateLimiters() {
415 synchronized (deviceContexts) {
416 final int deviceContextsSize = deviceContexts.size();
417 if (deviceContextsSize > 0) {
418 long freshNotificationLimit = globalNotificationQuota / deviceContextsSize;
419 if (freshNotificationLimit < 100) {
420 freshNotificationLimit = 100;
422 if (LOG.isDebugEnabled()) {
423 LOG.debug("fresh notification limit = {}", freshNotificationLimit);
425 for (final DeviceContext deviceContext : deviceContexts.values()) {
426 deviceContext.updatePacketInRateLimit(freshNotificationLimit);
432 public void onDeviceRemoved(DeviceInfo deviceInfo) {
433 deviceContexts.remove(deviceInfo);
434 LOG.debug("Device context removed for node {}", deviceInfo.getLOGValue());
436 lifecycleServices.remove(deviceInfo);
437 LOG.debug("Lifecycle service removed for node {}", deviceInfo.getLOGValue());