X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=openflowplugin-impl%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fopenflowplugin%2Fimpl%2Fdevice%2FDeviceContextImpl.java;h=c7141da52bc8e95d05d56b4ec0ffd811be38e4b9;hb=d3464a585a22e59c22cb5098442d2276a2ff3ad4;hp=6f4d7941607d7ae18c8fb58a67186deac84dd015;hpb=30933e036613db28b069631d4bcaeca745f5e98d;p=openflowplugin.git diff --git a/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/device/DeviceContextImpl.java b/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/device/DeviceContextImpl.java index 6f4d794160..c7141da52b 100644 --- a/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/device/DeviceContextImpl.java +++ b/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/device/DeviceContextImpl.java @@ -8,11 +8,8 @@ package org.opendaylight.openflowplugin.impl.device; import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Function; import com.google.common.base.Optional; -import com.google.common.base.Preconditions; import com.google.common.base.Verify; -import com.google.common.util.concurrent.AsyncFunction; import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.JdkFutureAdapters; @@ -21,12 +18,15 @@ import io.netty.util.HashedWheelTimer; import io.netty.util.Timeout; import io.netty.util.TimerTask; import java.math.BigInteger; +import java.util.Collection; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import javax.annotation.Nonnull; import javax.annotation.Nullable; import org.opendaylight.controller.md.sal.binding.api.DataBroker; @@ -35,12 +35,10 @@ import org.opendaylight.controller.md.sal.binding.api.ReadOnlyTransaction; import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType; import org.opendaylight.mdsal.singleton.common.api.ServiceGroupIdentifier; import org.opendaylight.openflowjava.protocol.api.connection.ConnectionAdapter; -import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueueHandlerRegistration; import org.opendaylight.openflowjava.protocol.api.keys.MessageTypeKey; import org.opendaylight.openflowplugin.api.ConnectionException; import org.opendaylight.openflowplugin.api.OFConstants; import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionContext; -import org.opendaylight.openflowplugin.api.openflow.connection.OutboundQueueProvider; import org.opendaylight.openflowplugin.api.openflow.device.DeviceContext; import org.opendaylight.openflowplugin.api.openflow.device.DeviceInfo; import org.opendaylight.openflowplugin.api.openflow.device.DeviceManager; @@ -51,7 +49,8 @@ import org.opendaylight.openflowplugin.api.openflow.device.TranslatorLibrary; import org.opendaylight.openflowplugin.api.openflow.device.Xid; import org.opendaylight.openflowplugin.api.openflow.device.handlers.ClusterInitializationPhaseHandler; import org.opendaylight.openflowplugin.api.openflow.device.handlers.MultiMsgCollector; -import org.opendaylight.openflowplugin.api.openflow.lifecycle.LifecycleService; +import org.opendaylight.openflowplugin.api.openflow.lifecycle.ContextChainMastershipState; +import org.opendaylight.openflowplugin.api.openflow.lifecycle.ContextChainState; import org.opendaylight.openflowplugin.api.openflow.lifecycle.MastershipChangeListener; import org.opendaylight.openflowplugin.api.openflow.md.core.SwitchConnectionDistinguisher; import org.opendaylight.openflowplugin.api.openflow.md.core.TranslatorKey; @@ -71,19 +70,16 @@ import org.opendaylight.openflowplugin.extension.api.core.extension.ExtensionCon import org.opendaylight.openflowplugin.extension.api.exception.ConversionException; import org.opendaylight.openflowplugin.extension.api.path.MessagePath; import org.opendaylight.openflowplugin.impl.common.ItemLifeCycleSourceImpl; -import org.opendaylight.openflowplugin.impl.connection.OutboundQueueProviderImpl; import org.opendaylight.openflowplugin.impl.datastore.MultipartWriterProvider; import org.opendaylight.openflowplugin.impl.datastore.MultipartWriterProviderFactory; import org.opendaylight.openflowplugin.impl.device.initialization.AbstractDeviceInitializer; import org.opendaylight.openflowplugin.impl.device.initialization.DeviceInitializerProvider; import org.opendaylight.openflowplugin.impl.device.listener.MultiMsgCollectorImpl; -import org.opendaylight.openflowplugin.impl.device.listener.OpenflowProtocolListenerFullImpl; import org.opendaylight.openflowplugin.impl.registry.flow.DeviceFlowRegistryImpl; import org.opendaylight.openflowplugin.impl.registry.flow.FlowRegistryKeyFactory; import org.opendaylight.openflowplugin.impl.registry.group.DeviceGroupRegistryImpl; import org.opendaylight.openflowplugin.impl.registry.meter.DeviceMeterRegistryImpl; import org.opendaylight.openflowplugin.impl.rpc.AbstractRequestContext; -import org.opendaylight.openflowplugin.impl.services.sal.SalRoleServiceImpl; import org.opendaylight.openflowplugin.openflow.md.core.sal.convertor.ConvertorExecutor; import org.opendaylight.openflowplugin.openflow.md.core.session.SwitchConnectionCookieOFImpl; import org.opendaylight.openflowplugin.openflow.md.util.InventoryDataServiceUtil; @@ -93,14 +89,10 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.Fl import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.Table; import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow; import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.FlowKey; -import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeConnectorId; import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRef; -import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRemovedBuilder; -import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeUpdatedBuilder; import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnector; import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnectorBuilder; import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnectorKey; -import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node; import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.common.types.rev130731.PortReason; import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.Error; import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.ExperimenterMessage; @@ -109,6 +101,7 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PacketIn; import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PacketInMessage; import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PortGrouping; +import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PortStatus; import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PortStatusMessage; import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.experimenter.core.ExperimenterDataOfChoice; import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.experimenter.types.rev151020.experimenter.core.message.ExperimenterMessageOfChoice; @@ -141,8 +134,12 @@ public class DeviceContextImpl implements DeviceContext, ExtensionConverterProvi // Timeout in seconds after what we will give up on propagating role private static final int SET_ROLE_TIMEOUT = 10; + // Timeout in milliseconds after what we will give up on initializing device + private static final int DEVICE_INIT_TIMEOUT = 9000; + private static final int LOW_WATERMARK = 1000; private static final int HIGH_WATERMARK = 2000; + private final MultipartWriterProvider writerProvider; private boolean initialized; @@ -171,29 +168,27 @@ public class DeviceContextImpl implements DeviceContext, ExtensionConverterProvi private boolean switchFeaturesMandatory; private DeviceInfo deviceInfo; private final ConvertorExecutor convertorExecutor; - private volatile CONTEXT_STATE state; + private volatile ContextState state; private ClusterInitializationPhaseHandler clusterInitializationPhaseHandler; private final DeviceManager myManager; private final DeviceInitializerProvider deviceInitializerProvider; private final boolean useSingleLayerSerialization; - private Boolean isAddNotificationSent = false; - private OutboundQueueProvider outboundQueueProvider; - private boolean wasOnceMaster; + private boolean hasState; + private boolean isInitialTransactionSubmitted; DeviceContextImpl( - @Nonnull final ConnectionContext primaryConnectionContext, - @Nonnull final DataBroker dataBroker, - @Nonnull final MessageSpy messageSpy, - @Nonnull final TranslatorLibrary translatorLibrary, - @Nonnull final DeviceManager contextManager, - final ConvertorExecutor convertorExecutor, - final boolean skipTableFeatures, - final HashedWheelTimer hashedWheelTimer, - final boolean useSingleLayerSerialization, - final DeviceInitializerProvider deviceInitializerProvider) { + @Nonnull final ConnectionContext primaryConnectionContext, + @Nonnull final DataBroker dataBroker, + @Nonnull final MessageSpy messageSpy, + @Nonnull final TranslatorLibrary translatorLibrary, + @Nonnull final DeviceManager contextManager, + final ConvertorExecutor convertorExecutor, + final boolean skipTableFeatures, + final HashedWheelTimer hashedWheelTimer, + final boolean useSingleLayerSerialization, + final DeviceInitializerProvider deviceInitializerProvider) { this.primaryConnectionContext = primaryConnectionContext; - this.outboundQueueProvider = (OutboundQueueProvider) primaryConnectionContext.getOutboundQueueProvider(); this.deviceInfo = primaryConnectionContext.getDeviceInfo(); this.hashedWheelTimer = hashedWheelTimer; this.deviceInitializerProvider = deviceInitializerProvider; @@ -201,7 +196,7 @@ public class DeviceContextImpl implements DeviceContext, ExtensionConverterProvi this.deviceState = new DeviceStateImpl(); this.dataBroker = dataBroker; this.auxiliaryConnectionContexts = new HashMap<>(); - this.messageSpy = Preconditions.checkNotNull(messageSpy); + this.messageSpy = messageSpy; this.packetInLimiter = new PacketInRateLimiter(primaryConnectionContext.getConnectionAdapter(), /*initial*/ LOW_WATERMARK, /*initial*/HIGH_WATERMARK, this.messageSpy, REJECTED_DRAIN_FACTOR); @@ -217,19 +212,18 @@ public class DeviceContextImpl implements DeviceContext, ExtensionConverterProvi this.itemLifeCycleSourceRegistry = new ItemLifeCycleRegistryImpl(); this.flowLifeCycleKeeper = new ItemLifeCycleSourceImpl(); this.itemLifeCycleSourceRegistry.registerLifeCycleSource(flowLifeCycleKeeper); - this.state = CONTEXT_STATE.INITIALIZATION; + this.state = ContextState.INITIALIZATION; this.convertorExecutor = convertorExecutor; this.skipTableFeatures = skipTableFeatures; this.useSingleLayerSerialization = useSingleLayerSerialization; this.initialized = false; - this.wasOnceMaster = false; + writerProvider = MultipartWriterProviderFactory.createDefaultProvider(this); } @Override - public void initialSubmitTransaction() { - if (initialized) { - transactionChainManager.initialSubmitWriteTransaction(); - } + public boolean initialSubmitTransaction() { + return (initialized &&(isInitialTransactionSubmitted = + transactionChainManager.initialSubmitWriteTransaction())); } @Override @@ -260,6 +254,11 @@ public class DeviceContextImpl implements DeviceContext, ExtensionConverterProvi return dataBroker.newReadOnlyTransaction(); } + @Override + public boolean isTransactionsEnabled() { + return isInitialTransactionSubmitted; + } + @Override public void writeToTransaction(final LogicalDatastoreType store, final InstanceIdentifier path, @@ -318,19 +317,19 @@ public class DeviceContextImpl implements DeviceContext, ExtensionConverterProvi @Override public void processReply(final OfHeader ofHeader) { messageSpy.spyMessage( - ofHeader.getImplementedInterface(), - (ofHeader instanceof Error) - ? MessageSpy.STATISTIC_GROUP.FROM_SWITCH_PUBLISHED_FAILURE - : MessageSpy.STATISTIC_GROUP.FROM_SWITCH_PUBLISHED_SUCCESS); + ofHeader.getImplementedInterface(), + (ofHeader instanceof Error) + ? MessageSpy.StatisticsGroup.FROM_SWITCH_PUBLISHED_FAILURE + : MessageSpy.StatisticsGroup.FROM_SWITCH_PUBLISHED_SUCCESS); } @Override public void processReply(final Xid xid, final List ofHeaderList) { ofHeaderList.forEach(header -> messageSpy.spyMessage( - header.getImplementedInterface(), - (header instanceof Error) - ? MessageSpy.STATISTIC_GROUP.FROM_SWITCH_PUBLISHED_FAILURE - : MessageSpy.STATISTIC_GROUP.FROM_SWITCH_PUBLISHED_SUCCESS)); + header.getImplementedInterface(), + (header instanceof Error) + ? MessageSpy.StatisticsGroup.FROM_SWITCH_PUBLISHED_FAILURE + : MessageSpy.StatisticsGroup.FROM_SWITCH_PUBLISHED_SUCCESS)); } @Override @@ -339,7 +338,7 @@ public class DeviceContextImpl implements DeviceContext, ExtensionConverterProvi final org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.FlowRemoved flowRemovedNotification = flowRemovedTranslator.translate(flowRemoved, deviceInfo, null); - if(!myManager.isFlowRemovedNotificationOn()) { + if(myManager.isFlowRemovedNotificationOn()) { // Trigger off a notification notificationPublishService.offerNotification(flowRemovedNotification); } else if(LOG.isDebugEnabled()) { @@ -368,81 +367,72 @@ public class DeviceContextImpl implements DeviceContext, ExtensionConverterProvi } } - @Override - public void sendNodeAddedNotification() { - if (!isAddNotificationSent) { - isAddNotificationSent = true; - NodeUpdatedBuilder builder = new NodeUpdatedBuilder(); - builder.setId(getDeviceInfo().getNodeId()); - builder.setNodeRef(new NodeRef(getDeviceInfo().getNodeInstanceIdentifier())); - LOG.debug("Publishing node added notification for {}", builder.build()); - notificationPublishService.offerNotification(builder.build()); - } - } - - @Override - public void sendNodeRemovedNotification() { - NodeRemovedBuilder builder = new NodeRemovedBuilder(); - builder.setNodeRef(new NodeRef(getDeviceInfo().getNodeInstanceIdentifier())); - LOG.debug("Publishing node removed notification for {}", builder.build()); - notificationPublishService.offerNotification(builder.build()); - } - @Override public void processPortStatusMessage(final PortStatusMessage portStatus) { - messageSpy.spyMessage(portStatus.getImplementedInterface(), MessageSpy.STATISTIC_GROUP.FROM_SWITCH_PUBLISHED_SUCCESS); - final FlowCapableNodeConnector flowCapableNodeConnector = portStatusTranslator.translate(portStatus, getDeviceInfo(), null); + messageSpy.spyMessage(portStatus.getImplementedInterface(), MessageSpy.StatisticsGroup.FROM_SWITCH_PUBLISHED_SUCCESS); - final KeyedInstanceIdentifier iiToNodeConnector = provideIIToNodeConnector(portStatus.getPortNo(), portStatus.getVersion()); - try { - if (portStatus.getReason().equals(PortReason.OFPPRADD) || portStatus.getReason().equals(PortReason.OFPPRMODIFY)) { - // because of ADD status node connector has to be created - final NodeConnectorBuilder nConnectorBuilder = new NodeConnectorBuilder().setKey(iiToNodeConnector.getKey()); - nConnectorBuilder.addAugmentation(FlowCapableNodeConnectorStatisticsData.class, new FlowCapableNodeConnectorStatisticsDataBuilder().build()); - nConnectorBuilder.addAugmentation(FlowCapableNodeConnector.class, flowCapableNodeConnector); - writeToTransaction(LogicalDatastoreType.OPERATIONAL, iiToNodeConnector, nConnectorBuilder.build()); - } else if (portStatus.getReason().equals(PortReason.OFPPRDELETE)) { - addDeleteToTxChain(LogicalDatastoreType.OPERATIONAL, iiToNodeConnector); + if (initialized) { + try { + writePortStatusMessage(portStatus); + submitTransaction(); + } catch (final Exception e) { + LOG.warn("Error processing port status message for port {} on device {}", + portStatus.getPortNo(), getDeviceInfo().getLOGValue(), e); } - submitTransaction(); - } catch (final Exception e) { - LOG.warn("Error processing port status message for port {} on device {} : {}", portStatus.getPortNo(), - getDeviceInfo().getNodeId().toString(), e); + } else if (!hasState) { + primaryConnectionContext.handlePortStatusMessage(portStatus); } } - private KeyedInstanceIdentifier provideIIToNodeConnector(final long portNo, final short version) { - final InstanceIdentifier iiToNodes = getDeviceInfo().getNodeInstanceIdentifier(); - final BigInteger dataPathId = getDeviceInfo().getDatapathId(); - final NodeConnectorId nodeConnectorId = InventoryDataServiceUtil.nodeConnectorIdfromDatapathPortNo(dataPathId, portNo, OpenflowVersion.get(version)); - return iiToNodes.child(NodeConnector.class, new NodeConnectorKey(nodeConnectorId)); + private void writePortStatusMessage(final PortStatus portStatusMessage) { + final FlowCapableNodeConnector flowCapableNodeConnector = portStatusTranslator + .translate(portStatusMessage, getDeviceInfo(), null); + + final KeyedInstanceIdentifier iiToNodeConnector = getDeviceInfo() + .getNodeInstanceIdentifier() + .child(NodeConnector.class, new NodeConnectorKey(InventoryDataServiceUtil + .nodeConnectorIdfromDatapathPortNo( + deviceInfo.getDatapathId(), + portStatusMessage.getPortNo(), + OpenflowVersion.get(deviceInfo.getVersion())))); + + if (PortReason.OFPPRADD.equals(portStatusMessage.getReason()) || PortReason.OFPPRMODIFY.equals(portStatusMessage.getReason())) { + // because of ADD status node connector has to be created + writeToTransaction(LogicalDatastoreType.OPERATIONAL, iiToNodeConnector, new NodeConnectorBuilder() + .setKey(iiToNodeConnector.getKey()) + .addAugmentation(FlowCapableNodeConnectorStatisticsData.class, new FlowCapableNodeConnectorStatisticsDataBuilder().build()) + .addAugmentation(FlowCapableNodeConnector.class, flowCapableNodeConnector) + .build()); + } else if (PortReason.OFPPRDELETE.equals(portStatusMessage.getReason())) { + addDeleteToTxChain(LogicalDatastoreType.OPERATIONAL, iiToNodeConnector); + } } @Override public void processPacketInMessage(final PacketInMessage packetInMessage) { - messageSpy.spyMessage(packetInMessage.getImplementedInterface(), MessageSpy.STATISTIC_GROUP.FROM_SWITCH); + messageSpy.spyMessage(packetInMessage.getImplementedInterface(), MessageSpy.StatisticsGroup.FROM_SWITCH); final ConnectionAdapter connectionAdapter = getPrimaryConnectionContext().getConnectionAdapter(); final PacketReceived packetReceived = packetInTranslator.translate(packetInMessage, getDeviceInfo(), null); if (packetReceived == null) { LOG.debug("Received a null packet from switch {}", connectionAdapter.getRemoteAddress()); - messageSpy.spyMessage(packetInMessage.getImplementedInterface(), MessageSpy.STATISTIC_GROUP.FROM_SWITCH_TRANSLATE_SRC_FAILURE); + messageSpy.spyMessage(packetInMessage.getImplementedInterface(), MessageSpy.StatisticsGroup.FROM_SWITCH_TRANSLATE_SRC_FAILURE); return; } else { - messageSpy.spyMessage(packetReceived.getImplementedInterface(), MessageSpy.STATISTIC_GROUP.FROM_SWITCH_TRANSLATE_OUT_SUCCESS); + messageSpy.spyMessage(packetReceived.getImplementedInterface(), MessageSpy.StatisticsGroup.FROM_SWITCH_TRANSLATE_OUT_SUCCESS); } if (!packetInLimiter.acquirePermit()) { LOG.debug("Packet limited"); // TODO: save packet into emergency slot if possible - messageSpy.spyMessage(packetReceived.getImplementedInterface(), MessageSpy.STATISTIC_GROUP.FROM_SWITCH_PACKET_IN_LIMIT_REACHED_AND_DROPPED); + messageSpy.spyMessage(packetReceived.getImplementedInterface(), MessageSpy.StatisticsGroup.FROM_SWITCH_PACKET_IN_LIMIT_REACHED_AND_DROPPED); return; } final ListenableFuture offerNotification = notificationPublishService.offerNotification(packetReceived); if (NotificationPublishService.REJECTED.equals(offerNotification)) { LOG.debug("notification offer rejected"); - messageSpy.spyMessage(packetReceived.getImplementedInterface(), MessageSpy.STATISTIC_GROUP.FROM_SWITCH_NOTIFICATION_REJECTED); + messageSpy.spyMessage(packetReceived.getImplementedInterface(), MessageSpy.StatisticsGroup.FROM_SWITCH_NOTIFICATION_REJECTED); packetInLimiter.drainLowWaterMark(); packetInLimiter.releasePermit(); return; @@ -451,13 +441,13 @@ public class DeviceContextImpl implements DeviceContext, ExtensionConverterProvi Futures.addCallback(offerNotification, new FutureCallback() { @Override public void onSuccess(final Object result) { - messageSpy.spyMessage(packetReceived.getImplementedInterface(), MessageSpy.STATISTIC_GROUP.FROM_SWITCH_PUBLISHED_SUCCESS); + messageSpy.spyMessage(packetReceived.getImplementedInterface(), MessageSpy.StatisticsGroup.FROM_SWITCH_PUBLISHED_SUCCESS); packetInLimiter.releasePermit(); } @Override public void onFailure(final Throwable t) { - messageSpy.spyMessage(packetReceived.getImplementedInterface(), MessageSpy.STATISTIC_GROUP.FROM_SWITCH_NOTIFICATION_REJECTED); + messageSpy.spyMessage(packetReceived.getImplementedInterface(), MessageSpy.StatisticsGroup.FROM_SWITCH_NOTIFICATION_REJECTED); LOG.debug("notification offer failed: {}", t.getMessage()); LOG.trace("notification offer failed..", t); packetInLimiter.releasePermit(); @@ -484,7 +474,7 @@ public class DeviceContextImpl implements DeviceContext, ExtensionConverterProvi try { messageOfChoice = messageConverter.convert(vendorData, MessagePath.MESSAGE_NOTIFICATION); final ExperimenterMessageFromDevBuilder experimenterMessageFromDevBld = new ExperimenterMessageFromDevBuilder() - .setNode(new NodeRef(getDeviceInfo().getNodeInstanceIdentifier())) + .setNode(new NodeRef(getDeviceInfo().getNodeInstanceIdentifier())) .setExperimenterMessageOfChoice(messageOfChoice); // publish notificationPublishService.offerNotification(experimenterMessageFromDevBld.build()); @@ -520,14 +510,9 @@ public class DeviceContextImpl implements DeviceContext, ExtensionConverterProvi @Override public void onPublished() { - Verify.verify(CONTEXT_STATE.INITIALIZATION.equals(getState())); - this.state = CONTEXT_STATE.WORKING; - synchronized (primaryConnectionContext) { - primaryConnectionContext.getConnectionAdapter().setPacketInFiltering(false); - } - for (final ConnectionContext switchAuxConnectionContext : auxiliaryConnectionContexts.values()) { - switchAuxConnectionContext.getConnectionAdapter().setPacketInFiltering(false); - } + Verify.verify(ContextState.INITIALIZATION.equals(getState())); + this.state = ContextState.WORKING; + primaryConnectionContext.getConnectionAdapter().setPacketInFiltering(false); } @Override @@ -560,7 +545,7 @@ public class DeviceContextImpl implements DeviceContext, ExtensionConverterProvi if (LOG.isDebugEnabled()) { LOG.debug("Shutdown method for node {}", getDeviceInfo().getLOGValue()); } - if (CONTEXT_STATE.TERMINATION.equals(getState())) { + if (ContextState.TERMINATION.equals(getState())) { LOG.debug("DeviceCtx for Node {} is in termination process.", getDeviceInfo().getLOGValue()); return; } @@ -605,79 +590,15 @@ public class DeviceContextImpl implements DeviceContext, ExtensionConverterProvi } @Override - public synchronized void replaceConnection(final ConnectionContext connectionContext) { - - primaryConnectionContext = null; - deviceInfo = null; - packetInLimiter = null; - - primaryConnectionContext = connectionContext; - deviceInfo = primaryConnectionContext.getDeviceInfo(); - - packetInLimiter = new PacketInRateLimiter(primaryConnectionContext.getConnectionAdapter(), - /*initial*/ LOW_WATERMARK, /*initial*/HIGH_WATERMARK, messageSpy, REJECTED_DRAIN_FACTOR); - - primaryConnectionContext.setOutboundQueueProvider(outboundQueueProvider); - - final OutboundQueueHandlerRegistration outboundQueueHandlerRegistration = - primaryConnectionContext.getConnectionAdapter().registerOutboundQueueHandler( - outboundQueueProvider, - myManager.getBarrierCountLimit(), - myManager.getBarrierIntervalNanos()); - - primaryConnectionContext.setOutboundQueueHandleRegistration(outboundQueueHandlerRegistration); - - final OpenflowProtocolListenerFullImpl messageListener = new OpenflowProtocolListenerFullImpl( - primaryConnectionContext.getConnectionAdapter(), this); - - primaryConnectionContext.getConnectionAdapter().setMessageListener(messageListener); - - LOG.info("ConnectionEvent: Connection on device:{}, NodeId:{} switched.", - primaryConnectionContext.getConnectionAdapter().getRemoteAddress(), - primaryConnectionContext.getDeviceInfo().getNodeId()); - - } - - @Override - public CONTEXT_STATE getState() { + public ContextState getState() { return this.state; } @Override - public ListenableFuture stopClusterServices(boolean connectionInterrupted) { - final ListenableFuture deactivateTxManagerFuture = initialized + public ListenableFuture stopClusterServices() { + return initialized ? transactionChainManager.deactivateTransactionManager() : Futures.immediateFuture(null); - - if (!connectionInterrupted) { - final ListenableFuture makeSlaveFuture - = Futures.transform(makeDeviceSlave(), new Function, Void>() { - @Nullable - @Override - public Void apply(@Nullable RpcResult setRoleOutputRpcResult) { - return null; - } - }); - - Futures.addCallback(makeSlaveFuture, new FutureCallback() { - @Override - public void onSuccess(@Nullable Void aVoid) { - if (LOG.isDebugEnabled()) { - LOG.debug("Role SLAVE was successfully propagated on device, node {}", deviceInfo.getLOGValue()); - } - sendNodeAddedNotification(); - } - - @Override - public void onFailure(final Throwable throwable) { - LOG.warn("Was not able to set role SLAVE to device on node {} ", deviceInfo.getLOGValue()); - LOG.trace("Error occurred on device role setting, probably connection loss: ", throwable); - } - }); - - } - - return deactivateTxManagerFuture; } @Override @@ -692,14 +613,7 @@ public class DeviceContextImpl implements DeviceContext, ExtensionConverterProvi @Override public void close() { - this.state = CONTEXT_STATE.TERMINATION; - } - - @Override - public void putLifecycleServiceIntoTxChainManager(final LifecycleService lifecycleService){ - if (initialized) { - this.transactionChainManager.setLifecycleService(lifecycleService); - } + //NOOP } @Override @@ -722,32 +636,37 @@ public class DeviceContextImpl implements DeviceContext, ExtensionConverterProvi this.clusterInitializationPhaseHandler = handler; } - @Override - public void masterSuccessful(){ - this.wasOnceMaster = true; - } - @Override public boolean onContextInstantiateService(final MastershipChangeListener mastershipChangeListener) { LOG.info("Starting device context cluster services for node {}", deviceInfo.getLOGValue()); - lazyTransactionManagerInitialization(); - this.transactionChainManager.activateTransactionManager(); + try { + final List portStatusMessages = primaryConnectionContext + .retrieveAndClearPortStatusMessages(); + + portStatusMessages.forEach(this::writePortStatusMessage); + submitTransaction(); + } catch (final Exception ex) { + LOG.warn("Error processing port status messages from device {}", getDeviceInfo().getLOGValue(), ex); + return false; + } try { final java.util.Optional initializer = deviceInitializerProvider - .lookup(deviceInfo.getVersion()); + .lookup(deviceInfo.getVersion()); if (initializer.isPresent()) { - final MultipartWriterProvider writerProvider = MultipartWriterProviderFactory.createDefaultProvider(this); - initializer.get().initialize(this, switchFeaturesMandatory, writerProvider, convertorExecutor); + initializer + .get() + .initialize(this, switchFeaturesMandatory, writerProvider, convertorExecutor) + .get(DEVICE_INIT_TIMEOUT, TimeUnit.MILLISECONDS); } else { throw new ExecutionException(new ConnectionException("Unsupported version " + deviceInfo.getVersion())); } - } catch (ExecutionException | InterruptedException e) { - LOG.warn("Device {} cannot be initialized: ", deviceInfo.getLOGValue(), e); + } catch (ExecutionException | InterruptedException | TimeoutException ex) { + LOG.warn("Device {} cannot be initialized: ", deviceInfo.getLOGValue(), ex); return false; } @@ -755,7 +674,8 @@ public class DeviceContextImpl implements DeviceContext, ExtensionConverterProvi new RpcResultFutureCallback(mastershipChangeListener)); final ListenableFuture>> deviceFlowRegistryFill = getDeviceFlowRegistry().fill(); - Futures.addCallback(deviceFlowRegistryFill, new DeviceFlowRegistryCallback(deviceFlowRegistryFill)); + Futures.addCallback(deviceFlowRegistryFill, + new DeviceFlowRegistryCallback(deviceFlowRegistryFill, mastershipChangeListener)); return this.clusterInitializationPhaseHandler.onContextInstantiateService(mastershipChangeListener); } @@ -770,6 +690,7 @@ public class DeviceContextImpl implements DeviceContext, ExtensionConverterProvi this.deviceFlowRegistry = new DeviceFlowRegistryImpl(deviceInfo.getVersion(), dataBroker, deviceInfo.getNodeInstanceIdentifier()); this.deviceGroupRegistry = new DeviceGroupRegistryImpl(); this.deviceMeterRegistry = new DeviceMeterRegistryImpl(); + this.transactionChainManager.activateTransactionManager(); this.initialized = true; } } @@ -785,7 +706,7 @@ public class DeviceContextImpl implements DeviceContext, ExtensionConverterProvi } - ListenableFuture> sendRoleChangeToDevice(final OfpRole newRole) { + private ListenableFuture> sendRoleChangeToDevice(final OfpRole newRole) { if (LOG.isDebugEnabled()) { LOG.debug("Sending new role {} to device {}", newRole, deviceInfo.getNodeId()); } @@ -819,6 +740,11 @@ public class DeviceContextImpl implements DeviceContext, ExtensionConverterProvi return sendRoleChangeToDevice(OfpRole.BECOMESLAVE); } + @Override + public void onStateAcquired(final ContextChainState state) { + hasState = true; + } + private class RpcResultFutureCallback implements FutureCallback> { private final MastershipChangeListener mastershipChangeListener; @@ -829,31 +755,56 @@ public class DeviceContextImpl implements DeviceContext, ExtensionConverterProvi @Override public void onSuccess(@Nullable RpcResult setRoleOutputRpcResult) { + this.mastershipChangeListener.onMasterRoleAcquired( + deviceInfo, + ContextChainMastershipState.MASTER_ON_DEVICE + ); if (LOG.isDebugEnabled()) { LOG.debug("Role MASTER was successfully set on device, node {}", deviceInfo.getLOGValue()); } - sendNodeAddedNotification(); } @Override public void onFailure(final Throwable throwable) { - LOG.warn("Was not able to set MASTER role on device, node {}", deviceInfo.getLOGValue()); - mastershipChangeListener.onNotAbleToStartMastership(deviceInfo); + mastershipChangeListener.onNotAbleToStartMastershipMandatory( + deviceInfo, + "Was not able to set MASTER role on device"); } } private class DeviceFlowRegistryCallback implements FutureCallback>> { private final ListenableFuture>> deviceFlowRegistryFill; + private final MastershipChangeListener mastershipChangeListener; - DeviceFlowRegistryCallback(ListenableFuture>> deviceFlowRegistryFill) { + DeviceFlowRegistryCallback( + ListenableFuture>> deviceFlowRegistryFill, + MastershipChangeListener mastershipChangeListener) { this.deviceFlowRegistryFill = deviceFlowRegistryFill; + this.mastershipChangeListener = mastershipChangeListener; } @Override public void onSuccess(@Nullable List> result) { if (LOG.isDebugEnabled()) { - LOG.debug("Finished filling flow registry with flows for node: {}", deviceInfo.getLOGValue()); + // Count all flows we read from datastore for debugging purposes. + // This number do not always represent how many flows were actually added + // to DeviceFlowRegistry, because of possible duplicates. + long flowCount = Optional.fromNullable(result).asSet().stream() + .flatMap(Collection::stream) + .filter(Objects::nonNull) + .flatMap(flowCapableNodeOptional -> flowCapableNodeOptional.asSet().stream()) + .filter(Objects::nonNull) + .filter(flowCapableNode -> Objects.nonNull(flowCapableNode.getTable())) + .flatMap(flowCapableNode -> flowCapableNode.getTable().stream()) + .filter(Objects::nonNull) + .filter(table -> Objects.nonNull(table.getFlow())) + .flatMap(table -> table.getFlow().stream()) + .filter(Objects::nonNull) + .count(); + + LOG.debug("Finished filling flow registry with {} flows for node: {}", flowCount, deviceInfo.getLOGValue()); } + this.mastershipChangeListener.onMasterRoleAcquired(deviceInfo, ContextChainMastershipState.INITIAL_FLOW_REGISTRY_FILL); } @Override @@ -865,6 +816,10 @@ public class DeviceContextImpl implements DeviceContext, ExtensionConverterProvi } else { LOG.warn("Failed filling flow registry with flows for node: {} with exception: {}", deviceInfo.getLOGValue(), t); } + mastershipChangeListener.onNotAbleToStartMastership( + deviceInfo, + "Was not able to fill flow registry on device", + false); } }