X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=openflowplugin-impl%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fopenflowplugin%2Fimpl%2Fdevice%2FDeviceContextImpl.java;h=96acf077a969620cd8ccaeef7d571fdc33f3a83c;hb=f53689206f94ea1ca7cb15ec32149c8ef85041a9;hp=bcf4259fc9632f3fcdb507737f14fc0255bc380d;hpb=31772f5ccce34997b0dc1031b441af4c3ef21557;p=openflowplugin.git diff --git a/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/device/DeviceContextImpl.java b/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/device/DeviceContextImpl.java index bcf4259fc9..96acf077a9 100644 --- a/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/device/DeviceContextImpl.java +++ b/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/device/DeviceContextImpl.java @@ -18,25 +18,27 @@ import java.math.BigInteger; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.ExecutionException; import javax.annotation.Nonnull; import org.opendaylight.controller.md.sal.binding.api.DataBroker; import org.opendaylight.controller.md.sal.binding.api.NotificationPublishService; import org.opendaylight.controller.md.sal.binding.api.ReadOnlyTransaction; import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType; +import org.opendaylight.controller.md.sal.common.api.data.TransactionChainClosedException; +import org.opendaylight.mdsal.singleton.common.api.ServiceGroupIdentifier; import org.opendaylight.openflowjava.protocol.api.connection.ConnectionAdapter; -import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueue; import org.opendaylight.openflowjava.protocol.api.keys.MessageTypeKey; import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionContext; -import org.opendaylight.openflowplugin.api.openflow.connection.OutboundQueueProvider; import org.opendaylight.openflowplugin.api.openflow.device.DeviceContext; import org.opendaylight.openflowplugin.api.openflow.device.DeviceInfo; +import org.opendaylight.openflowplugin.api.openflow.device.DeviceManager; import org.opendaylight.openflowplugin.api.openflow.device.DeviceState; import org.opendaylight.openflowplugin.api.openflow.device.MessageTranslator; import org.opendaylight.openflowplugin.api.openflow.device.RequestContext; import org.opendaylight.openflowplugin.api.openflow.device.TranslatorLibrary; import org.opendaylight.openflowplugin.api.openflow.device.Xid; import org.opendaylight.openflowplugin.api.openflow.device.handlers.MultiMsgCollector; -import org.opendaylight.openflowplugin.api.openflow.lifecycle.LifecycleConductor; +import org.opendaylight.openflowplugin.api.openflow.lifecycle.LifecycleService; import org.opendaylight.openflowplugin.api.openflow.md.core.SwitchConnectionDistinguisher; import org.opendaylight.openflowplugin.api.openflow.md.core.TranslatorKey; import org.opendaylight.openflowplugin.api.openflow.registry.ItemLifeCycleRegistry; @@ -60,6 +62,8 @@ import org.opendaylight.openflowplugin.impl.registry.flow.DeviceFlowRegistryImpl import org.opendaylight.openflowplugin.impl.registry.flow.FlowRegistryKeyFactory; import org.opendaylight.openflowplugin.impl.registry.group.DeviceGroupRegistryImpl; import org.opendaylight.openflowplugin.impl.registry.meter.DeviceMeterRegistryImpl; +import org.opendaylight.openflowplugin.impl.util.DeviceInitializationUtils; +import org.opendaylight.openflowplugin.openflow.md.core.sal.convertor.ConvertorExecutor; import org.opendaylight.openflowplugin.openflow.md.core.session.SwitchConnectionCookieOFImpl; import org.opendaylight.yang.gen.v1.urn.opendaylight.experimenter.message.service.rev151020.ExperimenterMessageFromDevBuilder; import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode; @@ -97,7 +101,7 @@ import org.slf4j.LoggerFactory; /** * */ -public class DeviceContextImpl implements DeviceContext, ExtensionConverterProviderKeeper { +public class DeviceContextImpl implements DeviceContext, ExtensionConverterProviderKeeper{ private static final Logger LOG = LoggerFactory.getLogger(DeviceContextImpl.class); @@ -108,7 +112,7 @@ public class DeviceContextImpl implements DeviceContext, ExtensionConverterProvi // TODO: high water mark factor should be parametrized private static final float HIGH_WATERMARK_FACTOR = 0.95f; - private final ConnectionContext primaryConnectionContext; + private ConnectionContext primaryConnectionContext; private final DeviceState deviceState; private final DataBroker dataBroker; private final Map auxiliaryConnectionContexts; @@ -120,7 +124,6 @@ public class DeviceContextImpl implements DeviceContext, ExtensionConverterProvi private final MessageSpy messageSpy; private final ItemLifeCycleKeeper flowLifeCycleKeeper; private NotificationPublishService notificationPublishService; - private final OutboundQueue outboundQueueProvider; private Timeout barrierTaskTimeout; private final MessageTranslator portStatusTranslator; private final MessageTranslator packetInTranslator; @@ -128,34 +131,34 @@ public class DeviceContextImpl implements DeviceContext, ExtensionConverterProvi private final TranslatorLibrary translatorLibrary; private final ItemLifeCycleRegistry itemLifeCycleSourceRegistry; private ExtensionConverterProvider extensionConverterProvider; + private final DeviceManager deviceManager; + private boolean switchFeaturesMandatory; private final DeviceInfo deviceInfo; - - private volatile CONTEXT_STATE contextState; - - @VisibleForTesting - DeviceContextImpl(@Nonnull final ConnectionContext primaryConnectionContext, - @Nonnull final DeviceState deviceState, - @Nonnull final DataBroker dataBroker, - @Nonnull final LifecycleConductor conductor, - @Nonnull final OutboundQueueProvider outboundQueueProvider, - @Nonnull final TranslatorLibrary translatorLibrary) { + private final ConvertorExecutor convertorExecutor; + private volatile CONTEXT_STATE state; + + public DeviceContextImpl( + @Nonnull final ConnectionContext primaryConnectionContext, + @Nonnull final DataBroker dataBroker, + @Nonnull final MessageSpy messageSpy, + @Nonnull final TranslatorLibrary translatorLibrary, + @Nonnull final DeviceManager manager, + final ConvertorExecutor convertorExecutor) { this.primaryConnectionContext = Preconditions.checkNotNull(primaryConnectionContext); - this.deviceState = Preconditions.checkNotNull(deviceState); + this.deviceInfo = primaryConnectionContext.getDeviceInfo(); + this.deviceState = new DeviceStateImpl(); this.dataBroker = Preconditions.checkNotNull(dataBroker); - Preconditions.checkNotNull(conductor); - this.outboundQueueProvider = Preconditions.checkNotNull(outboundQueueProvider); - deviceInfo = primaryConnectionContext.getDeviceInfo(); - this.transactionChainManager = new TransactionChainManager(dataBroker, deviceInfo, conductor); + this.transactionChainManager = new TransactionChainManager(dataBroker, deviceInfo); auxiliaryConnectionContexts = new HashMap<>(); - deviceFlowRegistry = new DeviceFlowRegistryImpl(); + deviceFlowRegistry = new DeviceFlowRegistryImpl(dataBroker, deviceInfo.getNodeInstanceIdentifier()); deviceGroupRegistry = new DeviceGroupRegistryImpl(); deviceMeterRegistry = new DeviceMeterRegistryImpl(); - messageSpy = conductor.getMessageIntelligenceAgency(); - + this.messageSpy = Preconditions.checkNotNull(messageSpy); + this.deviceManager = Preconditions.checkNotNull(manager); packetInLimiter = new PacketInRateLimiter(primaryConnectionContext.getConnectionAdapter(), - /*initial*/ 1000, /*initial*/2000, messageSpy, REJECTED_DRAIN_FACTOR); + /*initial*/ 1000, /*initial*/2000, this.messageSpy, REJECTED_DRAIN_FACTOR); this.translatorLibrary = translatorLibrary; portStatusTranslator = translatorLibrary.lookupTranslator( @@ -168,20 +171,17 @@ public class DeviceContextImpl implements DeviceContext, ExtensionConverterProvi itemLifeCycleSourceRegistry = new ItemLifeCycleRegistryImpl(); flowLifeCycleKeeper = new ItemLifeCycleSourceImpl(); itemLifeCycleSourceRegistry.registerLifeCycleSource(flowLifeCycleKeeper); - contextState = CONTEXT_STATE.INITIALIZATION; + this.state = CONTEXT_STATE.INITIALIZATION; + this.convertorExecutor = convertorExecutor; } /** * This method is called from {@link DeviceManagerImpl} only. So we could say "posthandshake process finish" * and we are able to set a scheduler for an automatic transaction submitting by time (0,5sec). */ - void initialSubmitTransaction() { - transactionChainManager.initialSubmitWriteTransaction(); - } - @Override - public Long reserveXidForDeviceMessage() { - return outboundQueueProvider.reserveEntry(); + public void initialSubmitTransaction() { + transactionChainManager.initialSubmitWriteTransaction(); } @Override @@ -198,7 +198,7 @@ public class DeviceContextImpl implements DeviceContext, ExtensionConverterProvi public void removeAuxiliaryConnectionContext(final ConnectionContext connectionContext) { final SwitchConnectionDistinguisher connectionDistinguisher = createConnectionDistinguisher(connectionContext); LOG.debug("auxiliary connection dropped: {}, nodeId:{}", connectionContext.getConnectionAdapter() - .getRemoteAddress(), deviceInfo.getNodeId()); + .getRemoteAddress(), getDeviceInfo().getLOGValue()); auxiliaryConnectionContexts.remove(connectionDistinguisher); } @@ -207,11 +207,6 @@ public class DeviceContextImpl implements DeviceContext, ExtensionConverterProvi return deviceState; } - @Override - public DeviceInfo getDeviceInfo() { - return this.deviceInfo; - } - @Override public ReadOnlyTransaction getReadTransaction() { return dataBroker.newReadOnlyTransaction(); @@ -219,17 +214,20 @@ public class DeviceContextImpl implements DeviceContext, ExtensionConverterProvi @Override public void writeToTransaction(final LogicalDatastoreType store, - final InstanceIdentifier path, final T data) throws Exception { + final InstanceIdentifier path, + final T data){ transactionChainManager.writeToTransaction(store, path, data, false); } @Override - public void writeToTransactionWithParentsSlow(LogicalDatastoreType store, InstanceIdentifier path, T data) throws Exception { + public void writeToTransactionWithParentsSlow(final LogicalDatastoreType store, + final InstanceIdentifier path, + final T data){ transactionChainManager.writeToTransaction(store, path, data, true); } @Override - public void addDeleteToTxChain(final LogicalDatastoreType store, final InstanceIdentifier path) throws Exception { + public void addDeleteToTxChain(final LogicalDatastoreType store, final InstanceIdentifier path) throws TransactionChainClosedException { transactionChainManager.addDeleteOperationTotTxChain(store, path); } @@ -281,11 +279,19 @@ public class DeviceContextImpl implements DeviceContext, ExtensionConverterProvi @Override public void processFlowRemovedMessage(final FlowRemoved flowRemoved) { + //1. translate to general flow (table, priority, match, cookie) + final org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.FlowRemoved flowRemovedNotification = + flowRemovedTranslator.translate(flowRemoved, deviceInfo, null); + + if(!deviceManager.getIsNotificationFlowRemovedOff()) { + // Trigger off a notification + notificationPublishService.offerNotification(flowRemovedNotification); + } else if(LOG.isDebugEnabled()) { + LOG.debug("For nodeId={} isNotificationFlowRemovedOff={}", getDeviceInfo().getLOGValue(), deviceManager.getIsNotificationFlowRemovedOff()); + } + final ItemLifecycleListener itemLifecycleListener = flowLifeCycleKeeper.getItemLifecycleListener(); if (itemLifecycleListener != null) { - //1. translate to general flow (table, priority, match, cookie) - final org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.FlowRemoved flowRemovedNotification = - flowRemovedTranslator.translate(flowRemoved, deviceInfo, null); //2. create registry key final FlowRegistryKey flowRegKey = FlowRegistryKeyFactory.create(flowRemovedNotification); //3. lookup flowId @@ -299,8 +305,6 @@ public class DeviceContextImpl implements DeviceContext, ExtensionConverterProvi .child(Flow.class, new FlowKey(flowDescriptor.getFlowId())); // b) notify listener itemLifecycleListener.onRemoved(flowPath); - // c) trigger off a notification - notificationPublishService.offerNotification(flowRemovedNotification); } else { LOG.debug("flow id not found: nodeId={} tableId={}, priority={}", getDeviceInfo().getNodeId(), flowRegKey.getTableId(), flowRemovedNotification.getPriority()); @@ -311,7 +315,7 @@ public class DeviceContextImpl implements DeviceContext, ExtensionConverterProvi @Override public void processPortStatusMessage(final PortStatusMessage portStatus) { messageSpy.spyMessage(portStatus.getImplementedInterface(), MessageSpy.STATISTIC_GROUP.FROM_SWITCH_PUBLISHED_SUCCESS); - final FlowCapableNodeConnector flowCapableNodeConnector = portStatusTranslator.translate(portStatus, deviceInfo, null); + final FlowCapableNodeConnector flowCapableNodeConnector = portStatusTranslator.translate(portStatus, getDeviceInfo(), null); final KeyedInstanceIdentifier iiToNodeConnector = provideIIToNodeConnector(portStatus.getPortNo(), portStatus.getVersion()); try { @@ -326,13 +330,13 @@ public class DeviceContextImpl implements DeviceContext, ExtensionConverterProvi } submitTransaction(); } catch (final Exception e) { - LOG.warn("Error processing port status message: {}", e.getMessage()); + LOG.warn("Error processing port status message: ", e); } } private KeyedInstanceIdentifier provideIIToNodeConnector(final long portNo, final short version) { - final InstanceIdentifier iiToNodes = deviceInfo.getNodeInstanceIdentifier(); - final BigInteger dataPathId = deviceInfo.getDatapathId(); + final InstanceIdentifier iiToNodes = getDeviceInfo().getNodeInstanceIdentifier(); + final BigInteger dataPathId = getDeviceInfo().getDatapathId(); final NodeConnectorId nodeConnectorId = NodeStaticReplyTranslatorUtil.nodeConnectorId(dataPathId.toString(), portNo, version); return iiToNodes.child(NodeConnector.class, new NodeConnectorKey(nodeConnectorId)); } @@ -341,7 +345,7 @@ public class DeviceContextImpl implements DeviceContext, ExtensionConverterProvi public void processPacketInMessage(final PacketInMessage packetInMessage) { messageSpy.spyMessage(packetInMessage.getImplementedInterface(), MessageSpy.STATISTIC_GROUP.FROM_SWITCH); final ConnectionAdapter connectionAdapter = getPrimaryConnectionContext().getConnectionAdapter(); - final PacketReceived packetReceived = packetInTranslator.translate(packetInMessage, deviceInfo, null); + final PacketReceived packetReceived = packetInTranslator.translate(packetInMessage, getDeviceInfo(), null); if (packetReceived == null) { LOG.debug("Received a null packet from switch {}", connectionAdapter.getRemoteAddress()); @@ -389,13 +393,13 @@ public class DeviceContextImpl implements DeviceContext, ExtensionConverterProvi // lookup converter final ExperimenterDataOfChoice vendorData = notification.getExperimenterDataOfChoice(); final MessageTypeKey key = new MessageTypeKey<>( - deviceInfo.getVersion(), + getDeviceInfo().getVersion(), (Class) vendorData.getImplementedInterface()); final ConvertorMessageFromOFJava messageConverter = extensionConverterProvider.getMessageConverter(key); if (messageConverter == null) { LOG.warn("custom converter for {}[OF:{}] not found", notification.getExperimenterDataOfChoice().getImplementedInterface(), - deviceInfo.getVersion()); + getDeviceInfo().getVersion()); return; } // build notification @@ -403,7 +407,7 @@ public class DeviceContextImpl implements DeviceContext, ExtensionConverterProvi try { messageOfChoice = messageConverter.convert(vendorData, MessagePath.MESSAGE_NOTIFICATION); final ExperimenterMessageFromDevBuilder experimenterMessageFromDevBld = new ExperimenterMessageFromDevBuilder() - .setNode(new NodeRef(deviceInfo.getNodeInstanceIdentifier())) + .setNode(new NodeRef(getDeviceInfo().getNodeInstanceIdentifier())) .setExperimenterMessageOfChoice(messageOfChoice); // publish notificationPublishService.offerNotification(experimenterMessageFromDevBld.build()); @@ -421,7 +425,7 @@ public class DeviceContextImpl implements DeviceContext, ExtensionConverterProvi public synchronized void close() { LOG.debug("closing deviceContext: {}, nodeId:{}", getPrimaryConnectionContext().getConnectionAdapter().getRemoteAddress(), - getDeviceInfo().getNodeId()); + getDeviceInfo().getLOGValue()); // NOOP throw new UnsupportedOperationException("Autocloseble.close will be removed soon"); } @@ -448,8 +452,8 @@ public class DeviceContextImpl implements DeviceContext, ExtensionConverterProvi @Override public void onPublished() { - Verify.verify(CONTEXT_STATE.INITIALIZATION.equals(contextState)); - contextState = CONTEXT_STATE.WORKING; + Verify.verify(CONTEXT_STATE.INITIALIZATION.equals(getState())); + setState(CONTEXT_STATE.WORKING); primaryConnectionContext.getConnectionAdapter().setPacketInFiltering(false); for (final ConnectionContext switchAuxConnectionContext : auxiliaryConnectionContexts.values()) { switchAuxConnectionContext.getConnectionAdapter().setPacketInFiltering(false); @@ -483,15 +487,15 @@ public class DeviceContextImpl implements DeviceContext, ExtensionConverterProvi @Override public synchronized void shutdownConnection() { - LOG.debug("Shutdown method for node {}", deviceInfo.getNodeId()); - if (CONTEXT_STATE.TERMINATION.equals(contextState)) { - LOG.debug("DeviceCtx for Node {} is in termination process.", deviceInfo.getNodeId()); + LOG.debug("Shutdown method for node {}", getDeviceInfo().getLOGValue()); + if (CONTEXT_STATE.TERMINATION.equals(getState())) { + LOG.debug("DeviceCtx for Node {} is in termination process.", getDeviceInfo().getLOGValue()); return; } - contextState = CONTEXT_STATE.TERMINATION; + setState(CONTEXT_STATE.TERMINATION); if (ConnectionContext.CONNECTION_STATE.RIP.equals(getPrimaryConnectionContext().getConnectionState())) { - LOG.debug("ConnectionCtx for Node {} is in RIP state.", deviceInfo.getNodeId()); + LOG.debug("ConnectionCtx for Node {} is in RIP state.", getDeviceInfo().getLOGValue()); return; } /* Terminate Auxiliary Connection */ @@ -517,8 +521,54 @@ public class DeviceContextImpl implements DeviceContext, ExtensionConverterProvi return this.transactionChainManager; } + @Override + public void setSwitchFeaturesMandatory(boolean switchFeaturesMandatory) { + this.switchFeaturesMandatory = switchFeaturesMandatory; + } + @Override public CONTEXT_STATE getState() { - return this.contextState; + return this.state; + } + + @Override + public void setState(CONTEXT_STATE state) { + this.state = state; + } + + @Override + public void startupClusterServices() throws ExecutionException, InterruptedException { + LOG.debug("Initializing transaction chain manager for node {}", getDeviceInfo().getLOGValue()); + this.transactionChainManager.activateTransactionManager(); + LOG.debug("Waiting to get node {} information", getDeviceInfo().getLOGValue()); + DeviceInitializationUtils.initializeNodeInformation(this, switchFeaturesMandatory, this.convertorExecutor); + } + + @Override + public ListenableFuture stopClusterServices(boolean deviceDisconnected) { + return this.transactionChainManager.deactivateTransactionManager(); + } + + @Override + public ServiceGroupIdentifier getServiceIdentifier() { + return this.deviceInfo.getServiceIdentifier(); + } + + @Override + public DeviceInfo getDeviceInfo() { + return this.deviceInfo; + } + + @Override + public void putLifecycleServiceIntoTxChainManager(final LifecycleService lifecycleService){ + this.transactionChainManager.setLifecycleService(lifecycleService); + } + + @Override + public void replaceConnectionContext(final ConnectionContext connectionContext){ + // Act like we are initializing the context + setState(CONTEXT_STATE.INITIALIZATION); + this.primaryConnectionContext = connectionContext; + this.onPublished(); } }