2 * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.openflowplugin.impl.device;
10 import com.google.common.annotations.VisibleForTesting;
11 import com.google.common.base.Function;
12 import com.google.common.base.Preconditions;
13 import com.google.common.base.Verify;
14 import com.google.common.collect.Iterators;
15 import com.google.common.util.concurrent.AsyncFunction;
16 import com.google.common.util.concurrent.FutureCallback;
17 import com.google.common.util.concurrent.Futures;
18 import com.google.common.util.concurrent.ListenableFuture;
19 import io.netty.util.TimerTask;
20 import java.util.Collections;
21 import java.util.HashSet;
22 import java.util.Iterator;
24 import java.util.concurrent.ConcurrentHashMap;
25 import java.util.concurrent.ConcurrentMap;
26 import java.util.concurrent.ExecutionException;
27 import java.util.concurrent.ScheduledThreadPoolExecutor;
28 import java.util.concurrent.TimeUnit;
29 import javax.annotation.CheckForNull;
30 import javax.annotation.Nonnull;
31 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
32 import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
33 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
34 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceProvider;
35 import org.opendaylight.openflowjava.protocol.api.connection.ConnectionAdapter;
36 import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueueHandlerRegistration;
37 import org.opendaylight.openflowplugin.api.openflow.OFPContext;
38 import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionContext;
39 import org.opendaylight.openflowplugin.api.openflow.connection.OutboundQueueProvider;
40 import org.opendaylight.openflowplugin.api.openflow.device.DeviceContext;
41 import org.opendaylight.openflowplugin.api.openflow.device.DeviceInfo;
42 import org.opendaylight.openflowplugin.api.openflow.device.DeviceManager;
43 import org.opendaylight.openflowplugin.api.openflow.device.DeviceState;
44 import org.opendaylight.openflowplugin.api.openflow.device.DeviceSynchronizeListener;
45 import org.opendaylight.openflowplugin.api.openflow.device.DeviceValidListener;
46 import org.opendaylight.openflowplugin.api.openflow.device.TranslatorLibrary;
47 import org.opendaylight.openflowplugin.api.openflow.device.handlers.DeviceInitializationPhaseHandler;
48 import org.opendaylight.openflowplugin.api.openflow.device.handlers.DeviceTerminationPhaseHandler;
49 import org.opendaylight.openflowplugin.api.openflow.lifecycle.LifecycleConductor;
50 import org.opendaylight.openflowplugin.api.openflow.lifecycle.LifecycleService;
51 import org.opendaylight.openflowplugin.api.openflow.statistics.StatisticsContext;
52 import org.opendaylight.openflowplugin.extension.api.ExtensionConverterProviderKeeper;
53 import org.opendaylight.openflowplugin.extension.api.core.extension.ExtensionConverterProvider;
54 import org.opendaylight.openflowplugin.impl.connection.OutboundQueueProviderImpl;
55 import org.opendaylight.openflowplugin.impl.device.listener.OpenflowProtocolListenerFullImpl;
56 import org.opendaylight.openflowplugin.impl.lifecycle.LifecycleServiceImpl;
57 import org.opendaylight.openflowplugin.impl.util.DeviceInitializationUtils;
58 import org.opendaylight.openflowplugin.openflow.md.core.sal.convertor.ConvertorExecutor;
59 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes;
60 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodesBuilder;
61 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
62 import org.opendaylight.yang.gen.v1.urn.opendaylight.role.service.rev150727.OfpRole;
63 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
64 import org.slf4j.Logger;
65 import org.slf4j.LoggerFactory;
70 public class DeviceManagerImpl implements DeviceManager, ExtensionConverterProviderKeeper {
72 private static final Logger LOG = LoggerFactory.getLogger(DeviceManagerImpl.class);
74 private final long globalNotificationQuota;
75 private final boolean switchFeaturesMandatory;
76 private boolean isNotificationFlowRemovedOff;
78 private static final int SPY_RATE = 10;
80 private final DataBroker dataBroker;
81 private final ConvertorExecutor convertorExecutor;
82 private TranslatorLibrary translatorLibrary;
83 private DeviceInitializationPhaseHandler deviceInitPhaseHandler;
84 private DeviceTerminationPhaseHandler deviceTerminPhaseHandler;
86 private final ConcurrentMap<DeviceInfo, DeviceContext> deviceContexts = new ConcurrentHashMap<>();
87 private final ConcurrentMap<DeviceInfo, LifecycleService> lifecycleServices = new ConcurrentHashMap<>();
89 private final long barrierIntervalNanos;
90 private final int barrierCountLimit;
91 private ExtensionConverterProvider extensionConverterProvider;
92 private ScheduledThreadPoolExecutor spyPool;
93 private Set<DeviceSynchronizeListener> deviceSynchronizedListeners;
94 private Set<DeviceValidListener> deviceValidListeners;
95 private final ClusterSingletonServiceProvider singletonServiceProvider;
97 private final LifecycleConductor conductor;
/**
 * Creates the device manager and stores the supplied settings and collaborators.
 * <p>
 * Besides plain field assignment, the visible code opens a write-only transaction and merges an empty
 * {@link Nodes} container into the operational datastore, converts the barrier interval from
 * milliseconds to nanoseconds, and creates a one-thread spy pool plus two empty listener sets.
 * <p>
 * NOTE(review): the statements guarded by the catch clause below are not visible in this listing;
 * presumably the transaction is committed there - confirm against the full file.
 *
 * @param dataBroker data broker used for the initial merge; must not be null
 * @param globalNotificationQuota quota later divided among connected devices
 * @param switchFeaturesMandatory flag handed to every new device context
 * @param barrierInterval barrier interval in milliseconds
 * @param barrierCountLimit barrier count limit used when registering outbound queue handlers
 * @param lifecycleConductor conductor stored for later use by this manager
 * @param isNotificationFlowRemovedOff initial value of the flow-removed notification switch
 * @param convertorExecutor convertor executor stored for later use by this manager
 * @param singletonServiceProvider provider that lifecycle services are registered with
 * @throws IllegalStateException when an ExecutionException or InterruptedException is caught
 */
99 public DeviceManagerImpl(@Nonnull final DataBroker dataBroker,
100 final long globalNotificationQuota,
101 final boolean switchFeaturesMandatory,
102 final long barrierInterval,
103 final int barrierCountLimit,
104 final LifecycleConductor lifecycleConductor,
105 boolean isNotificationFlowRemovedOff,
106 final ConvertorExecutor convertorExecutor,
107 final ClusterSingletonServiceProvider singletonServiceProvider) {
108 this.switchFeaturesMandatory = switchFeaturesMandatory;
109 this.globalNotificationQuota = globalNotificationQuota;
110 this.isNotificationFlowRemovedOff = isNotificationFlowRemovedOff;
111 this.dataBroker = Preconditions.checkNotNull(dataBroker);
112 this.convertorExecutor = convertorExecutor;
113 /* merge empty nodes to oper DS to prevent any problems with a missing parent for Node */
114 final WriteTransaction tx = dataBroker.newWriteOnlyTransaction();
116 final NodesBuilder nodesBuilder = new NodesBuilder();
117 nodesBuilder.setNode(Collections.<Node>emptyList());
118 tx.merge(LogicalDatastoreType.OPERATIONAL, InstanceIdentifier.create(Nodes.class), nodesBuilder.build());
// The failure is logged and rethrown wrapped, with the original exception kept as the cause.
121 } catch (ExecutionException | InterruptedException e) {
122 LOG.error("Creation of node failed.", e);
123 throw new IllegalStateException(e);
// The interval arrives in milliseconds but is stored in nanoseconds.
126 this.barrierIntervalNanos = TimeUnit.MILLISECONDS.toNanos(barrierInterval);
127 this.barrierCountLimit = barrierCountLimit;
129 this.conductor = lifecycleConductor;
// Single worker thread; a task is scheduled on it in initialize().
130 spyPool = new ScheduledThreadPoolExecutor(1);
131 this.deviceSynchronizedListeners = new HashSet<>();
132 this.deviceValidListeners = new HashSet<>();
133 this.singletonServiceProvider = singletonServiceProvider;
138 public void setDeviceInitializationPhaseHandler(final DeviceInitializationPhaseHandler handler) {
139 this.deviceInitPhaseHandler = handler;
143 public void onDeviceContextLevelUp(@CheckForNull DeviceInfo deviceInfo, final LifecycleService lifecycleService) throws Exception {
144 // final phase - we have to add new Device to MD-SAL DataStore
145 LOG.debug("Final phase of DeviceContextLevelUp for Node: {} ", deviceInfo.getNodeId());
146 DeviceContext deviceContext = Preconditions.checkNotNull(deviceContexts.get(deviceInfo));
147 deviceContext.onPublished();
148 lifecycleService.registerService(this.singletonServiceProvider);
/**
 * Handles a device that has just connected.
 * <p>
 * The visible code logs a warning when a device context already exists for the device; otherwise it
 * wires the connection (disconnect handler, packet-in filtering, outbound queue), creates the device
 * state, device context and lifecycle service, stores them in this manager's maps, installs the
 * message listener and hands the device over to the initialization phase handler.
 * <p>
 * NOTE(review): the return statements and most arguments of the DeviceContextImpl constructor call
 * are not visible in this listing - confirm against the full file.
 *
 * @param connectionContext context of the new connection; must not be null
 * @throws IllegalArgumentException if {@code connectionContext} is null
 * @throws Exception declared by the signature for callers
 */
152 public boolean deviceConnected(@CheckForNull final ConnectionContext connectionContext) throws Exception {
153 Preconditions.checkArgument(connectionContext != null);
155 DeviceInfo deviceInfo = connectionContext.getDeviceInfo();
157 * This part prevent destroy another device context. Throwing here an exception result to propagate close connection
158 * in {@link org.opendaylight.openflowplugin.impl.connection.org.opendaylight.openflowplugin.impl.connection.HandshakeContextImpl}
159 * If context already exist we are in state closing process (connection flapping) and we should not propagate connection close
161 if (deviceContexts.containsKey(deviceInfo)) {
162 LOG.warn("Rejecting connection from node which is already connected and there exist deviceContext for it: {}", connectionContext.getNodeId());
166 LOG.info("ConnectionEvent: Device connected to controller, Device:{}, NodeId:{}",
167 connectionContext.getConnectionAdapter().getRemoteAddress(), deviceInfo.getNodeId());
169 // Add Disconnect handler
170 connectionContext.setDeviceDisconnectedHandler(DeviceManagerImpl.this);
171 // Cache this for clarity
172 final ConnectionAdapter connectionAdapter = connectionContext.getConnectionAdapter();
174 //FIXME: as soon as auxiliary connection are fully supported then this is needed only before device context published
175 connectionAdapter.setPacketInFiltering(true);
// Outbound queue for this connection, registered with the configured barrier count limit and interval.
177 final OutboundQueueProvider outboundQueueProvider = new OutboundQueueProviderImpl(deviceInfo.getVersion());
179 connectionContext.setOutboundQueueProvider(outboundQueueProvider);
180 final OutboundQueueHandlerRegistration<OutboundQueueProvider> outboundQueueHandlerRegistration =
181 connectionAdapter.registerOutboundQueueHandler(outboundQueueProvider, barrierCountLimit, barrierIntervalNanos);
182 connectionContext.setOutboundQueueHandleRegistration(outboundQueueHandlerRegistration);
// The device state is registered as both a synchronize listener and a valid listener of this manager.
184 final DeviceState deviceState = new DeviceStateImpl(deviceInfo);
185 this.addDeviceSynchronizeListener(deviceState);
186 this.addDeviceValidListener(deviceState);
// NOTE(review): only part of the constructor argument list is visible in this listing.
188 final DeviceContext deviceContext = new DeviceContextImpl(connectionContext,
192 outboundQueueProvider,
195 connectionContext.getDeviceInfo(),
198 final LifecycleService lifecycleService = new LifecycleServiceImpl();
199 lifecycleService.setDeviceContext(deviceContext);
// putIfAbsent must return null here, i.e. no context may already be mapped for this device.
201 Verify.verify(deviceContexts.putIfAbsent(deviceInfo, deviceContext) == null, "DeviceCtx still not closed.");
202 lifecycleServices.putIfAbsent(deviceInfo, lifecycleService);
204 deviceContext.setSwitchFeaturesMandatory(switchFeaturesMandatory);
206 ((ExtensionConverterProviderKeeper) deviceContext).setExtensionConverterProvider(extensionConverterProvider);
207 deviceContext.setNotificationPublishService(conductor.getNotificationPublishService());
// The number of stored contexts changed, so the per-device packet-in limit is recomputed.
209 updatePacketInRateLimiters();
211 final OpenflowProtocolListenerFullImpl messageListener = new OpenflowProtocolListenerFullImpl(
212 connectionAdapter, deviceContext);
213 connectionAdapter.setMessageListener(messageListener);
214 notifyDeviceValidListeners(deviceInfo, true);
// Hand the device over to the initialization phase handler together with its lifecycle service.
216 deviceInitPhaseHandler.onDeviceContextLevelUp(connectionContext.getDeviceInfo(), lifecycleServices.get(deviceInfo));
218 notifyDeviceSynchronizeListeners(deviceInfo, true);
223 private void updatePacketInRateLimiters() {
224 synchronized (deviceContexts) {
225 final int deviceContextsSize = deviceContexts.size();
226 if (deviceContextsSize > 0) {
227 long freshNotificationLimit = globalNotificationQuota / deviceContextsSize;
228 if (freshNotificationLimit < 100) {
229 freshNotificationLimit = 100;
231 LOG.debug("fresh notification limit = {}", freshNotificationLimit);
232 for (final DeviceContext deviceContext : deviceContexts.values()) {
233 deviceContext.updatePacketInRateLimit(freshNotificationLimit);
240 public TranslatorLibrary oook() {
241 return translatorLibrary;
245 public void setTranslatorLibrary(final TranslatorLibrary translatorLibrary) {
246 this.translatorLibrary = translatorLibrary;
/**
 * Shuts this manager down: drains the stored device contexts, telling the valid-listeners each device
 * is no longer valid and shutting down its connection and datastore transactions, then stops the spy pool.
 * NOTE(review): the lines following shutdownNow() are not visible in this listing.
 */
250 public void close() {
// A consuming iterator removes each context from the map as it is handed out.
251 for (final Iterator<DeviceContext> iterator = Iterators.consumingIterator(deviceContexts.values().iterator());
252 iterator.hasNext();) {
253 final DeviceContext deviceCtx = iterator.next();
254 notifyDeviceValidListeners(deviceCtx.getDeviceInfo(), false);
255 deviceCtx.shutdownConnection();
// The future returned here is not awaited.
256 deviceCtx.shuttingDownDataStoreTransactions();
259 if (spyPool != null) {
260 spyPool.shutdownNow();
266 public void onDeviceContextLevelDown(final DeviceInfo deviceInfo) {
267 LOG.debug("onDeviceContextClosed for Node {}", deviceInfo.getNodeId());
268 deviceContexts.remove(deviceInfo);
269 updatePacketInRateLimiters();
270 LifecycleService lifecycleService = lifecycleServices.remove(deviceInfo);
272 lifecycleService.close();
273 } catch (Exception e) {
274 LOG.warn("Closing service for node {} was unsuccessful ", deviceInfo.getNodeId().getValue(), e);
279 public void initialize() {
280 spyPool.scheduleAtFixedRate(conductor.getMessageIntelligenceAgency(), SPY_RATE, SPY_RATE, TimeUnit.SECONDS);
284 public void setExtensionConverterProvider(final ExtensionConverterProvider extensionConverterProvider) {
285 this.extensionConverterProvider = extensionConverterProvider;
289 public ExtensionConverterProvider getExtensionConverterProvider() {
290 return extensionConverterProvider;
294 public void setDeviceTerminationPhaseHandler(final DeviceTerminationPhaseHandler handler) {
295 this.deviceTerminPhaseHandler = handler;
/**
 * Reacts to a closed connection.
 * <p>
 * The visible code looks up the device context of the connection's device, logs when none is found,
 * removes the connection from the auxiliary connections when it is not the primary one, tells the
 * valid-listeners the device is no longer valid, and shuts down the device's datastore transactions.
 * The termination phase handler is notified whether that shutdown succeeds or fails, and a
 * 10-second timer cancels the shutdown future if it has not completed by then.
 * <p>
 * NOTE(review): several lines (early return, branch structure, callback braces) are not visible in
 * this listing, so the exact control flow between these steps must be confirmed against the full file.
 *
 * @param connectionContext context of the connection that was closed
 */
299 public void onDeviceDisconnected(final ConnectionContext connectionContext) {
300 LOG.trace("onDeviceDisconnected method call for Node: {}", connectionContext.getNodeId());
301 final DeviceInfo deviceInfo = connectionContext.getDeviceInfo();
302 final DeviceContext deviceCtx = this.deviceContexts.get(deviceInfo);
304 if (null == deviceCtx) {
305 LOG.info("DeviceContext for Node {} was not found. Connection is terminated without OFP context suite.", deviceInfo.getNodeId());
309 if (!connectionContext.equals(deviceCtx.getPrimaryConnectionContext())) {
310 /* Connection is not PrimaryConnection so try to remove from Auxiliary Connections */
311 deviceCtx.removeAuxiliaryConnectionContext(connectionContext);
313 notifyDeviceValidListeners(deviceInfo, false);
314 /* Device is disconnected and so we need to close TxManager */
315 final ListenableFuture<Void> future = deviceCtx.shuttingDownDataStoreTransactions();
// Both outcomes end in onDeviceContextLevelDown, so the device is torn down even if the shutdown fails.
316 Futures.addCallback(future, new FutureCallback<Void>() {
319 public void onSuccess(final Void result) {
320 LOG.debug("TxChainManager for device {} is closed successful.", deviceInfo.getNodeId());
321 deviceTerminPhaseHandler.onDeviceContextLevelDown(deviceInfo);
325 public void onFailure(final Throwable t) {
326 LOG.warn("TxChainManager for device {} failed by closing.", deviceInfo.getNodeId(), t);
327 deviceTerminPhaseHandler.onDeviceContextLevelDown(deviceInfo);
330 /* Add a timer for closing TxManager because it could fail in a cluster without notification */
331 final TimerTask timerTask = timeout -> {
332 if (!future.isDone()) {
333 LOG.info("Shutting down TxChain for node {} not completed during 10 sec. Continue anyway.", deviceInfo.getNodeId());
// cancel(false): the future is cancelled without requesting interruption.
334 future.cancel(false);
337 conductor.newTimeout(timerTask, 10, TimeUnit.SECONDS);
342 void addDeviceContextToMap(final DeviceInfo deviceInfo, final DeviceContext deviceContext){
343 deviceContexts.put(deviceInfo, deviceContext);
347 public <T extends OFPContext> T gainContext(final DeviceInfo deviceInfo) {
348 return (T) deviceContexts.get(deviceInfo);
352 public ListenableFuture<Void> onClusterRoleChange(final DeviceInfo deviceInfo, final OfpRole role) {
353 DeviceContext deviceContext = deviceContexts.get(deviceInfo);
354 LOG.trace("onClusterRoleChange {} for node:", role, deviceInfo.getNodeId());
355 if (OfpRole.BECOMEMASTER.equals(role)) {
356 return onDeviceTakeClusterLeadership(deviceInfo);
358 return ((DeviceContextImpl)deviceContext).getTransactionChainManager().deactivateTransactionManager();
362 public void addDeviceSynchronizeListener(final DeviceSynchronizeListener deviceSynchronizeListener) {
363 this.deviceSynchronizedListeners.add(deviceSynchronizeListener);
367 public void notifyDeviceSynchronizeListeners(final DeviceInfo deviceInfo, final boolean deviceSynchronized) {
368 for (DeviceSynchronizeListener listener : deviceSynchronizedListeners) {
369 listener.deviceIsSynchronized(deviceInfo, deviceSynchronized);
374 public void addDeviceValidListener(final DeviceValidListener deviceValidListener) {
375 this.deviceValidListeners.add(deviceValidListener);
379 public void notifyDeviceValidListeners(final DeviceInfo deviceInfo, final boolean deviceValid) {
380 for (DeviceValidListener listener : deviceValidListeners) {
381 listener.deviceIsValid(deviceInfo, deviceValid);
386 public void setIsNotificationFlowRemovedOff(boolean isNotificationFlowRemovedOff) {
387 this.isNotificationFlowRemovedOff = isNotificationFlowRemovedOff;
391 public boolean getIsNotificationFlowRemovedOff() {
392 return this.isNotificationFlowRemovedOff;
/**
 * Runs the sequence for a device on the leadership take-over path of onClusterRoleChange.
 * <p>
 * The visible code returns a failed future when the conductor has no statistics context for the
 * device. Otherwise it marks the device as not synchronized, activates the transaction manager and
 * performs the initial submit, collects node information, then gathers initial statistics, and
 * finally applies the function built by getInitialDeviceInformation to the statistics result.
 * <p>
 * NOTE(review): at least one statement inside the null-check branch is not visible in this listing.
 *
 * @param deviceInfo identifies the device
 * @return future of the chained initialization steps, or an immediately failed future
 */
395 private ListenableFuture<Void> onDeviceTakeClusterLeadership(final DeviceInfo deviceInfo) {
396 LOG.trace("onDeviceTakeClusterLeadership for node: {}", deviceInfo.getNodeId());
398 StatisticsContext statisticsContext = conductor.getStatisticsContext(deviceInfo);
399 if (statisticsContext == null) {
400 final String errMsg = String.format("DeviceCtx %s is up but we are missing StatisticsContext", deviceInfo.getDatapathId());
402 return Futures.immediateFailedFuture(new IllegalStateException(errMsg));
404 DeviceContext deviceContext = deviceContexts.get(deviceInfo);
405 /* Prepare init info collecting */
406 notifyDeviceSynchronizeListeners(deviceInfo, false);
407 ((DeviceContextImpl)deviceContext).getTransactionChainManager().activateTransactionManager();
408 ((DeviceContextImpl)deviceContext).getTransactionChainManager().initialSubmitWriteTransaction();
409 /* Init Collecting NodeInfo */
410 final ListenableFuture<Void> initCollectingDeviceInfo = DeviceInitializationUtils.initializeNodeInformation(
411 deviceContext, switchFeaturesMandatory, convertorExecutor);
412 /* Init Collecting StatInfo */
// Chained on the node-information future via an AsyncFunction.
413 final ListenableFuture<Boolean> statPollFuture = Futures.transform(initCollectingDeviceInfo,
414 new AsyncFunction<Void, Boolean>() {
417 public ListenableFuture<Boolean> apply(@Nonnull final Void input) throws Exception {
418 statisticsContext.statListForCollectingInitialization();
419 return statisticsContext.initialGatherDynamicData();
// Final stage: the statistics result is passed through the function from getInitialDeviceInformation.
423 return Futures.transform(statPollFuture, getInitialDeviceInformation(deviceContext));
/**
 * Builds the final step of the leadership take-over chain for a device.
 * <p>
 * The visible code throws IllegalStateException when the conductor reports the connection state RIP
 * for the device, and again when the incoming Boolean is null or false. Otherwise it logs success,
 * notifies the synchronize-listeners that the device is synchronized and enables statistics polling
 * on the device state.
 * <p>
 * NOTE(review): the line that introduces {@code input} and the end of this method are not visible in
 * this listing - confirm the function's return value against the full file.
 *
 * @param deviceContext context of the device being initialized
 */
426 private Function<Boolean, Void> getInitialDeviceInformation(final DeviceContext deviceContext) {
// Guard 1: the connection was lost while the initial information was being gathered.
428 if (ConnectionContext.CONNECTION_STATE.RIP.equals(
429 conductor.gainConnectionStateSafely(deviceContext.getDeviceInfo())
431 final String errMsg =
432 String.format("We lost connection for Device %s, context has to be closed.",
433 deviceContext.getDeviceInfo().getNodeId());
435 throw new IllegalStateException(errMsg);
// Guard 2: a null or false input is treated as a failure.
438 if (input == null || !input) {
439 final String errMsg =
440 String.format("Get Initial Device %s information fails",
441 deviceContext.getDeviceInfo().getNodeId());
443 throw new IllegalStateException(errMsg);
445 LOG.debug("Get Initial Device {} information is successful",
446 deviceContext.getDeviceInfo().getNodeId());
447 notifyDeviceSynchronizeListeners(deviceContext.getDeviceInfo(), true);
448 deviceContext.getDeviceState().setStatisticsPollingEnabledProp(true);