2 * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.openflowplugin.impl.device;
10 import com.google.common.annotations.VisibleForTesting;
11 import com.google.common.base.Preconditions;
12 import com.google.common.collect.Iterators;
13 import com.google.common.util.concurrent.CheckedFuture;
14 import com.google.common.util.concurrent.FutureCallback;
15 import com.google.common.util.concurrent.Futures;
16 import com.google.common.util.concurrent.ListenableFuture;
17 import io.netty.util.HashedWheelTimer;
18 import io.netty.util.TimerTask;
19 import java.util.Collections;
20 import java.util.Iterator;
21 import java.util.Objects;
22 import java.util.Optional;
23 import java.util.concurrent.ConcurrentHashMap;
24 import java.util.concurrent.ConcurrentMap;
25 import java.util.concurrent.ExecutionException;
26 import java.util.concurrent.ScheduledThreadPoolExecutor;
27 import java.util.concurrent.TimeUnit;
28 import javax.annotation.CheckForNull;
29 import javax.annotation.Nonnull;
30 import javax.annotation.Nullable;
31 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
32 import org.opendaylight.controller.md.sal.binding.api.NotificationPublishService;
33 import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
34 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
35 import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
36 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceProvider;
37 import org.opendaylight.openflowjava.protocol.api.connection.ConnectionAdapter;
38 import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueueHandlerRegistration;
39 import org.opendaylight.openflowplugin.api.openflow.OFPContext;
40 import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionContext;
41 import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionStatus;
42 import org.opendaylight.openflowplugin.api.openflow.connection.OutboundQueueProvider;
43 import org.opendaylight.openflowplugin.api.openflow.device.DeviceContext;
44 import org.opendaylight.openflowplugin.api.openflow.device.DeviceInfo;
45 import org.opendaylight.openflowplugin.api.openflow.device.DeviceManager;
46 import org.opendaylight.openflowplugin.api.openflow.device.TranslatorLibrary;
47 import org.opendaylight.openflowplugin.api.openflow.device.handlers.DeviceInitializationPhaseHandler;
48 import org.opendaylight.openflowplugin.api.openflow.device.handlers.DeviceTerminationPhaseHandler;
49 import org.opendaylight.openflowplugin.api.openflow.lifecycle.LifecycleService;
50 import org.opendaylight.openflowplugin.api.openflow.statistics.ofpspecific.MessageSpy;
51 import org.opendaylight.openflowplugin.extension.api.ExtensionConverterProviderKeeper;
52 import org.opendaylight.openflowplugin.extension.api.core.extension.ExtensionConverterProvider;
53 import org.opendaylight.openflowplugin.impl.connection.OutboundQueueProviderImpl;
54 import org.opendaylight.openflowplugin.impl.device.initialization.DeviceInitializerProvider;
55 import org.opendaylight.openflowplugin.impl.device.listener.OpenflowProtocolListenerFullImpl;
56 import org.opendaylight.openflowplugin.impl.lifecycle.LifecycleServiceImpl;
57 import org.opendaylight.openflowplugin.impl.services.sal.SalRoleServiceImpl;
58 import org.opendaylight.openflowplugin.openflow.md.core.sal.convertor.ConvertorExecutor;
59 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes;
60 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodesBuilder;
61 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
62 import org.opendaylight.yang.gen.v1.urn.opendaylight.role.service.rev150727.SetRoleOutput;
63 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
64 import org.opendaylight.yangtools.yang.common.RpcResult;
65 import org.slf4j.Logger;
66 import org.slf4j.LoggerFactory;
71 public class DeviceManagerImpl implements DeviceManager, ExtensionConverterProviderKeeper {
73 private static final Logger LOG = LoggerFactory.getLogger(DeviceManagerImpl.class);
75 private final long globalNotificationQuota;
76 private final boolean switchFeaturesMandatory;
77 private boolean isFlowRemovedNotificationOn;
78 private boolean skipTableFeatures;
79 private static final int SPY_RATE = 10;
81 private final DataBroker dataBroker;
82 private final DeviceInitializerProvider deviceInitializerProvider;
83 private final ConvertorExecutor convertorExecutor;
84 private TranslatorLibrary translatorLibrary;
85 private DeviceInitializationPhaseHandler deviceInitPhaseHandler;
86 private DeviceTerminationPhaseHandler deviceTerminPhaseHandler;
88 private final ConcurrentMap<DeviceInfo, DeviceContext> deviceContexts = new ConcurrentHashMap<>();
89 private final ConcurrentMap<DeviceInfo, LifecycleService> lifecycleServices = new ConcurrentHashMap<>();
91 private long barrierIntervalNanos;
92 private int barrierCountLimit;
94 private ExtensionConverterProvider extensionConverterProvider;
95 private ScheduledThreadPoolExecutor spyPool;
96 private final ClusterSingletonServiceProvider singletonServiceProvider;
97 private final NotificationPublishService notificationPublishService;
98 private final MessageSpy messageSpy;
99 private final HashedWheelTimer hashedWheelTimer;
100 private final boolean useSingleLayerSerialization;
102 public DeviceManagerImpl(@Nonnull final DataBroker dataBroker,
103 final long globalNotificationQuota,
104 final boolean switchFeaturesMandatory,
105 final long barrierInterval,
106 final int barrierCountLimit,
107 final MessageSpy messageSpy,
108 final boolean isFlowRemovedNotificationOn,
109 final ClusterSingletonServiceProvider singletonServiceProvider,
110 final NotificationPublishService notificationPublishService,
111 final HashedWheelTimer hashedWheelTimer,
112 final ConvertorExecutor convertorExecutor,
113 final boolean skipTableFeatures,
114 final boolean useSingleLayerSerialization,
115 final DeviceInitializerProvider deviceInitializerProvider) {
117 this.dataBroker = dataBroker;
118 this.deviceInitializerProvider = deviceInitializerProvider;
120 /* merge empty nodes to oper DS to predict any problems with missing parent for Node */
121 final WriteTransaction tx = dataBroker.newWriteOnlyTransaction();
122 final NodesBuilder nodesBuilder = new NodesBuilder();
123 nodesBuilder.setNode(Collections.<Node>emptyList());
124 tx.merge(LogicalDatastoreType.OPERATIONAL, InstanceIdentifier.create(Nodes.class), nodesBuilder.build());
127 } catch (ExecutionException | InterruptedException e) {
128 LOG.error("Creation of node failed.", e);
129 throw new IllegalStateException(e);
132 this.switchFeaturesMandatory = switchFeaturesMandatory;
133 this.globalNotificationQuota = globalNotificationQuota;
134 this.isFlowRemovedNotificationOn = isFlowRemovedNotificationOn;
135 this.skipTableFeatures = skipTableFeatures;
136 this.convertorExecutor = convertorExecutor;
137 this.hashedWheelTimer = hashedWheelTimer;
138 this.barrierIntervalNanos = TimeUnit.MILLISECONDS.toNanos(barrierInterval);
139 this.barrierCountLimit = barrierCountLimit;
140 this.spyPool = new ScheduledThreadPoolExecutor(1);
141 this.singletonServiceProvider = singletonServiceProvider;
142 this.notificationPublishService = notificationPublishService;
143 this.messageSpy = messageSpy;
144 this.useSingleLayerSerialization = useSingleLayerSerialization;
149 public void setDeviceInitializationPhaseHandler(final DeviceInitializationPhaseHandler handler) {
150 this.deviceInitPhaseHandler = handler;
154 public void onDeviceContextLevelUp(@CheckForNull DeviceInfo deviceInfo, final LifecycleService lifecycleService) throws Exception {
155 // final phase - we have to add new Device to MD-SAL DataStore
156 LOG.debug("Final phase of DeviceContextLevelUp for Node: {} ", deviceInfo.getNodeId());
157 DeviceContext deviceContext = Preconditions.checkNotNull(deviceContexts.get(deviceInfo));
158 deviceContext.onPublished();
159 lifecycleService.registerDeviceRemovedHandler(this);
160 lifecycleService.registerService(this.singletonServiceProvider);
164 public ConnectionStatus deviceConnected(@CheckForNull final ConnectionContext connectionContext) throws Exception {
165 Preconditions.checkArgument(connectionContext != null);
166 final DeviceInfo deviceInfo = connectionContext.getDeviceInfo();
169 * This part prevent destroy another device context. Throwing here an exception result to propagate close connection
170 * in {@link org.opendaylight.openflowplugin.impl.connection.org.opendaylight.openflowplugin.impl.connection.HandshakeContextImpl}
171 * If context already exist we are in state closing process (connection flapping) and we should not propagate connection close
173 if (deviceContexts.containsKey(deviceInfo)) {
174 DeviceContext deviceContext = deviceContexts.get(deviceInfo);
175 LOG.warn("Node {} already connected disconnecting device. Rejecting connection", deviceInfo.getLOGValue());
176 if (!deviceContext.getState().equals(OFPContext.CONTEXT_STATE.TERMINATION)) {
177 LOG.warn("Node {} context state not in TERMINATION state.",
178 connectionContext.getDeviceInfo().getLOGValue());
179 return ConnectionStatus.ALREADY_CONNECTED;
181 return ConnectionStatus.CLOSING;
185 LOG.info("ConnectionEvent: Device connected to controller, Device:{}, NodeId:{}",
186 connectionContext.getConnectionAdapter().getRemoteAddress(), deviceInfo.getNodeId());
188 // Add Disconnect handler
189 connectionContext.setDeviceDisconnectedHandler(this);
191 // Cache this for clarity
192 final ConnectionAdapter connectionAdapter = connectionContext.getConnectionAdapter();
194 // FIXME: as soon as auxiliary connection are fully supported then this is needed only before device context published
195 connectionAdapter.setPacketInFiltering(true);
197 final OutboundQueueProvider outboundQueueProvider = new OutboundQueueProviderImpl(deviceInfo.getVersion());
199 connectionContext.setOutboundQueueProvider(outboundQueueProvider);
200 final OutboundQueueHandlerRegistration<OutboundQueueProvider> outboundQueueHandlerRegistration =
201 connectionAdapter.registerOutboundQueueHandler(outboundQueueProvider, barrierCountLimit, barrierIntervalNanos);
202 connectionContext.setOutboundQueueHandleRegistration(outboundQueueHandlerRegistration);
204 final LifecycleService lifecycleService = new LifecycleServiceImpl();
205 final DeviceContext deviceContext = new DeviceContextImpl(
215 useSingleLayerSerialization,
216 deviceInitializerProvider);
218 deviceContext.setSalRoleService(new SalRoleServiceImpl(deviceContext, deviceContext));
219 deviceContexts.put(deviceInfo, deviceContext);
221 lifecycleService.setDeviceContext(deviceContext);
222 deviceContext.putLifecycleServiceIntoTxChainManager(lifecycleService);
224 lifecycleServices.put(deviceInfo, lifecycleService);
226 addCallbackToDeviceInitializeToSlave(deviceInfo, deviceContext, lifecycleService);
228 deviceContext.setSwitchFeaturesMandatory(switchFeaturesMandatory);
230 ((ExtensionConverterProviderKeeper) deviceContext).setExtensionConverterProvider(extensionConverterProvider);
231 deviceContext.setNotificationPublishService(notificationPublishService);
233 updatePacketInRateLimiters();
235 final OpenflowProtocolListenerFullImpl messageListener = new OpenflowProtocolListenerFullImpl(
236 connectionAdapter, deviceContext);
238 connectionAdapter.setMessageListener(messageListener);
239 deviceInitPhaseHandler.onDeviceContextLevelUp(connectionContext.getDeviceInfo(), lifecycleService);
240 return ConnectionStatus.MAY_CONTINUE;
244 public TranslatorLibrary oook() {
245 return translatorLibrary;
249 public void setTranslatorLibrary(final TranslatorLibrary translatorLibrary) {
250 this.translatorLibrary = translatorLibrary;
254 public void close() {
255 for (final Iterator<DeviceContext> iterator = Iterators.consumingIterator(deviceContexts.values().iterator());
256 iterator.hasNext();) {
257 final DeviceContext deviceCtx = iterator.next();
258 deviceCtx.shutdownConnection();
259 deviceCtx.shuttingDownDataStoreTransactions();
262 Optional.ofNullable(spyPool).ifPresent(ScheduledThreadPoolExecutor::shutdownNow);
268 public void onDeviceContextLevelDown(final DeviceInfo deviceInfo) {
269 updatePacketInRateLimiters();
270 Optional.ofNullable(lifecycleServices.get(deviceInfo)).ifPresent(OFPContext::close);
274 public void initialize() {
275 spyPool.scheduleAtFixedRate(messageSpy, SPY_RATE, SPY_RATE, TimeUnit.SECONDS);
279 public void setExtensionConverterProvider(final ExtensionConverterProvider extensionConverterProvider) {
280 this.extensionConverterProvider = extensionConverterProvider;
284 public ExtensionConverterProvider getExtensionConverterProvider() {
285 return extensionConverterProvider;
289 public void setDeviceTerminationPhaseHandler(final DeviceTerminationPhaseHandler handler) {
290 this.deviceTerminPhaseHandler = handler;
294 public void onDeviceDisconnected(final ConnectionContext connectionContext) {
295 LOG.trace("onDeviceDisconnected method call for Node: {}", connectionContext.getNodeId());
296 final DeviceInfo deviceInfo = connectionContext.getDeviceInfo();
297 final DeviceContext deviceCtx = this.deviceContexts.get(deviceInfo);
299 if (Objects.isNull(deviceCtx)) {
300 LOG.info("DeviceContext for Node {} was not found. Connection is terminated without OFP context suite.", deviceInfo.getLOGValue());
304 if (!connectionContext.equals(deviceCtx.getPrimaryConnectionContext())) {
305 LOG.debug("Node {} disconnected, but not primary connection.", connectionContext.getDeviceInfo().getLOGValue());
306 // Connection is not PrimaryConnection so try to remove from Auxiliary Connections
307 deviceCtx.removeAuxiliaryConnectionContext(connectionContext);
308 // If this is not primary connection, we should not continue disabling everything
312 if (deviceCtx.getState().equals(OFPContext.CONTEXT_STATE.TERMINATION)) {
313 LOG.info("Device context for node {} is already is termination state, waiting for close all context", deviceInfo.getLOGValue());
319 // TODO: Auxiliary connections supported ?
320 // Device is disconnected and so we need to close TxManager
321 final ListenableFuture<Void> future = deviceCtx.shuttingDownDataStoreTransactions();
322 Futures.addCallback(future, new FutureCallback<Void>() {
324 public void onSuccess(final Void result) {
325 LOG.debug("TxChainManager for device {} is closed successful.", deviceInfo.getLOGValue());
326 deviceTerminPhaseHandler.onDeviceContextLevelDown(deviceInfo);
330 public void onFailure(final Throwable t) {
331 LOG.warn("TxChainManager for device {} failed by closing.", deviceInfo.getLOGValue());
332 LOG.trace("TxChainManager failed by closing. ", t);
333 deviceTerminPhaseHandler.onDeviceContextLevelDown(deviceInfo);
337 // Add timer for Close TxManager because it could fail in cluster without notification
338 final TimerTask timerTask = timeout -> {
339 if (!future.isDone()) {
340 LOG.warn("Shutting down TxChain for node {} not completed during 10 sec. Continue anyway.", deviceInfo.getLOGValue());
341 future.cancel(false);
345 hashedWheelTimer.newTimeout(timerTask, 10, TimeUnit.SECONDS);
349 void addDeviceContextToMap(final DeviceInfo deviceInfo, final DeviceContext deviceContext){
350 deviceContexts.put(deviceInfo, deviceContext);
354 public void setFlowRemovedNotificationOn(boolean isNotificationFlowRemovedOff) {
355 this.isFlowRemovedNotificationOn = isNotificationFlowRemovedOff;
359 public boolean isFlowRemovedNotificationOn() {
360 return this.isFlowRemovedNotificationOn;
365 public void setSkipTableFeatures(boolean skipTableFeaturesValue) {
366 skipTableFeatures = skipTableFeaturesValue;
370 public void setBarrierCountLimit(final int barrierCountLimit) {
371 this.barrierCountLimit = barrierCountLimit;
375 public void setBarrierInterval(final long barrierTimeoutLimit) {
376 this.barrierIntervalNanos = TimeUnit.MILLISECONDS.toNanos(barrierTimeoutLimit);
380 public CheckedFuture<Void, TransactionCommitFailedException> removeDeviceFromOperationalDS(final DeviceInfo deviceInfo) {
381 final WriteTransaction delWtx = dataBroker.newWriteOnlyTransaction();
382 delWtx.delete(LogicalDatastoreType.OPERATIONAL, deviceInfo.getNodeInstanceIdentifier());
383 final CheckedFuture<Void, TransactionCommitFailedException> delFuture = delWtx.submit();
385 Futures.addCallback(delFuture, new FutureCallback<Void>() {
387 public void onSuccess(final Void result) {
388 if (LOG.isDebugEnabled()) {
389 LOG.debug("Delete Node {} was successful", deviceInfo.getLOGValue());
394 public void onFailure(@Nonnull final Throwable t) {
395 LOG.warn("Delete node {} failed with exception {}", deviceInfo.getLOGValue(), t);
403 private void addCallbackToDeviceInitializeToSlave(final DeviceInfo deviceInfo, final DeviceContext deviceContext, final LifecycleService lifecycleService) {
404 Futures.addCallback(deviceContext.makeDeviceSlave(), new FutureCallback<RpcResult<SetRoleOutput>>() {
406 public void onSuccess(@Nullable RpcResult<SetRoleOutput> setRoleOutputRpcResult) {
407 if (LOG.isDebugEnabled()) {
408 LOG.debug("Role SLAVE was successfully propagated on device, node {}", deviceInfo.getLOGValue());
410 deviceContext.sendNodeAddedNotification();
414 public void onFailure(Throwable throwable) {
415 LOG.warn("Was not able to set role SLAVE to device on node {} ",deviceInfo.getLOGValue());
416 lifecycleService.closeConnection();
421 private void updatePacketInRateLimiters() {
422 synchronized (deviceContexts) {
423 final int deviceContextsSize = deviceContexts.size();
424 if (deviceContextsSize > 0) {
425 long freshNotificationLimit = globalNotificationQuota / deviceContextsSize;
426 if (freshNotificationLimit < 100) {
427 freshNotificationLimit = 100;
429 if (LOG.isDebugEnabled()) {
430 LOG.debug("fresh notification limit = {}", freshNotificationLimit);
432 for (final DeviceContext deviceContext : deviceContexts.values()) {
433 deviceContext.updatePacketInRateLimit(freshNotificationLimit);
439 public void onDeviceRemoved(DeviceInfo deviceInfo) {
440 deviceContexts.remove(deviceInfo);
441 LOG.debug("Device context removed for node {}", deviceInfo.getLOGValue());
443 lifecycleServices.remove(deviceInfo);
444 LOG.debug("Lifecycle service removed for node {}", deviceInfo.getLOGValue());