X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=openflowplugin-impl%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fopenflowplugin%2Fimpl%2Fdevice%2FDeviceManagerImpl.java;h=3d076300a96f4046488acc0127dc0c6eb071156a;hb=d8cc382a8dbdcb7c89b68457b3ae0b6d576ee28f;hp=31df880261d5dd668047c2b57febe1142531ca44;hpb=f9171764796b7be953be63305c193dcf9bfd81f9;p=openflowplugin.git diff --git a/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/device/DeviceManagerImpl.java b/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/device/DeviceManagerImpl.java index 31df880261..3d076300a9 100644 --- a/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/device/DeviceManagerImpl.java +++ b/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/device/DeviceManagerImpl.java @@ -14,7 +14,6 @@ import com.google.common.collect.Iterators; import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; -import io.netty.util.Timeout; import io.netty.util.TimerTask; import java.util.Collections; import java.util.Iterator; @@ -26,25 +25,29 @@ import java.util.concurrent.TimeUnit; import javax.annotation.CheckForNull; import javax.annotation.Nonnull; import org.opendaylight.controller.md.sal.binding.api.DataBroker; -import org.opendaylight.controller.md.sal.binding.api.NotificationPublishService; import org.opendaylight.controller.md.sal.binding.api.WriteTransaction; import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType; +import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceProvider; import org.opendaylight.openflowjava.protocol.api.connection.ConnectionAdapter; import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueueHandlerRegistration; +import org.opendaylight.openflowplugin.api.openflow.OFPContext; import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionContext; import org.opendaylight.openflowplugin.api.openflow.connection.OutboundQueueProvider; import org.opendaylight.openflowplugin.api.openflow.device.DeviceContext; +import org.opendaylight.openflowplugin.api.openflow.device.DeviceInfo; import org.opendaylight.openflowplugin.api.openflow.device.DeviceManager; -import org.opendaylight.openflowplugin.api.openflow.device.DeviceState; import org.opendaylight.openflowplugin.api.openflow.device.TranslatorLibrary; import org.opendaylight.openflowplugin.api.openflow.device.handlers.DeviceInitializationPhaseHandler; import org.opendaylight.openflowplugin.api.openflow.device.handlers.DeviceTerminationPhaseHandler; import org.opendaylight.openflowplugin.api.openflow.lifecycle.LifecycleConductor; +import org.opendaylight.openflowplugin.api.openflow.lifecycle.LifecycleService; import org.opendaylight.openflowplugin.extension.api.ExtensionConverterProviderKeeper; import org.opendaylight.openflowplugin.extension.api.core.extension.ExtensionConverterProvider; import org.opendaylight.openflowplugin.impl.connection.OutboundQueueProviderImpl; import org.opendaylight.openflowplugin.impl.device.listener.OpenflowProtocolListenerFullImpl; -import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId; +import org.opendaylight.openflowplugin.impl.lifecycle.LifecycleServiceImpl; +import org.opendaylight.openflowplugin.impl.util.DeviceInitializationUtils; +import org.opendaylight.openflowplugin.openflow.md.core.sal.convertor.ConvertorExecutor; import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes; import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodesBuilder; import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node; @@ -61,32 +64,41 @@ public class DeviceManagerImpl implements DeviceManager, ExtensionConverterProvi private final long globalNotificationQuota; private final boolean switchFeaturesMandatory; + private boolean isNotificationFlowRemovedOff; - private final int spyRate = 10; + private static final int SPY_RATE = 10; private final DataBroker dataBroker; + private final ConvertorExecutor convertorExecutor; private TranslatorLibrary translatorLibrary; private DeviceInitializationPhaseHandler deviceInitPhaseHandler; private DeviceTerminationPhaseHandler deviceTerminPhaseHandler; - private NotificationPublishService notificationPublishService; - private final ConcurrentMap deviceContexts = new ConcurrentHashMap<>(); + private final ConcurrentMap deviceContexts = new ConcurrentHashMap<>(); + private final ConcurrentMap lifecycleServices = new ConcurrentHashMap<>(); private final long barrierIntervalNanos; private final int barrierCountLimit; private ExtensionConverterProvider extensionConverterProvider; private ScheduledThreadPoolExecutor spyPool; + private final ClusterSingletonServiceProvider singletonServiceProvider; private final LifecycleConductor conductor; - private boolean isStatisticsRpcEnabled; public DeviceManagerImpl(@Nonnull final DataBroker dataBroker, - final long globalNotificationQuota, final boolean switchFeaturesMandatory, - final long barrierInterval, final int barrierCountLimit, - final LifecycleConductor lifecycleConductor) { + final long globalNotificationQuota, + final boolean switchFeaturesMandatory, + final long barrierInterval, + final int barrierCountLimit, + final LifecycleConductor lifecycleConductor, + boolean isNotificationFlowRemovedOff, + final ConvertorExecutor convertorExecutor, + final ClusterSingletonServiceProvider singletonServiceProvider) { this.switchFeaturesMandatory = switchFeaturesMandatory; this.globalNotificationQuota = globalNotificationQuota; + this.isNotificationFlowRemovedOff = isNotificationFlowRemovedOff; this.dataBroker = Preconditions.checkNotNull(dataBroker); + this.convertorExecutor = convertorExecutor; /* merge empty nodes to oper DS to predict any problems with missing parent for Node */ final WriteTransaction tx = dataBroker.newWriteOnlyTransaction(); @@ -105,6 +117,7 @@ public class DeviceManagerImpl implements DeviceManager, ExtensionConverterProvi this.conductor = lifecycleConductor; spyPool = new ScheduledThreadPoolExecutor(1); + this.singletonServiceProvider = singletonServiceProvider; } @@ -114,31 +127,31 @@ public class DeviceManagerImpl implements DeviceManager, ExtensionConverterProvi } @Override - public void onDeviceContextLevelUp(final NodeId nodeId) throws Exception { + public void onDeviceContextLevelUp(@CheckForNull DeviceInfo deviceInfo, final LifecycleService lifecycleService) throws Exception { // final phase - we have to add new Device to MD-SAL DataStore - LOG.debug("Final phase of DeviceContextLevelUp for Node: {} ", nodeId); - DeviceContext deviceContext = Preconditions.checkNotNull(deviceContexts.get(nodeId)); - ((DeviceContextImpl) deviceContext).initialSubmitTransaction(); + LOG.debug("Final phase of DeviceContextLevelUp for Node: {} ", deviceInfo.getNodeId()); + DeviceContext deviceContext = Preconditions.checkNotNull(deviceContexts.get(deviceInfo)); deviceContext.onPublished(); + lifecycleService.registerService(this.singletonServiceProvider); } @Override public boolean deviceConnected(@CheckForNull final ConnectionContext connectionContext) throws Exception { Preconditions.checkArgument(connectionContext != null); - NodeId nodeId = connectionContext.getNodeId(); - /** + DeviceInfo deviceInfo = connectionContext.getDeviceInfo(); + /* * This part prevent destroy another device context. Throwing here an exception result to propagate close connection * in {@link org.opendaylight.openflowplugin.impl.connection.org.opendaylight.openflowplugin.impl.connection.HandshakeContextImpl} * If context already exist we are in state closing process (connection flapping) and we should not propagate connection close */ - if (deviceContexts.containsKey(nodeId)) { + if (deviceContexts.containsKey(deviceInfo)) { LOG.warn("Rejecting connection from node which is already connected and there exist deviceContext for it: {}", connectionContext.getNodeId()); return false; } LOG.info("ConnectionEvent: Device connected to controller, Device:{}, NodeId:{}", - connectionContext.getConnectionAdapter().getRemoteAddress(), nodeId); + connectionContext.getConnectionAdapter().getRemoteAddress(), deviceInfo.getNodeId()); // Add Disconnect handler connectionContext.setDeviceDisconnectedHandler(DeviceManagerImpl.this); @@ -148,45 +161,42 @@ public class DeviceManagerImpl implements DeviceManager, ExtensionConverterProvi //FIXME: as soon as auxiliary connection are fully supported then this is needed only before device context published connectionAdapter.setPacketInFiltering(true); - final Short version = connectionContext.getFeatures().getVersion(); - final OutboundQueueProvider outboundQueueProvider = new OutboundQueueProviderImpl(version); + final OutboundQueueProvider outboundQueueProvider = new OutboundQueueProviderImpl(deviceInfo.getVersion()); connectionContext.setOutboundQueueProvider(outboundQueueProvider); final OutboundQueueHandlerRegistration outboundQueueHandlerRegistration = connectionAdapter.registerOutboundQueueHandler(outboundQueueProvider, barrierCountLimit, barrierIntervalNanos); connectionContext.setOutboundQueueHandleRegistration(outboundQueueHandlerRegistration); - final DeviceState deviceState = createDeviceState(connectionContext); final DeviceContext deviceContext = new DeviceContextImpl(connectionContext, - deviceState, dataBroker, conductor, outboundQueueProvider, translatorLibrary, - switchFeaturesMandatory); + this, + convertorExecutor); - Verify.verify(deviceContexts.putIfAbsent(nodeId, deviceContext) == null, "DeviceCtx still not closed."); + final LifecycleService lifecycleService = new LifecycleServiceImpl(); + lifecycleService.setDeviceContext(deviceContext); + + Verify.verify(deviceContexts.putIfAbsent(deviceInfo, deviceContext) == null, "DeviceCtx still not closed."); + lifecycleServices.putIfAbsent(deviceInfo, lifecycleService); + + deviceContext.setSwitchFeaturesMandatory(switchFeaturesMandatory); ((ExtensionConverterProviderKeeper) deviceContext).setExtensionConverterProvider(extensionConverterProvider); - deviceContext.setStatisticsRpcEnabled(isStatisticsRpcEnabled); - deviceContext.setNotificationPublishService(notificationPublishService); + deviceContext.setNotificationPublishService(conductor.getNotificationPublishService()); updatePacketInRateLimiters(); final OpenflowProtocolListenerFullImpl messageListener = new OpenflowProtocolListenerFullImpl( connectionAdapter, deviceContext); - connectionAdapter.setMessageListener(messageListener); - deviceState.setValid(true); - - deviceInitPhaseHandler.onDeviceContextLevelUp(nodeId); + connectionAdapter.setMessageListener(messageListener); + deviceInitPhaseHandler.onDeviceContextLevelUp(connectionContext.getDeviceInfo(), lifecycleService); return true; } - private static DeviceStateImpl createDeviceState(final @Nonnull ConnectionContext connectionContext) { - return new DeviceStateImpl(connectionContext.getFeatures(), connectionContext.getNodeId()); - } - private void updatePacketInRateLimiters() { synchronized (deviceContexts) { final int deviceContextsSize = deviceContexts.size(); @@ -213,11 +223,6 @@ public class DeviceManagerImpl implements DeviceManager, ExtensionConverterProvi this.translatorLibrary = translatorLibrary; } - @Override - public void setNotificationPublishService(final NotificationPublishService notificationService) { - notificationPublishService = notificationService; - } - @Override public void close() { for (final Iterator iterator = Iterators.consumingIterator(deviceContexts.values().iterator()); @@ -234,25 +239,21 @@ public class DeviceManagerImpl implements DeviceManager, ExtensionConverterProvi } @Override - public void onDeviceContextLevelDown(final DeviceContext deviceContext) { - LOG.debug("onDeviceContextClosed for Node {}", deviceContext.getDeviceState().getNodeId()); - deviceContexts.remove(deviceContext.getPrimaryConnectionContext().getNodeId(), deviceContext); + public void onDeviceContextLevelDown(final DeviceInfo deviceInfo) { + LOG.debug("onDeviceContextClosed for Node {}", deviceInfo.getNodeId()); + deviceContexts.remove(deviceInfo); updatePacketInRateLimiters(); + LifecycleService lifecycleService = lifecycleServices.remove(deviceInfo); + try { + lifecycleService.close(); + } catch (Exception e) { + LOG.warn("Closing service for node {} was unsuccessful ", deviceInfo.getNodeId().getValue(), e); + } } @Override public void initialize() { - spyPool.scheduleAtFixedRate(conductor.getMessageIntelligenceAgency(), spyRate, spyRate, TimeUnit.SECONDS); - } - - @Override - public DeviceContext getDeviceContextFromNodeId(final NodeId nodeId) { - return deviceContexts.get(nodeId); - } - - @Override - public void setStatisticsRpcEnabled(boolean isStatisticsRpcEnabled) { - this.isStatisticsRpcEnabled = isStatisticsRpcEnabled; + spyPool.scheduleAtFixedRate(conductor.getMessageIntelligenceAgency(), SPY_RATE, SPY_RATE, TimeUnit.SECONDS); } @Override @@ -273,11 +274,11 @@ public class DeviceManagerImpl implements DeviceManager, ExtensionConverterProvi @Override public void onDeviceDisconnected(final ConnectionContext connectionContext) { LOG.trace("onDeviceDisconnected method call for Node: {}", connectionContext.getNodeId()); - final NodeId nodeId = connectionContext.getNodeId(); - final DeviceContext deviceCtx = this.deviceContexts.get(nodeId); + final DeviceInfo deviceInfo = connectionContext.getDeviceInfo(); + final DeviceContext deviceCtx = this.deviceContexts.get(deviceInfo); if (null == deviceCtx) { - LOG.info("DeviceContext for Node {} was not found. Connection is terminated without OFP context suite.", nodeId); + LOG.info("DeviceContext for Node {} was not found. Connection is terminated without OFP context suite.", deviceInfo.getNodeId()); return; } @@ -291,25 +292,21 @@ public class DeviceManagerImpl implements DeviceManager, ExtensionConverterProvi @Override public void onSuccess(final Void result) { - LOG.debug("TxChainManager for device {} is closed successful.", nodeId); - deviceTerminPhaseHandler.onDeviceContextLevelDown(deviceCtx); + LOG.debug("TxChainManager for device {} is closed successful.", deviceInfo.getNodeId()); + deviceTerminPhaseHandler.onDeviceContextLevelDown(deviceInfo); } @Override public void onFailure(final Throwable t) { - LOG.warn("TxChainManager for device {} failed by closing.", nodeId, t); - deviceTerminPhaseHandler.onDeviceContextLevelDown(deviceCtx); + LOG.warn("TxChainManager for device {} failed by closing.", deviceInfo.getNodeId(), t); + deviceTerminPhaseHandler.onDeviceContextLevelDown(deviceInfo); } }); /* Add timer for Close TxManager because it could fain ind cluster without notification */ - final TimerTask timerTask = new TimerTask() { - - @Override - public void run(final Timeout timeout) throws Exception { - if (!future.isDone()) { - LOG.info("Shutting down TxChain for node {} not completed during 10 sec. Continue anyway.", nodeId); - future.cancel(false); - } + final TimerTask timerTask = timeout -> { + if (!future.isDone()) { + LOG.info("Shutting down TxChain for node {} not completed during 10 sec. Continue anyway.", deviceInfo.getNodeId()); + future.cancel(false); } }; conductor.newTimeout(timerTask, 10, TimeUnit.SECONDS); @@ -317,7 +314,23 @@ public class DeviceManagerImpl implements DeviceManager, ExtensionConverterProvi } @VisibleForTesting - void addDeviceContextToMap(final NodeId nodeId, final DeviceContext deviceContext){ - deviceContexts.put(nodeId, deviceContext); + void addDeviceContextToMap(final DeviceInfo deviceInfo, final DeviceContext deviceContext){ + deviceContexts.put(deviceInfo, deviceContext); + } + + @Override + public T gainContext(final DeviceInfo deviceInfo) { + return (T) deviceContexts.get(deviceInfo); } + + @Override + public void setIsNotificationFlowRemovedOff(boolean isNotificationFlowRemovedOff) { + this.isNotificationFlowRemovedOff = isNotificationFlowRemovedOff; + } + + @Override + public boolean getIsNotificationFlowRemovedOff() { + return this.isNotificationFlowRemovedOff; + } + }