X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=openflowplugin-impl%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fopenflowplugin%2Fimpl%2Fdevice%2FDeviceManagerImpl.java;h=0aeda1a8ce31f4bbc7093c9c6402e8bec4401fa9;hb=cfe3a97837951ebbedb337dc988027f10c49f714;hp=1c4428aca6dc51dfced1faaf71d703cad7323d12;hpb=4ad505cc51f9bda8290c6191e4e16a5d36ce3b27;p=openflowplugin.git diff --git a/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/device/DeviceManagerImpl.java b/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/device/DeviceManagerImpl.java index 1c4428aca6..0aeda1a8ce 100644 --- a/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/device/DeviceManagerImpl.java +++ b/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/device/DeviceManagerImpl.java @@ -1,5 +1,5 @@ -/** - * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved. +/* + * Copyright (c) 2015, 2017 Cisco Systems, Inc. and others. All rights reserved. * * This program and the accompanying materials are made available under the * terms of the Eclipse Public License v1.0 which accompanies this distribution, @@ -7,154 +7,84 @@ */ package org.opendaylight.openflowplugin.impl.device; -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Preconditions; -import com.google.common.collect.Iterators; -import com.google.common.util.concurrent.CheckedFuture; -import com.google.common.util.concurrent.FutureCallback; -import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import io.netty.util.HashedWheelTimer; -import io.netty.util.TimerTask; -import java.util.Collections; -import java.util.Iterator; -import java.util.Objects; import java.util.Optional; +import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.ExecutionException; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; -import javax.annotation.CheckForNull; import javax.annotation.Nonnull; -import org.opendaylight.controller.md.sal.binding.api.DataBroker; -import org.opendaylight.controller.md.sal.binding.api.NotificationPublishService; -import org.opendaylight.controller.md.sal.binding.api.WriteTransaction; -import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType; -import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException; -import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceProvider; +import org.opendaylight.mdsal.binding.api.DataBroker; +import org.opendaylight.mdsal.binding.api.NotificationPublishService; +import org.opendaylight.mdsal.binding.api.WriteTransaction; +import org.opendaylight.mdsal.common.api.LogicalDatastoreType; import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueueHandlerRegistration; import org.opendaylight.openflowplugin.api.openflow.OFPContext; import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionContext; -import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionStatus; import org.opendaylight.openflowplugin.api.openflow.connection.OutboundQueueProvider; import org.opendaylight.openflowplugin.api.openflow.device.DeviceContext; import org.opendaylight.openflowplugin.api.openflow.device.DeviceInfo; import org.opendaylight.openflowplugin.api.openflow.device.DeviceManager; import org.opendaylight.openflowplugin.api.openflow.device.TranslatorLibrary; -import org.opendaylight.openflowplugin.api.openflow.device.handlers.DeviceInitializationPhaseHandler; -import org.opendaylight.openflowplugin.api.openflow.device.handlers.DeviceTerminationPhaseHandler; -import org.opendaylight.openflowplugin.api.openflow.lifecycle.LifecycleService; +import org.opendaylight.openflowplugin.api.openflow.lifecycle.ContextChainHolder; import org.opendaylight.openflowplugin.api.openflow.statistics.ofpspecific.MessageSpy; import org.opendaylight.openflowplugin.extension.api.ExtensionConverterProviderKeeper; import org.opendaylight.openflowplugin.extension.api.core.extension.ExtensionConverterProvider; import org.opendaylight.openflowplugin.impl.connection.OutboundQueueProviderImpl; import org.opendaylight.openflowplugin.impl.device.initialization.DeviceInitializerProvider; import org.opendaylight.openflowplugin.impl.device.listener.OpenflowProtocolListenerFullImpl; -import org.opendaylight.openflowplugin.impl.services.sal.SalRoleServiceImpl; +import org.opendaylight.openflowplugin.impl.util.DeviceInitializationUtil; import org.opendaylight.openflowplugin.openflow.md.core.sal.convertor.ConvertorExecutor; -import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes; -import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodesBuilder; +import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId; +import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRef; +import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRemovedBuilder; +import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeUpdatedBuilder; import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node; -import org.opendaylight.yangtools.yang.binding.InstanceIdentifier; +import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflow.provider.config.rev160510.OpenflowProviderConfig; +import org.opendaylight.yangtools.yang.binding.KeyedInstanceIdentifier; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -/** - * - */ public class DeviceManagerImpl implements DeviceManager, ExtensionConverterProviderKeeper { private static final Logger LOG = LoggerFactory.getLogger(DeviceManagerImpl.class); - - private final long globalNotificationQuota; - private final boolean switchFeaturesMandatory; - private boolean isFlowRemovedNotificationOn; - private boolean skipTableFeatures; private static final int SPY_RATE = 10; + private final OpenflowProviderConfig config; private final DataBroker dataBroker; private final DeviceInitializerProvider deviceInitializerProvider; private final ConvertorExecutor convertorExecutor; - private TranslatorLibrary translatorLibrary; - private DeviceInitializationPhaseHandler deviceInitPhaseHandler; - private DeviceTerminationPhaseHandler deviceTerminPhaseHandler; - private final ConcurrentMap deviceContexts = new ConcurrentHashMap<>(); - private final ConcurrentMap lifecycleServices = new ConcurrentHashMap<>(); - - private long barrierIntervalNanos; - private int barrierCountLimit; - - private ExtensionConverterProvider extensionConverterProvider; - private ScheduledThreadPoolExecutor spyPool; + private final Set> notificationCreateNodeSend = + ConcurrentHashMap.newKeySet(); private final NotificationPublishService notificationPublishService; private final MessageSpy messageSpy; private final HashedWheelTimer hashedWheelTimer; - private boolean useSingleLayerSerialization; - - public DeviceManagerImpl(@Nonnull final DataBroker dataBroker, - final long globalNotificationQuota, - final boolean switchFeaturesMandatory, - final long barrierInterval, - final int barrierCountLimit, - final MessageSpy messageSpy, - final boolean isFlowRemovedNotificationOn, - final ClusterSingletonServiceProvider singletonServiceProvider, - final NotificationPublishService notificationPublishService, - final HashedWheelTimer hashedWheelTimer, - final ConvertorExecutor convertorExecutor, - final boolean skipTableFeatures, - final boolean useSingleLayerSerialization, - final DeviceInitializerProvider deviceInitializerProvider) { - + private final Object updatePacketInRateLimitersLock = new Object(); + private TranslatorLibrary translatorLibrary; + private ExtensionConverterProvider extensionConverterProvider; + private ScheduledThreadPoolExecutor spyPool; + private ContextChainHolder contextChainHolder; + + public DeviceManagerImpl(@Nonnull final OpenflowProviderConfig config, + @Nonnull final DataBroker dataBroker, + @Nonnull final MessageSpy messageSpy, + @Nonnull final NotificationPublishService notificationPublishService, + @Nonnull final HashedWheelTimer hashedWheelTimer, + @Nonnull final ConvertorExecutor convertorExecutor, + @Nonnull final DeviceInitializerProvider deviceInitializerProvider) { + this.config = config; this.dataBroker = dataBroker; this.deviceInitializerProvider = deviceInitializerProvider; - - /* merge empty nodes to oper DS to predict any problems with missing parent for Node */ - final WriteTransaction tx = dataBroker.newWriteOnlyTransaction(); - final NodesBuilder nodesBuilder = new NodesBuilder(); - nodesBuilder.setNode(Collections.emptyList()); - tx.merge(LogicalDatastoreType.OPERATIONAL, InstanceIdentifier.create(Nodes.class), nodesBuilder.build()); - try { - tx.submit().get(); - } catch (ExecutionException | InterruptedException e) { - LOG.error("Creation of node failed.", e); - throw new IllegalStateException(e); - } - - this.switchFeaturesMandatory = switchFeaturesMandatory; - this.globalNotificationQuota = globalNotificationQuota; - this.isFlowRemovedNotificationOn = isFlowRemovedNotificationOn; - this.skipTableFeatures = skipTableFeatures; this.convertorExecutor = convertorExecutor; this.hashedWheelTimer = hashedWheelTimer; - this.barrierIntervalNanos = TimeUnit.MILLISECONDS.toNanos(barrierInterval); - this.barrierCountLimit = barrierCountLimit; this.spyPool = new ScheduledThreadPoolExecutor(1); this.notificationPublishService = notificationPublishService; this.messageSpy = messageSpy; - this.useSingleLayerSerialization = useSingleLayerSerialization; - } - - - @Override - public void setDeviceInitializationPhaseHandler(final DeviceInitializationPhaseHandler handler) { - this.deviceInitPhaseHandler = handler; - } - - @Override - public void onDeviceContextLevelUp(@CheckForNull DeviceInfo deviceInfo, final LifecycleService lifecycleService) throws Exception { - // final phase - we have to add new Device to MD-SAL DataStore - LOG.debug("Final phase of DeviceContextLevelUp for Node: {} ", deviceInfo.getNodeId()); - DeviceContext deviceContext = Preconditions.checkNotNull(deviceContexts.get(deviceInfo)); - deviceContext.onPublished(); - lifecycleService.registerDeviceRemovedHandler(this); - } - - @Override - public ConnectionStatus deviceConnected(@CheckForNull final ConnectionContext connectionContext) throws Exception { - return ConnectionStatus.MAY_CONTINUE; + DeviceInitializationUtil.makeEmptyNodes(dataBroker); } @Override @@ -169,24 +99,13 @@ public class DeviceManagerImpl implements DeviceManager, ExtensionConverterProvi @Override public void close() { - for (final Iterator iterator = Iterators.consumingIterator(deviceContexts.values().iterator()); - iterator.hasNext();) { - final DeviceContext deviceCtx = iterator.next(); - deviceCtx.shutdownConnection(); - deviceCtx.shuttingDownDataStoreTransactions(); - } - + deviceContexts.values().forEach(OFPContext::close); + deviceContexts.clear(); Optional.ofNullable(spyPool).ifPresent(ScheduledThreadPoolExecutor::shutdownNow); spyPool = null; } - @Override - public void onDeviceContextLevelDown(final DeviceInfo deviceInfo) { - updatePacketInRateLimiters(); - Optional.ofNullable(lifecycleServices.get(deviceInfo)).ifPresent(LifecycleService::close); - } - @Override public void initialize() { spyPool.scheduleAtFixedRate(messageSpy, SPY_RATE, SPY_RATE, TimeUnit.SECONDS); @@ -203,125 +122,16 @@ public class DeviceManagerImpl implements DeviceManager, ExtensionConverterProvi } @Override - public void setDeviceTerminationPhaseHandler(final DeviceTerminationPhaseHandler handler) { - this.deviceTerminPhaseHandler = handler; - } - - @Override - public void onDeviceDisconnected(final ConnectionContext connectionContext) { - LOG.trace("onDeviceDisconnected method call for Node: {}", connectionContext.getNodeId()); - final DeviceInfo deviceInfo = connectionContext.getDeviceInfo(); - final DeviceContext deviceCtx = this.deviceContexts.get(deviceInfo); - - if (Objects.isNull(deviceCtx)) { - LOG.info("DeviceContext for Node {} was not found. Connection is terminated without OFP context suite.", deviceInfo.getLOGValue()); - return; - } - - if (!connectionContext.equals(deviceCtx.getPrimaryConnectionContext())) { - LOG.debug("Node {} disconnected, but not primary connection.", connectionContext.getDeviceInfo().getLOGValue()); - // Connection is not PrimaryConnection so try to remove from Auxiliary Connections - deviceCtx.removeAuxiliaryConnectionContext(connectionContext); - // If this is not primary connection, we should not continue disabling everything - return; - } - - if (deviceCtx.getState().equals(OFPContext.CONTEXT_STATE.TERMINATION)) { - LOG.info("Device context for node {} is already is termination state, waiting for close all context", deviceInfo.getLOGValue()); - return; - } + public ListenableFuture removeDeviceFromOperationalDS(@Nonnull final KeyedInstanceIdentifier ii) { - deviceCtx.close(); - - // TODO: Auxiliary connections supported ? - // Device is disconnected and so we need to close TxManager - final ListenableFuture future = deviceCtx.shuttingDownDataStoreTransactions(); - Futures.addCallback(future, new FutureCallback() { - @Override - public void onSuccess(final Void result) { - LOG.debug("TxChainManager for device {} is closed successful.", deviceInfo.getLOGValue()); - deviceTerminPhaseHandler.onDeviceContextLevelDown(deviceInfo); - } - - @Override - public void onFailure(final Throwable t) { - LOG.warn("TxChainManager for device {} failed by closing.", deviceInfo.getLOGValue()); - LOG.trace("TxChainManager failed by closing. ", t); - deviceTerminPhaseHandler.onDeviceContextLevelDown(deviceInfo); - } - }); - - // Add timer for Close TxManager because it could fail in cluster without notification - final TimerTask timerTask = timeout -> { - if (!future.isDone()) { - LOG.warn("Shutting down TxChain for node {} not completed during 10 sec. Continue anyway.", deviceInfo.getLOGValue()); - future.cancel(false); - } - }; - - hashedWheelTimer.newTimeout(timerTask, 10, TimeUnit.SECONDS); - } - - @VisibleForTesting - void addDeviceContextToMap(final DeviceInfo deviceInfo, final DeviceContext deviceContext){ - deviceContexts.put(deviceInfo, deviceContext); - } - - @Override - public void setFlowRemovedNotificationOn(boolean isNotificationFlowRemovedOff) { - this.isFlowRemovedNotificationOn = isNotificationFlowRemovedOff; - } - - @Override - public boolean isFlowRemovedNotificationOn() { - return this.isFlowRemovedNotificationOn; - } - - - @Override - public void setSkipTableFeatures(boolean skipTableFeaturesValue) { - skipTableFeatures = skipTableFeaturesValue; - } - - @Override - public void setBarrierCountLimit(final int barrierCountLimit) { - this.barrierCountLimit = barrierCountLimit; - } - - @Override - public void setBarrierInterval(final long barrierTimeoutLimit) { - this.barrierIntervalNanos = TimeUnit.MILLISECONDS.toNanos(barrierTimeoutLimit); - } - - @Override - public CheckedFuture removeDeviceFromOperationalDS(final DeviceInfo deviceInfo) { final WriteTransaction delWtx = dataBroker.newWriteOnlyTransaction(); - delWtx.delete(LogicalDatastoreType.OPERATIONAL, deviceInfo.getNodeInstanceIdentifier()); - final CheckedFuture delFuture = delWtx.submit(); - - Futures.addCallback(delFuture, new FutureCallback() { - @Override - public void onSuccess(final Void result) { - if (LOG.isDebugEnabled()) { - LOG.debug("Delete Node {} was successful", deviceInfo.getLOGValue()); - } - } - - @Override - public void onFailure(@Nonnull final Throwable t) { - LOG.warn("Delete node {} failed with exception {}", deviceInfo.getLOGValue(), t); - } - }); + delWtx.delete(LogicalDatastoreType.OPERATIONAL, ii); + return delWtx.commit(); - return delFuture; } @Override - public void setUseSingleLayerSerialization(final Boolean useSingleLayerSerialization) { - this.useSingleLayerSerialization = useSingleLayerSerialization; - } - - public DeviceContext createContext(@CheckForNull final ConnectionContext connectionContext) { + public DeviceContext createContext(@Nonnull final ConnectionContext connectionContext) { LOG.info("ConnectionEvent: Device connected to controller, Device:{}, NodeId:{}", connectionContext.getConnectionAdapter().getRemoteAddress(), @@ -336,8 +146,8 @@ public class DeviceManagerImpl implements DeviceManager, ExtensionConverterProvi final OutboundQueueHandlerRegistration outboundQueueHandlerRegistration = connectionContext.getConnectionAdapter().registerOutboundQueueHandler( outboundQueueProvider, - barrierCountLimit, - barrierIntervalNanos); + config.getBarrierCountLimit().getValue(), + TimeUnit.MILLISECONDS.toNanos(config.getBarrierIntervalTimeoutLimit().getValue())); connectionContext.setOutboundQueueHandleRegistration(outboundQueueHandlerRegistration); @@ -346,15 +156,15 @@ public class DeviceManagerImpl implements DeviceManager, ExtensionConverterProvi dataBroker, messageSpy, translatorLibrary, - this, convertorExecutor, - skipTableFeatures, + config.isSkipTableFeatures(), hashedWheelTimer, - useSingleLayerSerialization, - deviceInitializerProvider); + config.isUseSingleLayerSerialization(), + deviceInitializerProvider, + config.isEnableFlowRemovedNotification(), + config.isSwitchFeaturesMandatory(), + contextChainHolder); - deviceContext.setSalRoleService(new SalRoleServiceImpl(deviceContext, deviceContext)); - deviceContext.setSwitchFeaturesMandatory(switchFeaturesMandatory); ((ExtensionConverterProviderKeeper) deviceContext).setExtensionConverterProvider(extensionConverterProvider); deviceContext.setNotificationPublishService(notificationPublishService); @@ -365,15 +175,16 @@ public class DeviceManagerImpl implements DeviceManager, ExtensionConverterProvi connectionContext.getConnectionAdapter(), deviceContext); connectionContext.getConnectionAdapter().setMessageListener(messageListener); + connectionContext.getConnectionAdapter().setAlienMessageListener(messageListener); return deviceContext; } private void updatePacketInRateLimiters() { - synchronized (deviceContexts) { + synchronized (updatePacketInRateLimitersLock) { final int deviceContextsSize = deviceContexts.size(); if (deviceContextsSize > 0) { - long freshNotificationLimit = globalNotificationQuota / deviceContextsSize; + long freshNotificationLimit = config.getGlobalNotificationQuota() / deviceContextsSize; if (freshNotificationLimit < 100) { freshNotificationLimit = 100; } @@ -387,21 +198,41 @@ public class DeviceManagerImpl implements DeviceManager, ExtensionConverterProvi } } - public void onDeviceRemoved(DeviceInfo deviceInfo) { + @Override + public void onDeviceRemoved(final DeviceInfo deviceInfo) { deviceContexts.remove(deviceInfo); - LOG.debug("Device context removed for node {}", deviceInfo.getLOGValue()); + if (LOG.isDebugEnabled()) { + LOG.debug("Device context removed for node {}", deviceInfo); + } - lifecycleServices.remove(deviceInfo); - LOG.debug("Lifecycle service removed for node {}", deviceInfo.getLOGValue()); + this.updatePacketInRateLimiters(); } @Override - public long getBarrierIntervalNanos() { - return barrierIntervalNanos; + public void sendNodeRemovedNotification(@Nonnull final KeyedInstanceIdentifier instanceIdentifier) { + if (notificationCreateNodeSend.remove(instanceIdentifier)) { + NodeRemovedBuilder builder = new NodeRemovedBuilder(); + builder.setNodeRef(new NodeRef(instanceIdentifier)); + LOG.info("Publishing node removed notification for {}", instanceIdentifier.firstKeyOf(Node.class).getId()); + notificationPublishService.offerNotification(builder.build()); + } } @Override - public int getBarrierCountLimit() { - return barrierCountLimit; + public void setContextChainHolder(@Nonnull ContextChainHolder contextChainHolder) { + this.contextChainHolder = contextChainHolder; + } + + @Override + public void sendNodeAddedNotification(@Nonnull final KeyedInstanceIdentifier instanceIdentifier) { + if (!notificationCreateNodeSend.contains(instanceIdentifier)) { + notificationCreateNodeSend.add(instanceIdentifier); + final NodeId id = instanceIdentifier.firstKeyOf(Node.class).getId(); + NodeUpdatedBuilder builder = new NodeUpdatedBuilder(); + builder.setId(id); + builder.setNodeRef(new NodeRef(instanceIdentifier)); + LOG.info("Publishing node added notification for {}", id); + notificationPublishService.offerNotification(builder.build()); + } } }