2 * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.openflowplugin.impl.device;
10 import com.google.common.annotations.VisibleForTesting;
11 import com.google.common.base.Function;
12 import com.google.common.base.Preconditions;
13 import com.google.common.base.Verify;
14 import com.google.common.collect.Iterators;
15 import com.google.common.util.concurrent.AsyncFunction;
16 import com.google.common.util.concurrent.FutureCallback;
17 import com.google.common.util.concurrent.Futures;
18 import com.google.common.util.concurrent.ListenableFuture;
19 import io.netty.util.TimerTask;
20 import java.util.Collections;
21 import java.util.HashSet;
22 import java.util.Iterator;
24 import java.util.concurrent.ConcurrentHashMap;
25 import java.util.concurrent.ConcurrentMap;
26 import java.util.concurrent.ExecutionException;
27 import java.util.concurrent.ScheduledThreadPoolExecutor;
28 import java.util.concurrent.TimeUnit;
29 import javax.annotation.CheckForNull;
30 import javax.annotation.Nonnull;
31 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
32 import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
33 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
34 import org.opendaylight.openflowjava.protocol.api.connection.ConnectionAdapter;
35 import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueueHandlerRegistration;
36 import org.opendaylight.openflowplugin.api.openflow.OFPContext;
37 import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionContext;
38 import org.opendaylight.openflowplugin.api.openflow.connection.OutboundQueueProvider;
39 import org.opendaylight.openflowplugin.api.openflow.device.DeviceContext;
40 import org.opendaylight.openflowplugin.api.openflow.device.DeviceInfo;
41 import org.opendaylight.openflowplugin.api.openflow.device.DeviceManager;
42 import org.opendaylight.openflowplugin.api.openflow.device.DeviceState;
43 import org.opendaylight.openflowplugin.api.openflow.device.DeviceSynchronizeListener;
44 import org.opendaylight.openflowplugin.api.openflow.device.DeviceValidListener;
45 import org.opendaylight.openflowplugin.api.openflow.device.TranslatorLibrary;
46 import org.opendaylight.openflowplugin.api.openflow.device.handlers.DeviceInitializationPhaseHandler;
47 import org.opendaylight.openflowplugin.api.openflow.device.handlers.DeviceTerminationPhaseHandler;
48 import org.opendaylight.openflowplugin.api.openflow.lifecycle.LifecycleConductor;
49 import org.opendaylight.openflowplugin.api.openflow.statistics.StatisticsContext;
50 import org.opendaylight.openflowplugin.extension.api.ExtensionConverterProviderKeeper;
51 import org.opendaylight.openflowplugin.extension.api.core.extension.ExtensionConverterProvider;
52 import org.opendaylight.openflowplugin.impl.connection.OutboundQueueProviderImpl;
53 import org.opendaylight.openflowplugin.impl.device.listener.OpenflowProtocolListenerFullImpl;
54 import org.opendaylight.openflowplugin.impl.util.DeviceInitializationUtils;
55 import org.opendaylight.openflowplugin.openflow.md.core.sal.convertor.ConvertorExecutor;
56 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes;
57 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodesBuilder;
58 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
59 import org.opendaylight.yang.gen.v1.urn.opendaylight.role.service.rev150727.OfpRole;
60 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
61 import org.slf4j.Logger;
62 import org.slf4j.LoggerFactory;
67 public class DeviceManagerImpl implements DeviceManager, ExtensionConverterProviderKeeper {
69 private static final Logger LOG = LoggerFactory.getLogger(DeviceManagerImpl.class);
71 private final long globalNotificationQuota;
72 private final boolean switchFeaturesMandatory;
73 private boolean isNotificationFlowRemovedOff;
75 private static final int SPY_RATE = 10;
77 private final DataBroker dataBroker;
78 private final ConvertorExecutor convertorExecutor;
79 private TranslatorLibrary translatorLibrary;
80 private DeviceInitializationPhaseHandler deviceInitPhaseHandler;
81 private DeviceTerminationPhaseHandler deviceTerminPhaseHandler;
83 private final ConcurrentMap<DeviceInfo, DeviceContext> deviceContexts = new ConcurrentHashMap<>();
85 private final long barrierIntervalNanos;
86 private final int barrierCountLimit;
87 private ExtensionConverterProvider extensionConverterProvider;
88 private ScheduledThreadPoolExecutor spyPool;
89 private Set<DeviceSynchronizeListener> deviceSynchronizedListeners;
90 private Set<DeviceValidListener> deviceValidListeners;
92 private final LifecycleConductor conductor;
/**
 * Creates the device manager and merges an empty {@link Nodes} container into the operational
 * datastore, so that the parent of every later {@link Node} write already exists.
 *
 * @param dataBroker broker used for the initial datastore merge; must not be {@code null}
 * @param globalNotificationQuota notification quota that is divided among all connected devices
 * @param switchFeaturesMandatory flag handed to every created device context and to device initialization
 * @param barrierInterval barrier interval in milliseconds (stored internally in nanoseconds)
 * @param barrierCountLimit barrier count limit used when registering the outbound queue handler
 * @param lifecycleConductor conductor this manager delegates to
 * @param isNotificationFlowRemovedOff initial value of the flow-removed notification flag
 * @param convertorExecutor convertor executor used during device initialization
 * @throws NullPointerException if {@code dataBroker} is {@code null}
 * @throws IllegalStateException if the initial merge fails or the waiting thread is interrupted
 */
public DeviceManagerImpl(@Nonnull final DataBroker dataBroker,
                         final long globalNotificationQuota,
                         final boolean switchFeaturesMandatory,
                         final long barrierInterval,
                         final int barrierCountLimit,
                         final LifecycleConductor lifecycleConductor,
                         boolean isNotificationFlowRemovedOff,
                         final ConvertorExecutor convertorExecutor) {
    this.switchFeaturesMandatory = switchFeaturesMandatory;
    this.globalNotificationQuota = globalNotificationQuota;
    this.isNotificationFlowRemovedOff = isNotificationFlowRemovedOff;
    this.dataBroker = Preconditions.checkNotNull(dataBroker);
    this.convertorExecutor = convertorExecutor;

    /* merge empty nodes to oper DS to prevent any problems with missing parent for Node */
    final WriteTransaction tx = dataBroker.newWriteOnlyTransaction();
    final NodesBuilder nodesBuilder = new NodesBuilder();
    nodesBuilder.setNode(Collections.<Node>emptyList());
    tx.merge(LogicalDatastoreType.OPERATIONAL, InstanceIdentifier.create(Nodes.class), nodesBuilder.build());
    try {
        tx.submit().get();
    } catch (ExecutionException | InterruptedException e) {
        if (e instanceof InterruptedException) {
            // Keep the interruption visible to the caller instead of silently clearing it.
            Thread.currentThread().interrupt();
        }
        LOG.error("Creation of node failed.", e);
        throw new IllegalStateException(e);
    }

    this.barrierIntervalNanos = TimeUnit.MILLISECONDS.toNanos(barrierInterval);
    this.barrierCountLimit = barrierCountLimit;

    this.conductor = lifecycleConductor;
    spyPool = new ScheduledThreadPoolExecutor(1);
    this.deviceSynchronizedListeners = new HashSet<>();
    this.deviceValidListeners = new HashSet<>();
}
131 public void setDeviceInitializationPhaseHandler(final DeviceInitializationPhaseHandler handler) {
132 this.deviceInitPhaseHandler = handler;
136 public void onDeviceContextLevelUp(@CheckForNull DeviceInfo deviceInfo) throws Exception {
137 // final phase - we have to add new Device to MD-SAL DataStore
138 LOG.debug("Final phase of DeviceContextLevelUp for Node: {} ", deviceInfo.getNodeId());
139 DeviceContext deviceContext = Preconditions.checkNotNull(deviceContexts.get(deviceInfo));
140 deviceContext.onPublished();
// Handles a freshly connected switch. When a DeviceContext is already registered for the same
// DeviceInfo the connection is only logged as rejected; otherwise the connection is wired up
// (disconnect handler, packet-in filtering, outbound queue, message listener), a DeviceState and
// a DeviceContextImpl are created and registered, and processing is handed to the
// initialization phase handler.
// NOTE(review): this view of the method is incomplete (e.g. only some DeviceContextImpl
// constructor arguments and no return statement are visible) - confirm against the full file.
144 public boolean deviceConnected(@CheckForNull final ConnectionContext connectionContext) throws Exception {
// checkArgument raises IllegalArgumentException for a null connectionContext.
145 Preconditions.checkArgument(connectionContext != null);
147 DeviceInfo deviceInfo = connectionContext.getDeviceInfo();
149 * This part prevent destroy another device context. Throwing here an exception result to propagate close connection
150 * in {@link org.opendaylight.openflowplugin.impl.connection.org.opendaylight.openflowplugin.impl.connection.HandshakeContextImpl}
151 * If context already exist we are in state closing process (connection flapping) and we should not propagate connection close
// NOTE(review): what follows the warning is not visible here - presumably the method bails out
// without building a second context; confirm.
153 if (deviceContexts.containsKey(deviceInfo)) {
154 LOG.warn("Rejecting connection from node which is already connected and there exist deviceContext for it: {}", connectionContext.getNodeId());
158 LOG.info("ConnectionEvent: Device connected to controller, Device:{}, NodeId:{}",
159 connectionContext.getConnectionAdapter().getRemoteAddress(), deviceInfo.getNodeId());
161 // Add Disconnect handler
162 connectionContext.setDeviceDisconnectedHandler(DeviceManagerImpl.this);
163 // Cache this for clarity
164 final ConnectionAdapter connectionAdapter = connectionContext.getConnectionAdapter();
166 //FIXME: as soon as auxiliary connection are fully supported then this is needed only before device context published
167 connectionAdapter.setPacketInFiltering(true);
// Create the outbound queue provider for the device's protocol version and register it on both
// the connection context and the connection adapter.
169 final OutboundQueueProvider outboundQueueProvider = new OutboundQueueProviderImpl(deviceInfo.getVersion());
171 connectionContext.setOutboundQueueProvider(outboundQueueProvider);
172 final OutboundQueueHandlerRegistration<OutboundQueueProvider> outboundQueueHandlerRegistration =
173 connectionAdapter.registerOutboundQueueHandler(outboundQueueProvider, barrierCountLimit, barrierIntervalNanos);
174 connectionContext.setOutboundQueueHandleRegistration(outboundQueueHandlerRegistration);
// The new DeviceState is registered as both a synchronize listener and a valid listener, so the
// notify* calls further down reach it.
176 final DeviceState deviceState = new DeviceStateImpl(deviceInfo);
177 this.addDeviceSynchronizeListener(deviceState);
178 this.addDeviceValidListener(deviceState);
180 final DeviceContext deviceContext = new DeviceContextImpl(connectionContext,
184 outboundQueueProvider,
187 connectionContext.getDeviceInfo(),
// putIfAbsent must return null, i.e. no other context may have been registered in the meantime.
190 Verify.verify(deviceContexts.putIfAbsent(deviceInfo, deviceContext) == null, "DeviceCtx still not closed.");
192 deviceContext.setSwitchFeaturesMandatory(switchFeaturesMandatory);
194 ((ExtensionConverterProviderKeeper) deviceContext).setExtensionConverterProvider(extensionConverterProvider);
195 deviceContext.setNotificationPublishService(conductor.getNotificationPublishService());
// The number of registered devices changed, so the per-device packet-in limit is recomputed.
197 updatePacketInRateLimiters();
199 final OpenflowProtocolListenerFullImpl messageListener = new OpenflowProtocolListenerFullImpl(
200 connectionAdapter, deviceContext);
201 connectionAdapter.setMessageListener(messageListener);
202 notifyDeviceValidListeners(deviceInfo, true);
// Hand over to the next handler of the initialization chain.
204 deviceInitPhaseHandler.onDeviceContextLevelUp(connectionContext.getDeviceInfo());
206 notifyDeviceSynchronizeListeners(deviceInfo, true);
211 private void updatePacketInRateLimiters() {
212 synchronized (deviceContexts) {
213 final int deviceContextsSize = deviceContexts.size();
214 if (deviceContextsSize > 0) {
215 long freshNotificationLimit = globalNotificationQuota / deviceContextsSize;
216 if (freshNotificationLimit < 100) {
217 freshNotificationLimit = 100;
219 LOG.debug("fresh notification limit = {}", freshNotificationLimit);
220 for (final DeviceContext deviceContext : deviceContexts.values()) {
221 deviceContext.updatePacketInRateLimit(freshNotificationLimit);
228 public TranslatorLibrary oook() {
229 return translatorLibrary;
233 public void setTranslatorLibrary(final TranslatorLibrary translatorLibrary) {
234 this.translatorLibrary = translatorLibrary;
238 public void close() {
239 for (final Iterator<DeviceContext> iterator = Iterators.consumingIterator(deviceContexts.values().iterator());
240 iterator.hasNext();) {
241 final DeviceContext deviceCtx = iterator.next();
242 notifyDeviceValidListeners(deviceCtx.getDeviceInfo(), false);
243 deviceCtx.shutdownConnection();
244 deviceCtx.shuttingDownDataStoreTransactions();
247 if (spyPool != null) {
248 spyPool.shutdownNow();
254 public void onDeviceContextLevelDown(final DeviceInfo deviceInfo) {
255 LOG.debug("onDeviceContextClosed for Node {}", deviceInfo.getNodeId());
256 deviceContexts.remove(deviceInfo);
257 updatePacketInRateLimiters();
261 public void initialize() {
262 spyPool.scheduleAtFixedRate(conductor.getMessageIntelligenceAgency(), SPY_RATE, SPY_RATE, TimeUnit.SECONDS);
266 public void setExtensionConverterProvider(final ExtensionConverterProvider extensionConverterProvider) {
267 this.extensionConverterProvider = extensionConverterProvider;
271 public ExtensionConverterProvider getExtensionConverterProvider() {
272 return extensionConverterProvider;
276 public void setDeviceTerminationPhaseHandler(final DeviceTerminationPhaseHandler handler) {
277 this.deviceTerminPhaseHandler = handler;
// Reacts to a closed connection. Looks up the DeviceContext of the disconnected device; for a
// connection that is not the primary one the auxiliary connection context is removed. The
// teardown steps visible here notify the valid listeners, shut down the device's datastore
// transactions and - on success as well as on failure - pass the device to the termination
// phase handler. A timeout scheduled on the conductor cancels the shutdown future if it is
// still pending after 10 seconds.
// NOTE(review): brace-only and some statement lines are not visible in this view, so the exact
// branch structure (what follows the "not found" log, and whether the teardown is the else
// branch of the primary-connection check) has to be confirmed against the full file.
281 public void onDeviceDisconnected(final ConnectionContext connectionContext) {
282 LOG.trace("onDeviceDisconnected method call for Node: {}", connectionContext.getNodeId());
283 final DeviceInfo deviceInfo = connectionContext.getDeviceInfo();
284 final DeviceContext deviceCtx = this.deviceContexts.get(deviceInfo);
286 if (null == deviceCtx) {
287 LOG.info("DeviceContext for Node {} was not found. Connection is terminated without OFP context suite.", deviceInfo.getNodeId());
291 if (!connectionContext.equals(deviceCtx.getPrimaryConnectionContext())) {
292 /* Connection is not the primary connection, so try to remove it from the auxiliary connections */
293 deviceCtx.removeAuxiliaryConnectionContext(connectionContext);
295 notifyDeviceValidListeners(deviceInfo, false);
296 /* Device is disconnected, so we need to close the TxManager */
297 final ListenableFuture<Void> future = deviceCtx.shuttingDownDataStoreTransactions();
// Both outcomes continue with the termination chain; they differ only in what is logged.
298 Futures.addCallback(future, new FutureCallback<Void>() {
301 public void onSuccess(final Void result) {
302 LOG.debug("TxChainManager for device {} is closed successful.", deviceInfo.getNodeId());
303 deviceTerminPhaseHandler.onDeviceContextLevelDown(deviceInfo);
307 public void onFailure(final Throwable t) {
308 LOG.warn("TxChainManager for device {} failed by closing.", deviceInfo.getNodeId(), t);
309 deviceTerminPhaseHandler.onDeviceContextLevelDown(deviceInfo);
312 /* Add a timer for closing the TxManager because the close could fail in a cluster without notification */
313 final TimerTask timerTask = timeout -> {
314 if (!future.isDone()) {
315 LOG.info("Shutting down TxChain for node {} not completed during 10 sec. Continue anyway.", deviceInfo.getNodeId());
// Cancelling completes the future, which in turn fires the callback registered above.
316 future.cancel(false);
319 conductor.newTimeout(timerTask, 10, TimeUnit.SECONDS);
324 void addDeviceContextToMap(final DeviceInfo deviceInfo, final DeviceContext deviceContext){
325 deviceContexts.put(deviceInfo, deviceContext);
329 public <T extends OFPContext> T gainContext(final DeviceInfo deviceInfo) {
330 return (T) deviceContexts.get(deviceInfo);
334 public ListenableFuture<Void> onClusterRoleChange(final DeviceInfo deviceInfo, final OfpRole role) {
335 DeviceContext deviceContext = deviceContexts.get(deviceInfo);
336 LOG.trace("onClusterRoleChange {} for node:", role, deviceInfo.getNodeId());
337 if (OfpRole.BECOMEMASTER.equals(role)) {
338 return onDeviceTakeClusterLeadership(deviceInfo);
340 return ((DeviceContextImpl)deviceContext).getTransactionChainManager().deactivateTransactionManager();
344 public void addDeviceSynchronizeListener(final DeviceSynchronizeListener deviceSynchronizeListener) {
345 this.deviceSynchronizedListeners.add(deviceSynchronizeListener);
349 public void notifyDeviceSynchronizeListeners(final DeviceInfo deviceInfo, final boolean deviceSynchronized) {
350 for (DeviceSynchronizeListener listener : deviceSynchronizedListeners) {
351 listener.deviceIsSynchronized(deviceInfo, deviceSynchronized);
356 public void addDeviceValidListener(final DeviceValidListener deviceValidListener) {
357 this.deviceValidListeners.add(deviceValidListener);
361 public void notifyDeviceValidListeners(final DeviceInfo deviceInfo, final boolean deviceValid) {
362 for (DeviceValidListener listener : deviceValidListeners) {
363 listener.deviceIsValid(deviceInfo, deviceValid);
368 public void setIsNotificationFlowRemovedOff(boolean isNotificationFlowRemovedOff) {
369 this.isNotificationFlowRemovedOff = isNotificationFlowRemovedOff;
373 public boolean getIsNotificationFlowRemovedOff() {
374 return this.isNotificationFlowRemovedOff;
377 private ListenableFuture<Void> onDeviceTakeClusterLeadership(final DeviceInfo deviceInfo) {
378 LOG.trace("onDeviceTakeClusterLeadership for node: {}", deviceInfo.getNodeId());
380 StatisticsContext statisticsContext = conductor.getStatisticsContext(deviceInfo);
381 if (statisticsContext == null) {
382 final String errMsg = String.format("DeviceCtx %s is up but we are missing StatisticsContext", deviceInfo.getDatapathId());
384 return Futures.immediateFailedFuture(new IllegalStateException(errMsg));
386 DeviceContext deviceContext = deviceContexts.get(deviceInfo);
387 /* Prepare init info collecting */
388 notifyDeviceSynchronizeListeners(deviceInfo, false);
389 ((DeviceContextImpl)deviceContext).getTransactionChainManager().activateTransactionManager();
390 ((DeviceContextImpl)deviceContext).getTransactionChainManager().initialSubmitWriteTransaction();
391 /* Init Collecting NodeInfo */
392 final ListenableFuture<Void> initCollectingDeviceInfo = DeviceInitializationUtils.initializeNodeInformation(
393 deviceContext, switchFeaturesMandatory, convertorExecutor);
394 /* Init Collecting StatInfo */
395 final ListenableFuture<Boolean> statPollFuture = Futures.transform(initCollectingDeviceInfo,
396 new AsyncFunction<Void, Boolean>() {
399 public ListenableFuture<Boolean> apply(@Nonnull final Void input) throws Exception {
400 statisticsContext.statListForCollectingInitialization();
401 return statisticsContext.initialGatherDynamicData();
405 return Futures.transform(statPollFuture, getInitialDeviceInformation(deviceContext));
408 private Function<Boolean, Void> getInitialDeviceInformation(final DeviceContext deviceContext) {
410 if (ConnectionContext.CONNECTION_STATE.RIP.equals(
411 conductor.gainConnectionStateSafely(deviceContext.getDeviceInfo())
413 final String errMsg =
414 String.format("We lost connection for Device %s, context has to be closed.",
415 deviceContext.getDeviceInfo().getNodeId());
417 throw new IllegalStateException(errMsg);
420 if (input == null || !input) {
421 final String errMsg =
422 String.format("Get Initial Device %s information fails",
423 deviceContext.getDeviceInfo().getNodeId());
425 throw new IllegalStateException(errMsg);
427 LOG.debug("Get Initial Device {} information is successful",
428 deviceContext.getDeviceInfo().getNodeId());
429 notifyDeviceSynchronizeListeners(deviceContext.getDeviceInfo(), true);
430 deviceContext.getDeviceState().setStatisticsPollingEnabledProp(true);