X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=openflowplugin-impl%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fopenflowplugin%2Fimpl%2Fdevice%2FDeviceManagerImpl.java;h=ab75ce3b82106dc81d74d33dbf25526997f14007;hb=c1f7214a7181ffabc24f89b812153b034f7f7079;hp=244069f2d9328a849de33b288b5afdafba329cbb;hpb=2e76db6b47cf358727f267fe74a1b0b2a6d4d001;p=openflowplugin.git diff --git a/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/device/DeviceManagerImpl.java b/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/device/DeviceManagerImpl.java index 244069f2d9..ab75ce3b82 100644 --- a/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/device/DeviceManagerImpl.java +++ b/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/device/DeviceManagerImpl.java @@ -1,4 +1,4 @@ -/** +/* * Copyright (c) 2015, 2017 Cisco Systems, Inc. and others. All rights reserved. * * This program and the accompanying materials are made available under the @@ -7,115 +7,90 @@ */ package org.opendaylight.openflowplugin.impl.device; -import com.google.common.collect.Iterators; -import com.google.common.util.concurrent.CheckedFuture; -import com.google.common.util.concurrent.FutureCallback; -import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; import io.netty.util.HashedWheelTimer; -import io.netty.util.internal.ConcurrentSet; -import java.util.Collections; -import java.util.Iterator; -import java.util.Optional; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; -import javax.annotation.Nonnull; -import javax.annotation.Nullable; -import org.opendaylight.controller.md.sal.binding.api.DataBroker; -import org.opendaylight.controller.md.sal.binding.api.NotificationPublishService; -import org.opendaylight.controller.md.sal.binding.api.WriteTransaction; -import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType; -import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException; +import org.eclipse.jdt.annotation.NonNull; +import org.opendaylight.mdsal.binding.api.DataBroker; +import org.opendaylight.mdsal.binding.api.NotificationPublishService; +import org.opendaylight.mdsal.binding.api.WriteTransaction; +import org.opendaylight.mdsal.common.api.LogicalDatastoreType; import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueueHandlerRegistration; +import org.opendaylight.openflowplugin.api.openflow.OFPContext; import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionContext; import org.opendaylight.openflowplugin.api.openflow.connection.OutboundQueueProvider; import org.opendaylight.openflowplugin.api.openflow.device.DeviceContext; import org.opendaylight.openflowplugin.api.openflow.device.DeviceInfo; import org.opendaylight.openflowplugin.api.openflow.device.DeviceManager; import org.opendaylight.openflowplugin.api.openflow.device.TranslatorLibrary; +import org.opendaylight.openflowplugin.api.openflow.lifecycle.ContextChainHolder; import org.opendaylight.openflowplugin.api.openflow.statistics.ofpspecific.MessageSpy; import org.opendaylight.openflowplugin.extension.api.ExtensionConverterProviderKeeper; import org.opendaylight.openflowplugin.extension.api.core.extension.ExtensionConverterProvider; import org.opendaylight.openflowplugin.impl.connection.OutboundQueueProviderImpl; import org.opendaylight.openflowplugin.impl.device.initialization.DeviceInitializerProvider; import org.opendaylight.openflowplugin.impl.device.listener.OpenflowProtocolListenerFullImpl; -import org.opendaylight.openflowplugin.impl.services.sal.SalRoleServiceImpl; +import org.opendaylight.openflowplugin.impl.util.DeviceInitializationUtil; import org.opendaylight.openflowplugin.openflow.md.core.sal.convertor.ConvertorExecutor; +import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId; import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRef; import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRemovedBuilder; import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeUpdatedBuilder; -import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes; -import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodesBuilder; import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node; import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey; -import org.opendaylight.yangtools.yang.binding.InstanceIdentifier; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflow.provider.config.rev160510.OpenflowProviderConfig; +import org.opendaylight.yangtools.util.concurrent.QueuedNotificationManager; import org.opendaylight.yangtools.yang.binding.KeyedInstanceIdentifier; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -/** - * - */ public class DeviceManagerImpl implements DeviceManager, ExtensionConverterProviderKeeper { private static final Logger LOG = LoggerFactory.getLogger(DeviceManagerImpl.class); - - private long globalNotificationQuota; - private boolean switchFeaturesMandatory; - private boolean isFlowRemovedNotificationOn; - private boolean skipTableFeatures; private static final int SPY_RATE = 10; + private final OpenflowProviderConfig config; private final DataBroker dataBroker; private final DeviceInitializerProvider deviceInitializerProvider; private final ConvertorExecutor convertorExecutor; - private TranslatorLibrary translatorLibrary; - private final ConcurrentMap deviceContexts = new ConcurrentHashMap<>(); - private final Set notificationCreateNodeSend = new ConcurrentSet<>(); - - private long barrierIntervalNanos; - private int barrierCountLimit; - - private ExtensionConverterProvider extensionConverterProvider; - private ScheduledThreadPoolExecutor spyPool; + private final Set> notificationCreateNodeSend = + ConcurrentHashMap.newKeySet(); private final NotificationPublishService notificationPublishService; private final MessageSpy messageSpy; private final HashedWheelTimer hashedWheelTimer; - private final boolean useSingleLayerSerialization; - - public DeviceManagerImpl(@Nonnull final DataBroker dataBroker, - @Nonnull final MessageSpy messageSpy, - @Nullable final NotificationPublishService notificationPublishService, - @Nonnull final HashedWheelTimer hashedWheelTimer, - @Nonnull final ConvertorExecutor convertorExecutor, - @Nonnull final DeviceInitializerProvider deviceInitializerProvider, - final boolean useSingleLayerSerialization) { - + private final Object updatePacketInRateLimitersLock = new Object(); + private TranslatorLibrary translatorLibrary; + private ExtensionConverterProvider extensionConverterProvider; + private ScheduledThreadPoolExecutor spyPool; + private ContextChainHolder contextChainHolder; + private final QueuedNotificationManager queuedNotificationManager; + + public DeviceManagerImpl(@NonNull final OpenflowProviderConfig config, + @NonNull final DataBroker dataBroker, + @NonNull final MessageSpy messageSpy, + @NonNull final NotificationPublishService notificationPublishService, + @NonNull final HashedWheelTimer hashedWheelTimer, + @NonNull final ConvertorExecutor convertorExecutor, + @NonNull final DeviceInitializerProvider deviceInitializerProvider, + @NonNull final ExecutorService executorService) { + this.config = config; this.dataBroker = dataBroker; this.deviceInitializerProvider = deviceInitializerProvider; - this.useSingleLayerSerialization = useSingleLayerSerialization; - - /* merge empty nodes to oper DS to predict any problems with missing parent for Node */ - final WriteTransaction tx = dataBroker.newWriteOnlyTransaction(); - final NodesBuilder nodesBuilder = new NodesBuilder(); - nodesBuilder.setNode(Collections.emptyList()); - tx.merge(LogicalDatastoreType.OPERATIONAL, InstanceIdentifier.create(Nodes.class), nodesBuilder.build()); - try { - tx.submit().get(); - } catch (ExecutionException | InterruptedException e) { - LOG.error("Creation of node failed.", e); - throw new IllegalStateException(e); - } - this.convertorExecutor = convertorExecutor; this.hashedWheelTimer = hashedWheelTimer; this.spyPool = new ScheduledThreadPoolExecutor(1); this.notificationPublishService = notificationPublishService; this.messageSpy = messageSpy; + DeviceInitializationUtil.makeEmptyNodes(dataBroker); + this.queuedNotificationManager = QueuedNotificationManager.create(executorService, (key, entries) -> { + entries.forEach(jobEntry -> jobEntry.run()); + }, 2048, "port-status-queue"); } @Override @@ -130,16 +105,12 @@ public class DeviceManagerImpl implements DeviceManager, ExtensionConverterProvi @Override public void close() { - for (final Iterator iterator = Iterators.consumingIterator(deviceContexts.values().iterator()); - iterator.hasNext();) { - final DeviceContext deviceCtx = iterator.next(); - deviceCtx.shutdownConnection(); - deviceCtx.shuttingDownDataStoreTransactions(); + deviceContexts.values().forEach(OFPContext::close); + deviceContexts.clear(); + if (spyPool != null) { + spyPool.shutdownNow(); + spyPool = null; } - - Optional.ofNullable(spyPool).ifPresent(ScheduledThreadPoolExecutor::shutdownNow); - spyPool = null; - } @Override @@ -158,69 +129,16 @@ public class DeviceManagerImpl implements DeviceManager, ExtensionConverterProvi } @Override - public void setFlowRemovedNotificationOn(boolean isNotificationFlowRemovedOff) { - this.isFlowRemovedNotificationOn = isNotificationFlowRemovedOff; - } - - @Override - public boolean isFlowRemovedNotificationOn() { - return this.isFlowRemovedNotificationOn; - } - - @Override - public void setGlobalNotificationQuota(final long globalNotificationQuota) { - this.globalNotificationQuota = globalNotificationQuota; - } - - @Override - public void setSwitchFeaturesMandatory(final boolean switchFeaturesMandatory) { - this.switchFeaturesMandatory = switchFeaturesMandatory; - } + public ListenableFuture removeDeviceFromOperationalDS(@NonNull final KeyedInstanceIdentifier ii) { - @Override - public void setSkipTableFeatures(boolean skipTableFeaturesValue) { - skipTableFeatures = skipTableFeaturesValue; - } - - @Override - public void setBarrierCountLimit(final int barrierCountLimit) { - this.barrierCountLimit = barrierCountLimit; - } - - @Override - public void setBarrierInterval(final long barrierTimeoutLimit) { - this.barrierIntervalNanos = TimeUnit.MILLISECONDS.toNanos(barrierTimeoutLimit); - } - - @Override - public CheckedFuture removeDeviceFromOperationalDS(final KeyedInstanceIdentifier ii) { final WriteTransaction delWtx = dataBroker.newWriteOnlyTransaction(); delWtx.delete(LogicalDatastoreType.OPERATIONAL, ii); - final CheckedFuture delFuture = delWtx.submit(); - - Futures.addCallback(delFuture, new FutureCallback() { - @Override - public void onSuccess(final Void result) { - if (LOG.isDebugEnabled()) { - LOG.debug("Delete Node {} was successful", ii); - } - } + return delWtx.commit(); - @Override - public void onFailure(@Nonnull final Throwable t) { - LOG.warn("Delete node {} failed with exception {}", ii, t); - } - }); - - return delFuture; } @Override - public CheckedFuture removeDeviceFromOperationalDS(final DeviceInfo deviceInfo) { - return this.removeDeviceFromOperationalDS(deviceInfo.getNodeInstanceIdentifier()); - } - - public DeviceContext createContext(@Nonnull final ConnectionContext connectionContext) { + public DeviceContext createContext(@NonNull final ConnectionContext connectionContext) { LOG.info("ConnectionEvent: Device connected to controller, Device:{}, NodeId:{}", connectionContext.getConnectionAdapter().getRemoteAddress(), @@ -235,8 +153,8 @@ public class DeviceManagerImpl implements DeviceManager, ExtensionConverterProvi final OutboundQueueHandlerRegistration outboundQueueHandlerRegistration = connectionContext.getConnectionAdapter().registerOutboundQueueHandler( outboundQueueProvider, - barrierCountLimit, - barrierIntervalNanos); + config.getBarrierCountLimit().getValue().toJava(), + TimeUnit.MILLISECONDS.toNanos(config.getBarrierIntervalTimeoutLimit().getValue().toJava())); connectionContext.setOutboundQueueHandleRegistration(outboundQueueHandlerRegistration); @@ -245,15 +163,16 @@ public class DeviceManagerImpl implements DeviceManager, ExtensionConverterProvi dataBroker, messageSpy, translatorLibrary, - this, convertorExecutor, - skipTableFeatures, + config.isSkipTableFeatures(), hashedWheelTimer, - useSingleLayerSerialization, - deviceInitializerProvider); - - deviceContext.setSalRoleService(new SalRoleServiceImpl(deviceContext, deviceContext)); - deviceContext.setSwitchFeaturesMandatory(switchFeaturesMandatory); + config.isUseSingleLayerSerialization(), + deviceInitializerProvider, + config.isEnableFlowRemovedNotification(), + config.isSwitchFeaturesMandatory(), + contextChainHolder, + queuedNotificationManager, + config.isIsStatisticsPollingOn()); ((ExtensionConverterProviderKeeper) deviceContext).setExtensionConverterProvider(extensionConverterProvider); deviceContext.setNotificationPublishService(notificationPublishService); @@ -264,15 +183,16 @@ public class DeviceManagerImpl implements DeviceManager, ExtensionConverterProvi connectionContext.getConnectionAdapter(), deviceContext); connectionContext.getConnectionAdapter().setMessageListener(messageListener); + connectionContext.getConnectionAdapter().setAlienMessageListener(messageListener); return deviceContext; } private void updatePacketInRateLimiters() { - synchronized (deviceContexts) { + synchronized (updatePacketInRateLimitersLock) { final int deviceContextsSize = deviceContexts.size(); if (deviceContextsSize > 0) { - long freshNotificationLimit = globalNotificationQuota / deviceContextsSize; + long freshNotificationLimit = config.getGlobalNotificationQuota().toJava() / deviceContextsSize; if (freshNotificationLimit < 100) { freshNotificationLimit = 100; } @@ -286,49 +206,39 @@ public class DeviceManagerImpl implements DeviceManager, ExtensionConverterProvi } } - private void sendNodeRemovedNotification(final DeviceInfo deviceInfo) { - notificationCreateNodeSend.remove(deviceInfo); - NodeRemovedBuilder builder = new NodeRemovedBuilder(); - builder.setNodeRef(new NodeRef(deviceInfo.getNodeInstanceIdentifier())); - if (LOG.isDebugEnabled()) { - LOG.debug("Publishing node removed notification for {}", deviceInfo.getLOGValue()); - } - notificationPublishService.offerNotification(builder.build()); - } - - @Override public void onDeviceRemoved(final DeviceInfo deviceInfo) { - this.sendNodeRemovedNotification(deviceInfo); deviceContexts.remove(deviceInfo); if (LOG.isDebugEnabled()) { - LOG.debug("Device context removed for node {}", deviceInfo.getLOGValue()); - } - if (deviceContexts.size() > 0) { - this.updatePacketInRateLimiters(); + LOG.debug("Device context removed for node {}", deviceInfo); } + this.updatePacketInRateLimiters(); } @Override - public long getBarrierIntervalNanos() { - return barrierIntervalNanos; + public void sendNodeRemovedNotification(@NonNull final KeyedInstanceIdentifier instanceIdentifier) { + if (notificationCreateNodeSend.remove(instanceIdentifier)) { + NodeRemovedBuilder builder = new NodeRemovedBuilder(); + builder.setNodeRef(new NodeRef(instanceIdentifier)); + LOG.info("Publishing node removed notification for {}", instanceIdentifier.firstKeyOf(Node.class).getId()); + notificationPublishService.offerNotification(builder.build()); + } } @Override - public int getBarrierCountLimit() { - return barrierCountLimit; + public void setContextChainHolder(@NonNull final ContextChainHolder contextChainHolder) { + this.contextChainHolder = contextChainHolder; } @Override - public void sendNodeAddedNotification(@Nonnull final DeviceInfo deviceInfo) { - if (!notificationCreateNodeSend.contains(deviceInfo)) { - notificationCreateNodeSend.add(deviceInfo); + public void sendNodeAddedNotification(@NonNull final KeyedInstanceIdentifier instanceIdentifier) { + if (!notificationCreateNodeSend.contains(instanceIdentifier)) { + notificationCreateNodeSend.add(instanceIdentifier); + final NodeId id = instanceIdentifier.firstKeyOf(Node.class).getId(); NodeUpdatedBuilder builder = new NodeUpdatedBuilder(); - builder.setId(deviceInfo.getNodeId()); - builder.setNodeRef(new NodeRef(deviceInfo.getNodeInstanceIdentifier())); - if (LOG.isDebugEnabled()) { - LOG.debug("Publishing node added notification for {}", deviceInfo.getLOGValue()); - } + builder.setId(id); + builder.setNodeRef(new NodeRef(instanceIdentifier)); + LOG.info("Publishing node added notification for {}", id); notificationPublishService.offerNotification(builder.build()); } }