2 * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.openflowplugin.impl.device;
10 import com.google.common.annotations.VisibleForTesting;
11 import com.google.common.base.Preconditions;
12 import com.google.common.collect.Iterators;
13 import com.google.common.util.concurrent.CheckedFuture;
14 import com.google.common.util.concurrent.FutureCallback;
15 import com.google.common.util.concurrent.Futures;
16 import com.google.common.util.concurrent.ListenableFuture;
17 import io.netty.util.HashedWheelTimer;
18 import io.netty.util.TimerTask;
19 import java.util.Collections;
20 import java.util.Iterator;
21 import java.util.Objects;
22 import java.util.Optional;
23 import java.util.concurrent.ConcurrentHashMap;
24 import java.util.concurrent.ConcurrentMap;
25 import java.util.concurrent.ExecutionException;
26 import java.util.concurrent.ScheduledThreadPoolExecutor;
27 import java.util.concurrent.TimeUnit;
28 import javax.annotation.CheckForNull;
29 import javax.annotation.Nonnull;
30 import javax.annotation.Nullable;
31 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
32 import org.opendaylight.controller.md.sal.binding.api.NotificationPublishService;
33 import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
34 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
35 import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
36 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceProvider;
37 import org.opendaylight.openflowjava.protocol.api.connection.ConnectionAdapter;
38 import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueueHandlerRegistration;
39 import org.opendaylight.openflowplugin.api.openflow.OFPContext;
40 import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionContext;
41 import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionStatus;
42 import org.opendaylight.openflowplugin.api.openflow.connection.OutboundQueueProvider;
43 import org.opendaylight.openflowplugin.api.openflow.device.DeviceContext;
44 import org.opendaylight.openflowplugin.api.openflow.device.DeviceInfo;
45 import org.opendaylight.openflowplugin.api.openflow.device.DeviceManager;
46 import org.opendaylight.openflowplugin.api.openflow.device.TranslatorLibrary;
47 import org.opendaylight.openflowplugin.api.openflow.device.handlers.DeviceInitializationPhaseHandler;
48 import org.opendaylight.openflowplugin.api.openflow.device.handlers.DeviceTerminationPhaseHandler;
49 import org.opendaylight.openflowplugin.api.openflow.lifecycle.LifecycleService;
50 import org.opendaylight.openflowplugin.api.openflow.statistics.ofpspecific.MessageSpy;
51 import org.opendaylight.openflowplugin.extension.api.ExtensionConverterProviderKeeper;
52 import org.opendaylight.openflowplugin.extension.api.core.extension.ExtensionConverterProvider;
53 import org.opendaylight.openflowplugin.impl.connection.OutboundQueueProviderImpl;
54 import org.opendaylight.openflowplugin.impl.device.initialization.DeviceInitializerProvider;
55 import org.opendaylight.openflowplugin.impl.device.listener.OpenflowProtocolListenerFullImpl;
56 import org.opendaylight.openflowplugin.impl.lifecycle.LifecycleServiceImpl;
57 import org.opendaylight.openflowplugin.impl.services.sal.SalRoleServiceImpl;
58 import org.opendaylight.openflowplugin.openflow.md.core.sal.convertor.ConvertorExecutor;
59 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes;
60 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodesBuilder;
61 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
62 import org.opendaylight.yang.gen.v1.urn.opendaylight.role.service.rev150727.SetRoleOutput;
63 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
64 import org.opendaylight.yangtools.yang.common.RpcResult;
65 import org.slf4j.Logger;
66 import org.slf4j.LoggerFactory;
71 public class DeviceManagerImpl implements DeviceManager, ExtensionConverterProviderKeeper {
73 private static final Logger LOG = LoggerFactory.getLogger(DeviceManagerImpl.class);
75 private final long globalNotificationQuota;
76 private final boolean switchFeaturesMandatory;
77 private boolean isFlowRemovedNotificationOn;
78 private boolean skipTableFeatures;
79 private static final int SPY_RATE = 10;
81 private final DataBroker dataBroker;
82 private final DeviceInitializerProvider deviceInitializerProvider;
83 private final ConvertorExecutor convertorExecutor;
84 private TranslatorLibrary translatorLibrary;
85 private DeviceInitializationPhaseHandler deviceInitPhaseHandler;
86 private DeviceTerminationPhaseHandler deviceTerminPhaseHandler;
88 private final ConcurrentMap<DeviceInfo, DeviceContext> deviceContexts = new ConcurrentHashMap<>();
89 private final ConcurrentMap<DeviceInfo, LifecycleService> lifecycleServices = new ConcurrentHashMap<>();
91 private long barrierIntervalNanos;
92 private int barrierCountLimit;
94 private ExtensionConverterProvider extensionConverterProvider;
95 private ScheduledThreadPoolExecutor spyPool;
96 private final ClusterSingletonServiceProvider singletonServiceProvider;
97 private final NotificationPublishService notificationPublishService;
98 private final MessageSpy messageSpy;
99 private final HashedWheelTimer hashedWheelTimer;
100 private final boolean useSingleLayerSerialization;
102 public DeviceManagerImpl(@Nonnull final DataBroker dataBroker,
103 final long globalNotificationQuota,
104 final boolean switchFeaturesMandatory,
105 final long barrierInterval,
106 final int barrierCountLimit,
107 final MessageSpy messageSpy,
108 final boolean isFlowRemovedNotificationOn,
109 final ClusterSingletonServiceProvider singletonServiceProvider,
110 final NotificationPublishService notificationPublishService,
111 final HashedWheelTimer hashedWheelTimer,
112 final ConvertorExecutor convertorExecutor,
113 final boolean skipTableFeatures,
114 final boolean useSingleLayerSerialization,
115 final DeviceInitializerProvider deviceInitializerProvider) {
117 this.dataBroker = dataBroker;
118 this.deviceInitializerProvider = deviceInitializerProvider;
120 /* merge empty nodes to oper DS to predict any problems with missing parent for Node */
121 final WriteTransaction tx = dataBroker.newWriteOnlyTransaction();
122 final NodesBuilder nodesBuilder = new NodesBuilder();
123 nodesBuilder.setNode(Collections.<Node>emptyList());
124 tx.merge(LogicalDatastoreType.OPERATIONAL, InstanceIdentifier.create(Nodes.class), nodesBuilder.build());
127 } catch (ExecutionException | InterruptedException e) {
128 LOG.error("Creation of node failed.", e);
129 throw new IllegalStateException(e);
132 this.switchFeaturesMandatory = switchFeaturesMandatory;
133 this.globalNotificationQuota = globalNotificationQuota;
134 this.isFlowRemovedNotificationOn = isFlowRemovedNotificationOn;
135 this.skipTableFeatures = skipTableFeatures;
136 this.convertorExecutor = convertorExecutor;
137 this.hashedWheelTimer = hashedWheelTimer;
138 this.barrierIntervalNanos = TimeUnit.MILLISECONDS.toNanos(barrierInterval);
139 this.barrierCountLimit = barrierCountLimit;
140 this.spyPool = new ScheduledThreadPoolExecutor(1);
141 this.singletonServiceProvider = singletonServiceProvider;
142 this.notificationPublishService = notificationPublishService;
143 this.messageSpy = messageSpy;
144 this.useSingleLayerSerialization = useSingleLayerSerialization;
149 public void setDeviceInitializationPhaseHandler(final DeviceInitializationPhaseHandler handler) {
150 this.deviceInitPhaseHandler = handler;
154 public void onDeviceContextLevelUp(@CheckForNull DeviceInfo deviceInfo, final LifecycleService lifecycleService) throws Exception {
155 // final phase - we have to add new Device to MD-SAL DataStore
156 LOG.debug("Final phase of DeviceContextLevelUp for Node: {} ", deviceInfo.getNodeId());
157 DeviceContext deviceContext = Preconditions.checkNotNull(deviceContexts.get(deviceInfo));
158 deviceContext.onPublished();
159 lifecycleService.registerDeviceRemovedHandler(this);
160 lifecycleService.registerService(this.singletonServiceProvider);
164 public ConnectionStatus deviceConnected(@CheckForNull final ConnectionContext connectionContext) throws Exception {
165 Preconditions.checkArgument(connectionContext != null);
166 final DeviceInfo deviceInfo = connectionContext.getDeviceInfo();
169 * This part prevent destroy another device context. Throwing here an exception result to propagate close connection
170 * in {@link org.opendaylight.openflowplugin.impl.connection.org.opendaylight.openflowplugin.impl.connection.HandshakeContextImpl}
171 * If context already exist we are in state closing process (connection flapping) and we should not propagate connection close
173 if (deviceContexts.containsKey(deviceInfo)) {
174 DeviceContext deviceContext = deviceContexts.get(deviceInfo);
175 LOG.warn("Node {} already connected disconnecting device. Rejecting connection", deviceInfo.getLOGValue());
176 if (!deviceContext.getState().equals(OFPContext.CONTEXT_STATE.TERMINATION)) {
177 LOG.warn("Node {} context state not in TERMINATION state.",
178 connectionContext.getDeviceInfo().getLOGValue());
179 return ConnectionStatus.ALREADY_CONNECTED;
181 return ConnectionStatus.CLOSING;
185 LOG.info("ConnectionEvent: Device connected to controller, Device:{}, NodeId:{}",
186 connectionContext.getConnectionAdapter().getRemoteAddress(), deviceInfo.getNodeId());
188 // Add Disconnect handler
189 connectionContext.setDeviceDisconnectedHandler(this);
191 // Cache this for clarity
192 final ConnectionAdapter connectionAdapter = connectionContext.getConnectionAdapter();
194 // FIXME: as soon as auxiliary connection are fully supported then this is needed only before device context published
195 connectionAdapter.setPacketInFiltering(true);
197 final OutboundQueueProvider outboundQueueProvider = new OutboundQueueProviderImpl(deviceInfo.getVersion());
199 connectionContext.setOutboundQueueProvider(outboundQueueProvider);
200 final OutboundQueueHandlerRegistration<OutboundQueueProvider> outboundQueueHandlerRegistration =
201 connectionAdapter.registerOutboundQueueHandler(outboundQueueProvider, barrierCountLimit, barrierIntervalNanos);
202 connectionContext.setOutboundQueueHandleRegistration(outboundQueueHandlerRegistration);
204 final LifecycleService lifecycleService = new LifecycleServiceImpl();
205 final DeviceContext deviceContext = new DeviceContextImpl(
215 useSingleLayerSerialization,
216 deviceInitializerProvider);
218 deviceContext.setSalRoleService(new SalRoleServiceImpl(deviceContext, deviceContext));
219 deviceContexts.put(deviceInfo, deviceContext);
221 lifecycleService.setDeviceContext(deviceContext);
222 deviceContext.putLifecycleServiceIntoTxChainManager(lifecycleService);
224 lifecycleServices.put(deviceInfo, lifecycleService);
226 addCallbackToDeviceInitializeToSlave(deviceInfo, deviceContext, lifecycleService);
228 deviceContext.setSwitchFeaturesMandatory(switchFeaturesMandatory);
230 ((ExtensionConverterProviderKeeper) deviceContext).setExtensionConverterProvider(extensionConverterProvider);
231 deviceContext.setNotificationPublishService(notificationPublishService);
233 updatePacketInRateLimiters();
235 final OpenflowProtocolListenerFullImpl messageListener = new OpenflowProtocolListenerFullImpl(
236 connectionAdapter, deviceContext);
238 connectionAdapter.setMessageListener(messageListener);
239 deviceInitPhaseHandler.onDeviceContextLevelUp(connectionContext.getDeviceInfo(), lifecycleService);
240 return ConnectionStatus.MAY_CONTINUE;
244 public TranslatorLibrary oook() {
245 return translatorLibrary;
249 public void setTranslatorLibrary(final TranslatorLibrary translatorLibrary) {
250 this.translatorLibrary = translatorLibrary;
254 public void close() {
255 for (final Iterator<DeviceContext> iterator = Iterators.consumingIterator(deviceContexts.values().iterator());
256 iterator.hasNext();) {
257 final DeviceContext deviceCtx = iterator.next();
258 deviceCtx.shutdownConnection();
259 deviceCtx.shuttingDownDataStoreTransactions();
262 Optional.ofNullable(spyPool).ifPresent(ScheduledThreadPoolExecutor::shutdownNow);
268 public void onDeviceContextLevelDown(final DeviceInfo deviceInfo) {
269 updatePacketInRateLimiters();
270 Optional.ofNullable(lifecycleServices.get(deviceInfo)).ifPresent(OFPContext::close);
274 public void initialize() {
275 spyPool.scheduleAtFixedRate(messageSpy, SPY_RATE, SPY_RATE, TimeUnit.SECONDS);
279 public void setExtensionConverterProvider(final ExtensionConverterProvider extensionConverterProvider) {
280 this.extensionConverterProvider = extensionConverterProvider;
284 public ExtensionConverterProvider getExtensionConverterProvider() {
285 return extensionConverterProvider;
289 public void setDeviceTerminationPhaseHandler(final DeviceTerminationPhaseHandler handler) {
290 this.deviceTerminPhaseHandler = handler;
294 public void onDeviceDisconnected(final ConnectionContext connectionContext) {
295 LOG.trace("onDeviceDisconnected method call for Node: {}", connectionContext.getNodeId());
296 final DeviceInfo deviceInfo = connectionContext.getDeviceInfo();
297 final DeviceContext deviceCtx = this.deviceContexts.get(deviceInfo);
299 if (Objects.isNull(deviceCtx)) {
300 LOG.info("DeviceContext for Node {} was not found. Connection is terminated without OFP context suite.", deviceInfo.getLOGValue());
304 if (deviceCtx.getState().equals(OFPContext.CONTEXT_STATE.TERMINATION)) {
305 LOG.info("Device context for node {} is already is termination state, waiting for close all context", deviceInfo.getLOGValue());
311 if (!connectionContext.equals(deviceCtx.getPrimaryConnectionContext())) {
312 LOG.debug("Node {} disconnected, but not primary connection.", connectionContext.getDeviceInfo().getLOGValue());
313 // Connection is not PrimaryConnection so try to remove from Auxiliary Connections
314 deviceCtx.removeAuxiliaryConnectionContext(connectionContext);
317 // TODO: Auxiliary connections supported ?
318 // Device is disconnected and so we need to close TxManager
319 final ListenableFuture<Void> future = deviceCtx.shuttingDownDataStoreTransactions();
320 Futures.addCallback(future, new FutureCallback<Void>() {
322 public void onSuccess(final Void result) {
323 LOG.debug("TxChainManager for device {} is closed successful.", deviceInfo.getLOGValue());
324 deviceTerminPhaseHandler.onDeviceContextLevelDown(deviceInfo);
328 public void onFailure(final Throwable t) {
329 LOG.warn("TxChainManager for device {} failed by closing.", deviceInfo.getLOGValue());
330 LOG.trace("TxChainManager failed by closing. ", t);
331 deviceTerminPhaseHandler.onDeviceContextLevelDown(deviceInfo);
335 // Add timer for Close TxManager because it could fail in cluster without notification
336 final TimerTask timerTask = timeout -> {
337 if (!future.isDone()) {
338 LOG.warn("Shutting down TxChain for node {} not completed during 10 sec. Continue anyway.", deviceInfo.getLOGValue());
339 future.cancel(false);
343 hashedWheelTimer.newTimeout(timerTask, 10, TimeUnit.SECONDS);
347 void addDeviceContextToMap(final DeviceInfo deviceInfo, final DeviceContext deviceContext){
348 deviceContexts.put(deviceInfo, deviceContext);
352 public void setFlowRemovedNotificationOn(boolean isNotificationFlowRemovedOff) {
353 this.isFlowRemovedNotificationOn = isNotificationFlowRemovedOff;
357 public boolean isFlowRemovedNotificationOn() {
358 return this.isFlowRemovedNotificationOn;
363 public void setSkipTableFeatures(boolean skipTableFeaturesValue) {
364 skipTableFeatures = skipTableFeaturesValue;
368 public void setBarrierCountLimit(final int barrierCountLimit) {
369 this.barrierCountLimit = barrierCountLimit;
373 public void setBarrierInterval(final long barrierTimeoutLimit) {
374 this.barrierIntervalNanos = TimeUnit.MILLISECONDS.toNanos(barrierTimeoutLimit);
378 public CheckedFuture<Void, TransactionCommitFailedException> removeDeviceFromOperationalDS(final DeviceInfo deviceInfo) {
379 final WriteTransaction delWtx = dataBroker.newWriteOnlyTransaction();
380 delWtx.delete(LogicalDatastoreType.OPERATIONAL, deviceInfo.getNodeInstanceIdentifier());
381 final CheckedFuture<Void, TransactionCommitFailedException> delFuture = delWtx.submit();
383 Futures.addCallback(delFuture, new FutureCallback<Void>() {
385 public void onSuccess(final Void result) {
386 if (LOG.isDebugEnabled()) {
387 LOG.debug("Delete Node {} was successful", deviceInfo.getLOGValue());
392 public void onFailure(@Nonnull final Throwable t) {
393 LOG.warn("Delete node {} failed with exception {}", deviceInfo.getLOGValue(), t);
401 private void addCallbackToDeviceInitializeToSlave(final DeviceInfo deviceInfo, final DeviceContext deviceContext, final LifecycleService lifecycleService) {
402 Futures.addCallback(deviceContext.makeDeviceSlave(), new FutureCallback<RpcResult<SetRoleOutput>>() {
404 public void onSuccess(@Nullable RpcResult<SetRoleOutput> setRoleOutputRpcResult) {
405 if (LOG.isDebugEnabled()) {
406 LOG.debug("Role SLAVE was successfully propagated on device, node {}", deviceInfo.getLOGValue());
411 public void onFailure(Throwable throwable) {
412 LOG.warn("Was not able to set role SLAVE to device on node {} ",deviceInfo.getLOGValue());
413 lifecycleService.closeConnection();
418 private void updatePacketInRateLimiters() {
419 synchronized (deviceContexts) {
420 final int deviceContextsSize = deviceContexts.size();
421 if (deviceContextsSize > 0) {
422 long freshNotificationLimit = globalNotificationQuota / deviceContextsSize;
423 if (freshNotificationLimit < 100) {
424 freshNotificationLimit = 100;
426 if (LOG.isDebugEnabled()) {
427 LOG.debug("fresh notification limit = {}", freshNotificationLimit);
429 for (final DeviceContext deviceContext : deviceContexts.values()) {
430 deviceContext.updatePacketInRateLimit(freshNotificationLimit);
436 public void onDeviceRemoved(DeviceInfo deviceInfo) {
437 deviceContexts.remove(deviceInfo);
438 LOG.debug("Device context removed for node {}", deviceInfo.getLOGValue());
440 lifecycleServices.remove(deviceInfo);
441 LOG.debug("Lifecycle service removed for node {}", deviceInfo.getLOGValue());