X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=openflowplugin-impl%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fopenflowplugin%2Fimpl%2Fdevice%2FDeviceManagerImpl.java;h=ab75ce3b82106dc81d74d33dbf25526997f14007;hb=c1f7214a7181ffabc24f89b812153b034f7f7079;hp=32cb41d6a51faa3c882a4f43a37c4b8ce7a91f49;hpb=5ba35748d3bdaf34f96de152142a99e20d058b24;p=openflowplugin.git diff --git a/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/device/DeviceManagerImpl.java b/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/device/DeviceManagerImpl.java index 32cb41d6a5..ab75ce3b82 100644 --- a/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/device/DeviceManagerImpl.java +++ b/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/device/DeviceManagerImpl.java @@ -1,4 +1,4 @@ -/** +/* * Copyright (c) 2015, 2017 Cisco Systems, Inc. and others. All rights reserved. * * This program and the accompanying materials are made available under the @@ -9,18 +9,17 @@ package org.opendaylight.openflowplugin.impl.device; import com.google.common.util.concurrent.ListenableFuture; import io.netty.util.HashedWheelTimer; -import io.netty.util.internal.ConcurrentSet; -import java.util.Optional; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ExecutorService; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; -import javax.annotation.Nonnull; -import org.opendaylight.controller.md.sal.binding.api.DataBroker; -import org.opendaylight.controller.md.sal.binding.api.NotificationPublishService; -import org.opendaylight.controller.md.sal.binding.api.WriteTransaction; -import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType; +import org.eclipse.jdt.annotation.NonNull; +import org.opendaylight.mdsal.binding.api.DataBroker; +import org.opendaylight.mdsal.binding.api.NotificationPublishService; +import org.opendaylight.mdsal.binding.api.WriteTransaction; +import org.opendaylight.mdsal.common.api.LogicalDatastoreType; import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueueHandlerRegistration; import org.opendaylight.openflowplugin.api.openflow.OFPContext; import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionContext; @@ -29,6 +28,7 @@ import org.opendaylight.openflowplugin.api.openflow.device.DeviceContext; import org.opendaylight.openflowplugin.api.openflow.device.DeviceInfo; import org.opendaylight.openflowplugin.api.openflow.device.DeviceManager; import org.opendaylight.openflowplugin.api.openflow.device.TranslatorLibrary; +import org.opendaylight.openflowplugin.api.openflow.lifecycle.ContextChainHolder; import org.opendaylight.openflowplugin.api.openflow.statistics.ofpspecific.MessageSpy; import org.opendaylight.openflowplugin.extension.api.ExtensionConverterProviderKeeper; import org.opendaylight.openflowplugin.extension.api.core.extension.ExtensionConverterProvider; @@ -44,13 +44,11 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeUpd import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node; import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflow.provider.config.rev160510.OpenflowProviderConfig; +import org.opendaylight.yangtools.util.concurrent.QueuedNotificationManager; import org.opendaylight.yangtools.yang.binding.KeyedInstanceIdentifier; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -/** - * - */ public class DeviceManagerImpl implements DeviceManager, ExtensionConverterProviderKeeper { private static final Logger LOG = LoggerFactory.getLogger(DeviceManagerImpl.class); @@ -61,21 +59,26 @@ public class DeviceManagerImpl implements DeviceManager, ExtensionConverterProvi private final DeviceInitializerProvider deviceInitializerProvider; private final ConvertorExecutor convertorExecutor; private final ConcurrentMap deviceContexts = new ConcurrentHashMap<>(); - private final Set> notificationCreateNodeSend = new ConcurrentSet<>(); + private final Set> notificationCreateNodeSend = + ConcurrentHashMap.newKeySet(); private final NotificationPublishService notificationPublishService; private final MessageSpy messageSpy; private final HashedWheelTimer hashedWheelTimer; + private final Object updatePacketInRateLimitersLock = new Object(); private TranslatorLibrary translatorLibrary; private ExtensionConverterProvider extensionConverterProvider; private ScheduledThreadPoolExecutor spyPool; - - public DeviceManagerImpl(@Nonnull final OpenflowProviderConfig config, - @Nonnull final DataBroker dataBroker, - @Nonnull final MessageSpy messageSpy, - @Nonnull final NotificationPublishService notificationPublishService, - @Nonnull final HashedWheelTimer hashedWheelTimer, - @Nonnull final ConvertorExecutor convertorExecutor, - @Nonnull final DeviceInitializerProvider deviceInitializerProvider) { + private ContextChainHolder contextChainHolder; + private final QueuedNotificationManager queuedNotificationManager; + + public DeviceManagerImpl(@NonNull final OpenflowProviderConfig config, + @NonNull final DataBroker dataBroker, + @NonNull final MessageSpy messageSpy, + @NonNull final NotificationPublishService notificationPublishService, + @NonNull final HashedWheelTimer hashedWheelTimer, + @NonNull final ConvertorExecutor convertorExecutor, + @NonNull final DeviceInitializerProvider deviceInitializerProvider, + @NonNull final ExecutorService executorService) { this.config = config; this.dataBroker = dataBroker; this.deviceInitializerProvider = deviceInitializerProvider; @@ -85,6 +88,9 @@ public class DeviceManagerImpl implements DeviceManager, ExtensionConverterProvi this.notificationPublishService = notificationPublishService; this.messageSpy = messageSpy; DeviceInitializationUtil.makeEmptyNodes(dataBroker); + this.queuedNotificationManager = QueuedNotificationManager.create(executorService, (key, entries) -> { + entries.forEach(jobEntry -> jobEntry.run()); + }, 2048, "port-status-queue"); } @Override @@ -101,9 +107,10 @@ public class DeviceManagerImpl implements DeviceManager, ExtensionConverterProvi public void close() { deviceContexts.values().forEach(OFPContext::close); deviceContexts.clear(); - Optional.ofNullable(spyPool).ifPresent(ScheduledThreadPoolExecutor::shutdownNow); - spyPool = null; - + if (spyPool != null) { + spyPool.shutdownNow(); + spyPool = null; + } } @Override @@ -122,16 +129,16 @@ public class DeviceManagerImpl implements DeviceManager, ExtensionConverterProvi } @Override - public ListenableFuture removeDeviceFromOperationalDS( - @Nonnull final KeyedInstanceIdentifier ii) { + public ListenableFuture removeDeviceFromOperationalDS(@NonNull final KeyedInstanceIdentifier ii) { final WriteTransaction delWtx = dataBroker.newWriteOnlyTransaction(); delWtx.delete(LogicalDatastoreType.OPERATIONAL, ii); - return delWtx.submit(); + return delWtx.commit(); } - public DeviceContext createContext(@Nonnull final ConnectionContext connectionContext) { + @Override + public DeviceContext createContext(@NonNull final ConnectionContext connectionContext) { LOG.info("ConnectionEvent: Device connected to controller, Device:{}, NodeId:{}", connectionContext.getConnectionAdapter().getRemoteAddress(), @@ -146,8 +153,8 @@ public class DeviceManagerImpl implements DeviceManager, ExtensionConverterProvi final OutboundQueueHandlerRegistration outboundQueueHandlerRegistration = connectionContext.getConnectionAdapter().registerOutboundQueueHandler( outboundQueueProvider, - config.getBarrierCountLimit().getValue(), - TimeUnit.MILLISECONDS.toNanos(config.getBarrierIntervalTimeoutLimit().getValue())); + config.getBarrierCountLimit().getValue().toJava(), + TimeUnit.MILLISECONDS.toNanos(config.getBarrierIntervalTimeoutLimit().getValue().toJava())); connectionContext.setOutboundQueueHandleRegistration(outboundQueueHandlerRegistration); @@ -162,8 +169,10 @@ public class DeviceManagerImpl implements DeviceManager, ExtensionConverterProvi config.isUseSingleLayerSerialization(), deviceInitializerProvider, config.isEnableFlowRemovedNotification(), - config.isSwitchFeaturesMandatory()); - + config.isSwitchFeaturesMandatory(), + contextChainHolder, + queuedNotificationManager, + config.isIsStatisticsPollingOn()); ((ExtensionConverterProviderKeeper) deviceContext).setExtensionConverterProvider(extensionConverterProvider); deviceContext.setNotificationPublishService(notificationPublishService); @@ -180,10 +189,10 @@ public class DeviceManagerImpl implements DeviceManager, ExtensionConverterProvi } private void updatePacketInRateLimiters() { - synchronized (deviceContexts) { + synchronized (updatePacketInRateLimitersLock) { final int deviceContextsSize = deviceContexts.size(); if (deviceContextsSize > 0) { - long freshNotificationLimit = config.getGlobalNotificationQuota() / deviceContextsSize; + long freshNotificationLimit = config.getGlobalNotificationQuota().toJava() / deviceContextsSize; if (freshNotificationLimit < 100) { freshNotificationLimit = 100; } @@ -201,15 +210,13 @@ public class DeviceManagerImpl implements DeviceManager, ExtensionConverterProvi public void onDeviceRemoved(final DeviceInfo deviceInfo) { deviceContexts.remove(deviceInfo); if (LOG.isDebugEnabled()) { - LOG.debug("Device context removed for node {}", deviceInfo.getLOGValue()); - } - if (deviceContexts.size() > 0) { - this.updatePacketInRateLimiters(); + LOG.debug("Device context removed for node {}", deviceInfo); } + this.updatePacketInRateLimiters(); } @Override - public void sendNodeRemovedNotification(@Nonnull final KeyedInstanceIdentifier instanceIdentifier) { + public void sendNodeRemovedNotification(@NonNull final KeyedInstanceIdentifier instanceIdentifier) { if (notificationCreateNodeSend.remove(instanceIdentifier)) { NodeRemovedBuilder builder = new NodeRemovedBuilder(); builder.setNodeRef(new NodeRef(instanceIdentifier)); @@ -219,7 +226,12 @@ public class DeviceManagerImpl implements DeviceManager, ExtensionConverterProvi } @Override - public void sendNodeAddedNotification(@Nonnull final KeyedInstanceIdentifier instanceIdentifier) { + public void setContextChainHolder(@NonNull final ContextChainHolder contextChainHolder) { + this.contextChainHolder = contextChainHolder; + } + + @Override + public void sendNodeAddedNotification(@NonNull final KeyedInstanceIdentifier instanceIdentifier) { if (!notificationCreateNodeSend.contains(instanceIdentifier)) { notificationCreateNodeSend.add(instanceIdentifier); final NodeId id = instanceIdentifier.firstKeyOf(Node.class).getId();