X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=openflowplugin-impl%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fopenflowplugin%2Fimpl%2Fdevice%2FDeviceManagerImpl.java;h=f265aed5d50a83b01bc85c5b9aaa20183ef1ddae;hb=6e511c65a597995ee38bb1e78c6b24967ca755c1;hp=00cf64754abebf1a1cc58676bd156b00cf545ce3;hpb=3adcfd7384fb449022766506a23fdc5cb98bd79d;p=openflowplugin.git diff --git a/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/device/DeviceManagerImpl.java b/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/device/DeviceManagerImpl.java index 00cf64754a..362dbe1714 100644 --- a/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/device/DeviceManagerImpl.java +++ b/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/device/DeviceManagerImpl.java @@ -7,123 +7,96 @@ */ package org.opendaylight.openflowplugin.impl.device; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Function; import com.google.common.base.Preconditions; +import com.google.common.base.Verify; +import com.google.common.collect.Iterators; import com.google.common.util.concurrent.AsyncFunction; import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; -import io.netty.util.HashedWheelTimer; -import java.math.BigInteger; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; +import io.netty.util.TimerTask; import java.util.Collections; +import java.util.HashSet; import java.util.Iterator; -import java.util.List; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ExecutionException; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; import javax.annotation.CheckForNull; import javax.annotation.Nonnull; import org.opendaylight.controller.md.sal.binding.api.DataBroker; -import org.opendaylight.controller.md.sal.binding.api.NotificationPublishService; -import org.opendaylight.controller.md.sal.binding.api.NotificationService; import org.opendaylight.controller.md.sal.binding.api.WriteTransaction; import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType; import org.opendaylight.openflowjava.protocol.api.connection.ConnectionAdapter; -import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueue; -import org.opendaylight.openflowplugin.api.ConnectionException; -import org.opendaylight.openflowplugin.api.OFConstants; +import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueueHandlerRegistration; +import org.opendaylight.openflowplugin.api.openflow.OFPContext; import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionContext; import org.opendaylight.openflowplugin.api.openflow.connection.OutboundQueueProvider; import org.opendaylight.openflowplugin.api.openflow.device.DeviceContext; +import org.opendaylight.openflowplugin.api.openflow.device.DeviceInfo; import org.opendaylight.openflowplugin.api.openflow.device.DeviceManager; import org.opendaylight.openflowplugin.api.openflow.device.DeviceState; -import org.opendaylight.openflowplugin.api.openflow.device.MessageTranslator; -import org.opendaylight.openflowplugin.api.openflow.device.RequestContext; +import org.opendaylight.openflowplugin.api.openflow.device.DeviceSynchronizeListener; +import org.opendaylight.openflowplugin.api.openflow.device.DeviceValidListener; import org.opendaylight.openflowplugin.api.openflow.device.TranslatorLibrary; -import org.opendaylight.openflowplugin.api.openflow.device.Xid; import org.opendaylight.openflowplugin.api.openflow.device.handlers.DeviceInitializationPhaseHandler; -import org.opendaylight.openflowplugin.api.openflow.device.handlers.MultiMsgCollector; -import org.opendaylight.openflowplugin.api.openflow.md.core.TranslatorKey; -import org.opendaylight.openflowplugin.api.openflow.statistics.ofpspecific.MessageIntelligenceAgency; -import org.opendaylight.openflowplugin.impl.common.MultipartRequestInputFactory; -import org.opendaylight.openflowplugin.impl.common.NodeStaticReplyTranslatorUtil; +import org.opendaylight.openflowplugin.api.openflow.device.handlers.DeviceTerminationPhaseHandler; +import org.opendaylight.openflowplugin.api.openflow.lifecycle.LifecycleConductor; +import org.opendaylight.openflowplugin.api.openflow.statistics.StatisticsContext; +import org.opendaylight.openflowplugin.extension.api.ExtensionConverterProviderKeeper; +import org.opendaylight.openflowplugin.extension.api.core.extension.ExtensionConverterProvider; import org.opendaylight.openflowplugin.impl.connection.OutboundQueueProviderImpl; import org.opendaylight.openflowplugin.impl.device.listener.OpenflowProtocolListenerFullImpl; -import org.opendaylight.openflowplugin.impl.rpc.AbstractRequestContext; -import org.opendaylight.openflowplugin.impl.util.DeviceStateUtil; -import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode; -import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNodeBuilder; -import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNodeConnector; -import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.Table; -import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.TableBuilder; -import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.TableKey; -import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.table.statistics.rev131215.FlowTableStatisticsData; -import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.table.statistics.rev131215.FlowTableStatisticsDataBuilder; -import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.NodeGroupFeatures; -import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeConnectorId; +import org.opendaylight.openflowplugin.impl.util.DeviceInitializationUtils; import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes; import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodesBuilder; -import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnector; -import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnectorBuilder; import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node; -import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeBuilder; -import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.NodeMeterFeatures; -import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.common.types.rev130731.Capabilities; -import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.common.types.rev130731.CapabilitiesV10; -import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.common.types.rev130731.MultipartType; -import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.MultipartReply; -import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader; -import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PortGrouping; -import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.multipart.reply.MultipartReplyBody; -import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.multipart.reply.multipart.reply.body.MultipartReplyDescCase; -import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.multipart.reply.multipart.reply.body.MultipartReplyGroupFeaturesCase; -import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.multipart.reply.multipart.reply.body.MultipartReplyMeterFeaturesCase; -import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.multipart.reply.multipart.reply.body.MultipartReplyPortDescCase; -import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.multipart.reply.multipart.reply.body.MultipartReplyTableFeaturesCase; -import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.multipart.reply.multipart.reply.body.multipart.reply.desc._case.MultipartReplyDesc; -import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.multipart.reply.multipart.reply.body.multipart.reply.group.features._case.MultipartReplyGroupFeatures; -import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.multipart.reply.multipart.reply.body.multipart.reply.meter.features._case.MultipartReplyMeterFeatures; -import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.multipart.reply.multipart.reply.body.multipart.reply.port.desc._case.MultipartReplyPortDesc; -import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.multipart.reply.multipart.reply.body.multipart.reply.table.features._case.MultipartReplyTableFeatures; -import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.FlowCapableNodeConnectorStatisticsData; -import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.FlowCapableNodeConnectorStatisticsDataBuilder; -import org.opendaylight.yang.gen.v1.urn.opendaylight.table.types.rev131026.table.features.TableFeatures; +import org.opendaylight.yang.gen.v1.urn.opendaylight.role.service.rev150727.OfpRole; import org.opendaylight.yangtools.yang.binding.InstanceIdentifier; -import org.opendaylight.yangtools.yang.common.RpcError; -import org.opendaylight.yangtools.yang.common.RpcResult; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** * */ -public class DeviceManagerImpl implements DeviceManager, AutoCloseable { +public class DeviceManagerImpl implements DeviceManager, ExtensionConverterProviderKeeper { private static final Logger LOG = LoggerFactory.getLogger(DeviceManagerImpl.class); - private static final long TICK_DURATION = 10; // 0.5 sec. - private ScheduledThreadPoolExecutor spyPool; + private final long globalNotificationQuota; + private final boolean switchFeaturesMandatory; + private boolean isNotificationFlowRemovedOff; + private final int spyRate = 10; private final DataBroker dataBroker; - private final HashedWheelTimer hashedWheelTimer; private TranslatorLibrary translatorLibrary; private DeviceInitializationPhaseHandler deviceInitPhaseHandler; - private NotificationService notificationService; - private NotificationPublishService notificationPublishService; + private DeviceTerminationPhaseHandler deviceTerminPhaseHandler; - private final List deviceContexts = new ArrayList(); - private final MessageIntelligenceAgency messageIntelligenceAgency; + private final ConcurrentMap deviceContexts = new ConcurrentHashMap<>(); + + private final long barrierIntervalNanos; + private final int barrierCountLimit; + private ExtensionConverterProvider extensionConverterProvider; + private ScheduledThreadPoolExecutor spyPool; + private Set deviceSynchronizedListeners; + private Set deviceValidListeners; - private final long barrierNanos = TimeUnit.MILLISECONDS.toNanos(500); - private final int maxQueueDepth = 25600; + private final LifecycleConductor conductor; public DeviceManagerImpl(@Nonnull final DataBroker dataBroker, - @Nonnull final MessageIntelligenceAgency messageIntelligenceAgency) { + final long globalNotificationQuota, final boolean switchFeaturesMandatory, + final long barrierInterval, final int barrierCountLimit, + final LifecycleConductor lifecycleConductor, boolean isNotificationFlowRemovedOff) { + this.switchFeaturesMandatory = switchFeaturesMandatory; + this.globalNotificationQuota = globalNotificationQuota; + this.isNotificationFlowRemovedOff = isNotificationFlowRemovedOff; this.dataBroker = Preconditions.checkNotNull(dataBroker); - hashedWheelTimer = new HashedWheelTimer(TICK_DURATION, TimeUnit.MILLISECONDS, 500); /* merge empty nodes to oper DS to predict any problems with missing parent for Node */ final WriteTransaction tx = dataBroker.newWriteOnlyTransaction(); @@ -137,402 +110,310 @@ public class DeviceManagerImpl implements DeviceManager, AutoCloseable { throw new IllegalStateException(e); } - this.messageIntelligenceAgency = messageIntelligenceAgency; + this.barrierIntervalNanos = TimeUnit.MILLISECONDS.toNanos(barrierInterval); + this.barrierCountLimit = barrierCountLimit; + + this.conductor = lifecycleConductor; + spyPool = new ScheduledThreadPoolExecutor(1); + this.deviceSynchronizedListeners = new HashSet<>(); + this.deviceValidListeners = new HashSet<>(); } + @Override public void setDeviceInitializationPhaseHandler(final DeviceInitializationPhaseHandler handler) { - deviceInitPhaseHandler = handler; + this.deviceInitPhaseHandler = handler; } @Override - public void onDeviceContextLevelUp(final DeviceContext deviceContext) { + public void onDeviceContextLevelUp(@CheckForNull DeviceInfo deviceInfo) throws Exception { // final phase - we have to add new Device to MD-SAL DataStore - Preconditions.checkNotNull(deviceContext); + LOG.debug("Final phase of DeviceContextLevelUp for Node: {} ", deviceInfo.getNodeId()); + DeviceContext deviceContext = Preconditions.checkNotNull(deviceContexts.get(deviceInfo)); ((DeviceContextImpl) deviceContext).initialSubmitTransaction(); - deviceContext.onPublished(); } @Override - public void deviceConnected(@CheckForNull final ConnectionContext connectionContext) { + public boolean deviceConnected(@CheckForNull final ConnectionContext connectionContext) throws Exception { Preconditions.checkArgument(connectionContext != null); + DeviceInfo deviceInfo = connectionContext.getDeviceInfo(); + /** + * This part prevent destroy another device context. Throwing here an exception result to propagate close connection + * in {@link org.opendaylight.openflowplugin.impl.connection.org.opendaylight.openflowplugin.impl.connection.HandshakeContextImpl} + * If context already exist we are in state closing process (connection flapping) and we should not propagate connection close + */ + if (deviceContexts.containsKey(deviceInfo)) { + LOG.warn("Rejecting connection from node which is already connected and there exist deviceContext for it: {}", connectionContext.getNodeId()); + return false; + } + + LOG.info("ConnectionEvent: Device connected to controller, Device:{}, NodeId:{}", + connectionContext.getConnectionAdapter().getRemoteAddress(), deviceInfo.getNodeId()); + + // Add Disconnect handler + connectionContext.setDeviceDisconnectedHandler(DeviceManagerImpl.this); // Cache this for clarity final ConnectionAdapter connectionAdapter = connectionContext.getConnectionAdapter(); //FIXME: as soon as auxiliary connection are fully supported then this is needed only before device context published connectionAdapter.setPacketInFiltering(true); - final Short version = connectionContext.getFeatures().getVersion(); - final OutboundQueueProvider outboundQueueProvider = new OutboundQueueProviderImpl(version); + final OutboundQueueProvider outboundQueueProvider = new OutboundQueueProviderImpl(deviceInfo.getVersion()); - final DeviceState deviceState = new DeviceStateImpl(connectionContext.getFeatures(), connectionContext.getNodeId()); + connectionContext.setOutboundQueueProvider(outboundQueueProvider); + final OutboundQueueHandlerRegistration outboundQueueHandlerRegistration = + connectionAdapter.registerOutboundQueueHandler(outboundQueueProvider, barrierCountLimit, barrierIntervalNanos); + connectionContext.setOutboundQueueHandleRegistration(outboundQueueHandlerRegistration); - final DeviceContext deviceContext = new DeviceContextImpl(connectionContext, deviceState, dataBroker, hashedWheelTimer, messageIntelligenceAgency); - deviceContext.registerOutboundQueueProvider(outboundQueueProvider, maxQueueDepth, barrierNanos); - deviceContext.setNotificationService(notificationService); - deviceContext.setNotificationPublishService(notificationPublishService); - final NodeBuilder nodeBuilder = new NodeBuilder().setId(deviceState.getNodeId()).setNodeConnector(Collections.emptyList()); - deviceContext.writeToTransaction(LogicalDatastoreType.OPERATIONAL, deviceState.getNodeInstanceIdentifier(), nodeBuilder.build()); + final DeviceState deviceState = new DeviceStateImpl(deviceInfo); + this.addDeviceSynchronizeListener(deviceState); + this.addDeviceValidListener(deviceState); - connectionContext.setDeviceDisconnectedHandler(deviceContext); - deviceContext.setTranslatorLibrary(translatorLibrary); - deviceContext.addDeviceContextClosedHandler(this); + final DeviceContext deviceContext = new DeviceContextImpl(connectionContext, + deviceState, + dataBroker, + conductor, + outboundQueueProvider, + translatorLibrary, + this); - final OpenflowProtocolListenerFullImpl messageListener = new OpenflowProtocolListenerFullImpl( - connectionAdapter, deviceContext); - connectionAdapter.setMessageListener(messageListener); + Verify.verify(deviceContexts.putIfAbsent(deviceInfo, deviceContext) == null, "DeviceCtx still not closed."); - final ListenableFuture>>> deviceFeaturesFuture; - if (OFConstants.OFP_VERSION_1_0 == version) { - final CapabilitiesV10 capabilitiesV10 = connectionContext.getFeatures().getCapabilitiesV10(); + ((ExtensionConverterProviderKeeper) deviceContext).setExtensionConverterProvider(extensionConverterProvider); + deviceContext.setNotificationPublishService(conductor.getNotificationPublishService()); - DeviceStateUtil.setDeviceStateBasedOnV10Capabilities(deviceState, capabilitiesV10); + updatePacketInRateLimiters(); - deviceFeaturesFuture = createDeviceFeaturesForOF10(deviceContext, deviceState); - // create empty tables after device description is processed - chainTableTrunkWriteOF10(deviceContext, deviceFeaturesFuture); - - final short ofVersion = deviceContext.getDeviceState().getVersion(); - final TranslatorKey translatorKey = new TranslatorKey(ofVersion, PortGrouping.class.getName()); - final MessageTranslator translator = deviceContext.oook().lookupTranslator(translatorKey); - final BigInteger dataPathId = deviceContext.getPrimaryConnectionContext().getFeatures().getDatapathId(); - - for (final PortGrouping port : connectionContext.getFeatures().getPhyPort()) { - final FlowCapableNodeConnector fcNodeConnector = translator.translate(port, deviceContext, null); + final OpenflowProtocolListenerFullImpl messageListener = new OpenflowProtocolListenerFullImpl( + connectionAdapter, deviceContext); + connectionAdapter.setMessageListener(messageListener); + notifyDeviceValidListeners(deviceInfo, true); - final NodeConnectorId nodeConnectorId = NodeStaticReplyTranslatorUtil.nodeConnectorId(dataPathId.toString(), port.getPortNo(), ofVersion); - final NodeConnectorBuilder ncBuilder = new NodeConnectorBuilder().setId(nodeConnectorId); - ncBuilder.addAugmentation(FlowCapableNodeConnector.class, fcNodeConnector); - ncBuilder.addAugmentation(FlowCapableNodeConnectorStatisticsData.class, new FlowCapableNodeConnectorStatisticsDataBuilder().build()); - final NodeConnector connector = ncBuilder.build(); - final InstanceIdentifier connectorII = deviceState.getNodeInstanceIdentifier().child(NodeConnector.class, connector.getKey()); - deviceContext.writeToTransaction(LogicalDatastoreType.OPERATIONAL, connectorII, connector); - } - } else if (OFConstants.OFP_VERSION_1_3 == version) { - final Capabilities capabilities = connectionContext.getFeatures().getCapabilities(); - LOG.debug("Setting capabilities for device {}", deviceContext.getDeviceState().getNodeId()); - DeviceStateUtil.setDeviceStateBasedOnV13Capabilities(deviceState, capabilities); - deviceFeaturesFuture = createDeviceFeaturesForOF13(deviceContext, deviceState); - } else { - deviceFeaturesFuture = Futures.immediateFailedFuture(new ConnectionException("Unsupported version " + version)); - } + deviceInitPhaseHandler.onDeviceContextLevelUp(connectionContext.getDeviceInfo()); - Futures.addCallback(deviceFeaturesFuture, new FutureCallback>>>() { - @Override - public void onSuccess(final List>> result) { - deviceContext.getDeviceState().setValid(true); - deviceInitPhaseHandler.onDeviceContextLevelUp(deviceContext); - LOG.trace("Device context level up called."); - } + notifyDeviceSynchronizeListeners(deviceInfo, true); - @Override - public void onFailure(final Throwable t) { - // FIXME : remove session - LOG.trace("Device capabilities gathering future failed."); - LOG.trace("more info in exploration failure..", t); - } - }); + return true; } - private static void chainTableTrunkWriteOF10(final DeviceContext deviceContext, final ListenableFuture>>> deviceFeaturesFuture) { - Futures.addCallback(deviceFeaturesFuture, new FutureCallback>>>() { - @Override - public void onSuccess(final List>> results) { - boolean allSucceeded = true; - for (final RpcResult> rpcResult : results) { - allSucceeded &= rpcResult.isSuccessful(); + private void updatePacketInRateLimiters() { + synchronized (deviceContexts) { + final int deviceContextsSize = deviceContexts.size(); + if (deviceContextsSize > 0) { + long freshNotificationLimit = globalNotificationQuota / deviceContextsSize; + if (freshNotificationLimit < 100) { + freshNotificationLimit = 100; } - if (allSucceeded) { - createEmptyFlowCapableNodeInDs(deviceContext); - makeEmptyTables(deviceContext, deviceContext.getDeviceState().getNodeInstanceIdentifier(), - deviceContext.getDeviceState().getFeatures().getTables()); + LOG.debug("fresh notification limit = {}", freshNotificationLimit); + for (final DeviceContext deviceContext : deviceContexts.values()) { + deviceContext.updatePacketInRateLimit(freshNotificationLimit); } } - - @Override - public void onFailure(final Throwable t) { - //NOOP - } - }); + } } - - private static ListenableFuture>>> createDeviceFeaturesForOF10(final DeviceContext deviceContext, - final DeviceState deviceState) { - final ListenableFuture>> replyDesc = getNodeStaticInfo(MultipartType.OFPMPDESC, - deviceContext, - deviceState.getNodeInstanceIdentifier(), - deviceState.getVersion()); - - return Futures.allAsList(Arrays.asList(replyDesc)); + @Override + public TranslatorLibrary oook() { + return translatorLibrary; } - private static ListenableFuture>>> createDeviceFeaturesForOF13(final DeviceContext deviceContext, - final DeviceState deviceState) { - - final ListenableFuture>> replyDesc = getNodeStaticInfo(MultipartType.OFPMPDESC, - deviceContext, - deviceState.getNodeInstanceIdentifier(), - deviceState.getVersion()); + @Override + public void setTranslatorLibrary(final TranslatorLibrary translatorLibrary) { + this.translatorLibrary = translatorLibrary; + } - //first process description reply, write data to DS and write consequent data if successful - return Futures.transform(replyDesc, new AsyncFunction>, List>>>() { - @Override - public ListenableFuture>>> apply(final RpcResult> rpcResult) throws Exception { - - translateAndWriteReply(MultipartType.OFPMPDESC, deviceContext, deviceState.getNodeInstanceIdentifier(), rpcResult.getResult()); - - final ListenableFuture>> replyMeterFeature = getNodeStaticInfo(MultipartType.OFPMPMETERFEATURES, - deviceContext, - deviceState.getNodeInstanceIdentifier(), - deviceState.getVersion()); - - createSuccessProcessingCallback(MultipartType.OFPMPMETERFEATURES, - deviceContext, - deviceState.getNodeInstanceIdentifier(), - replyMeterFeature); - - final ListenableFuture>> replyGroupFeatures = getNodeStaticInfo(MultipartType.OFPMPGROUPFEATURES, - deviceContext, - deviceState.getNodeInstanceIdentifier(), - deviceState.getVersion()); - createSuccessProcessingCallback(MultipartType.OFPMPGROUPFEATURES, - deviceContext, - deviceState.getNodeInstanceIdentifier(), - replyGroupFeatures); - - final ListenableFuture>> replyTableFeatures = getNodeStaticInfo(MultipartType.OFPMPTABLEFEATURES, - deviceContext, - deviceState.getNodeInstanceIdentifier(), - deviceState.getVersion()); - createSuccessProcessingCallback(MultipartType.OFPMPTABLEFEATURES, - deviceContext, - deviceState.getNodeInstanceIdentifier(), - replyTableFeatures); - - final ListenableFuture>> replyPortDescription = getNodeStaticInfo(MultipartType.OFPMPPORTDESC, - deviceContext, - deviceState.getNodeInstanceIdentifier(), - deviceState.getVersion()); - createSuccessProcessingCallback(MultipartType.OFPMPPORTDESC, - deviceContext, - deviceState.getNodeInstanceIdentifier(), - replyPortDescription); - - - return Futures.allAsList(Arrays.asList( - replyMeterFeature, - replyGroupFeatures, -// replyTableFeatures, - replyPortDescription)); - } - }); + @Override + public void close() { + for (final Iterator iterator = Iterators.consumingIterator(deviceContexts.values().iterator()); + iterator.hasNext();) { + final DeviceContext deviceCtx = iterator.next(); + notifyDeviceValidListeners(deviceCtx.getDeviceInfo(), false); + deviceCtx.shutdownConnection(); + deviceCtx.shuttingDownDataStoreTransactions(); + } + if (spyPool != null) { + spyPool.shutdownNow(); + spyPool = null; + } } @Override - public TranslatorLibrary oook() { - return translatorLibrary; + public void onDeviceContextLevelDown(final DeviceInfo deviceInfo) { + LOG.debug("onDeviceContextClosed for Node {}", deviceInfo.getNodeId()); + deviceContexts.remove(deviceInfo); + updatePacketInRateLimiters(); } @Override - public void setTranslatorLibrary(final TranslatorLibrary translatorLibrary) { - this.translatorLibrary = translatorLibrary; + public void initialize() { + spyPool.scheduleAtFixedRate(conductor.getMessageIntelligenceAgency(), spyRate, spyRate, TimeUnit.SECONDS); } - private static ListenableFuture>> getNodeStaticInfo(final MultipartType type, final DeviceContext deviceContext, - final InstanceIdentifier nodeII, final short version) { - - final OutboundQueue queue = deviceContext.getPrimaryConnectionContext().getOutboundQueueProvider(); + @Override + public void setExtensionConverterProvider(final ExtensionConverterProvider extensionConverterProvider) { + this.extensionConverterProvider = extensionConverterProvider; + } - final Long reserved = deviceContext.getReservedXid(); - final RequestContext> requestContext = new AbstractRequestContext>(reserved) { - @Override - public void close() { - //NOOP - } - }; + @Override + public ExtensionConverterProvider getExtensionConverterProvider() { + return extensionConverterProvider; + } - final Xid xid = requestContext.getXid(); + @Override + public void setDeviceTerminationPhaseHandler(final DeviceTerminationPhaseHandler handler) { + this.deviceTerminPhaseHandler = handler; + } - LOG.trace("Hooking xid {} to device context - precaution.", reserved); + @Override + public void onDeviceDisconnected(final ConnectionContext connectionContext) { + LOG.trace("onDeviceDisconnected method call for Node: {}", connectionContext.getNodeId()); + final DeviceInfo deviceInfo = connectionContext.getDeviceInfo(); + final DeviceContext deviceCtx = this.deviceContexts.get(deviceInfo); + + if (null == deviceCtx) { + LOG.info("DeviceContext for Node {} was not found. Connection is terminated without OFP context suite.", deviceInfo.getNodeId()); + return; + } - final MultiMsgCollector multiMsgCollector = deviceContext.getMultiMsgCollector(requestContext); - queue.commitEntry(xid.getValue(), MultipartRequestInputFactory.makeMultipartRequestInput(xid.getValue(), version, type), new FutureCallback() { - @Override - public void onSuccess(final OfHeader ofHeader) { - if (ofHeader instanceof MultipartReply) { - final MultipartReply multipartReply = (MultipartReply) ofHeader; - multiMsgCollector.addMultipartMsg(multipartReply); - } else if (null != ofHeader) { - LOG.info("Unexpected response type received {}.", ofHeader.getClass()); - } else { - multiMsgCollector.endCollecting(); - LOG.info("Response received is null."); + if (!connectionContext.equals(deviceCtx.getPrimaryConnectionContext())) { + /* Connection is not PrimaryConnection so try to remove from Auxiliary Connections */ + deviceCtx.removeAuxiliaryConnectionContext(connectionContext); + } else { + notifyDeviceValidListeners(deviceInfo, false); + /* Device is disconnected and so we need to close TxManager */ + final ListenableFuture future = deviceCtx.shuttingDownDataStoreTransactions(); + Futures.addCallback(future, new FutureCallback() { + + @Override + public void onSuccess(final Void result) { + LOG.debug("TxChainManager for device {} is closed successful.", deviceInfo.getNodeId()); + deviceTerminPhaseHandler.onDeviceContextLevelDown(deviceInfo); } - } - @Override - public void onFailure(final Throwable t) { - LOG.info("Fail response from OutboundQueue for multipart type {}.", type, t); - requestContext.close(); - if (MultipartType.OFPMPTABLEFEATURES.equals(type)) { - makeEmptyTables(deviceContext, nodeII, deviceContext.getPrimaryConnectionContext().getFeatures().getTables()); + @Override + public void onFailure(final Throwable t) { + LOG.warn("TxChainManager for device {} failed by closing.", deviceInfo.getNodeId(), t); + deviceTerminPhaseHandler.onDeviceContextLevelDown(deviceInfo); } - } - }); - - return requestContext.getFuture(); + }); + /* Add timer for Close TxManager because it could fain ind cluster without notification */ + final TimerTask timerTask = timeout -> { + if (!future.isDone()) { + LOG.info("Shutting down TxChain for node {} not completed during 10 sec. Continue anyway.", deviceInfo.getNodeId()); + future.cancel(false); + } + }; + conductor.newTimeout(timerTask, 10, TimeUnit.SECONDS); + } } - private static void createSuccessProcessingCallback(final MultipartType type, final DeviceContext deviceContext, final InstanceIdentifier nodeII, final ListenableFuture>> requestContextFuture) { - Futures.addCallback(requestContextFuture, new FutureCallback>>() { - @Override - public void onSuccess(final RpcResult> rpcResult) { - final List result = rpcResult.getResult(); - if (result != null) { - LOG.info("Static node {} info: {} collected", nodeII.toString(), type); - translateAndWriteReply(type, deviceContext, nodeII, result); - } else { - final Iterator rpcErrorIterator = rpcResult.getErrors().iterator(); - while (rpcErrorIterator.hasNext()) { - final RpcError rpcError = rpcErrorIterator.next(); - LOG.info("Failed to retrieve static node {} info: {}", type, rpcError.getMessage()); - if (null != rpcError.getCause()) { - LOG.trace("Detailed error:", rpcError.getCause()); - } - } - if (MultipartType.OFPMPTABLEFEATURES.equals(type)) { - makeEmptyTables(deviceContext, nodeII, deviceContext.getPrimaryConnectionContext().getFeatures().getTables()); - } - } - } + @VisibleForTesting + void addDeviceContextToMap(final DeviceInfo deviceInfo, final DeviceContext deviceContext){ + deviceContexts.put(deviceInfo, deviceContext); + } - @Override - public void onFailure(final Throwable throwable) { - LOG.info("Request of type {} for static info of node {} failed.", type, nodeII); - } - }); + @Override + public T gainContext(final DeviceInfo deviceInfo) { + return (T) deviceContexts.get(deviceInfo); } - // FIXME : remove after ovs tableFeatures fix - private static void makeEmptyTables(final DeviceContext dContext, final InstanceIdentifier nodeII, final Short nrOfTables) { - LOG.debug("About to create {} empty tables.", nrOfTables); - for (int i = 0; i < nrOfTables; i++) { - final short tId = (short) i; - final InstanceIdentifier tableII = nodeII.augmentation(FlowCapableNode.class).child(Table.class, new TableKey(tId)); - final TableBuilder tableBuilder = new TableBuilder().setId(tId).addAugmentation(FlowTableStatisticsData.class, new FlowTableStatisticsDataBuilder().build()); - dContext.writeToTransaction(LogicalDatastoreType.OPERATIONAL, tableII, tableBuilder.build()); + @Override + public ListenableFuture onClusterRoleChange(final DeviceInfo deviceInfo, final OfpRole role) { + DeviceContext deviceContext = conductor.getDeviceContext(deviceInfo); + LOG.trace("onClusterRoleChange {} for node:", role, deviceInfo.getNodeId()); + if (OfpRole.BECOMEMASTER.equals(role)) { + return onDeviceTakeClusterLeadership(deviceInfo); } + return ((DeviceContextImpl)deviceContext).getTransactionChainManager().deactivateTransactionManager(); } - private static void translateAndWriteReply(final MultipartType type, final DeviceContext dContext, - final InstanceIdentifier nodeII, final Collection result) { - for (final MultipartReply reply : result) { - final MultipartReplyBody body = reply.getMultipartReplyBody(); - switch (type) { - case OFPMPDESC: - Preconditions.checkArgument(body instanceof MultipartReplyDescCase); - final MultipartReplyDesc replyDesc = ((MultipartReplyDescCase) body).getMultipartReplyDesc(); - final FlowCapableNode fcNode = NodeStaticReplyTranslatorUtil.nodeDescTranslator(replyDesc); - final InstanceIdentifier fNodeII = nodeII.augmentation(FlowCapableNode.class); - dContext.writeToTransaction(LogicalDatastoreType.OPERATIONAL, fNodeII, fcNode); - break; - - case OFPMPTABLEFEATURES: - Preconditions.checkArgument(body instanceof MultipartReplyTableFeaturesCase); - final MultipartReplyTableFeatures tableFeatures = ((MultipartReplyTableFeaturesCase) body).getMultipartReplyTableFeatures(); - final List tables = NodeStaticReplyTranslatorUtil.nodeTableFeatureTranslator(tableFeatures); - for (final TableFeatures table : tables) { - final Short tableId = table.getTableId(); - final InstanceIdentifier
tableII = nodeII.augmentation(FlowCapableNode.class).child(Table.class, new TableKey(tableId)); - final TableBuilder tableBuilder = new TableBuilder().setId(tableId).setTableFeatures(Collections.singletonList(table)); - tableBuilder.addAugmentation(FlowTableStatisticsData.class, new FlowTableStatisticsDataBuilder().build()); - dContext.writeToTransaction(LogicalDatastoreType.OPERATIONAL, tableII, tableBuilder.build()); - } - break; - - case OFPMPMETERFEATURES: - Preconditions.checkArgument(body instanceof MultipartReplyMeterFeaturesCase); - final MultipartReplyMeterFeatures meterFeatures = ((MultipartReplyMeterFeaturesCase) body).getMultipartReplyMeterFeatures(); - final NodeMeterFeatures mFeature = NodeStaticReplyTranslatorUtil.nodeMeterFeatureTranslator(meterFeatures); - final InstanceIdentifier mFeatureII = nodeII.augmentation(NodeMeterFeatures.class); - dContext.writeToTransaction(LogicalDatastoreType.OPERATIONAL, mFeatureII, mFeature); - if (0L < mFeature.getMeterFeatures().getMaxMeter().getValue()) { - dContext.getDeviceState().setMeterAvailable(true); - } - break; - - case OFPMPGROUPFEATURES: - Preconditions.checkArgument(body instanceof MultipartReplyGroupFeaturesCase); - final MultipartReplyGroupFeatures groupFeatures = ((MultipartReplyGroupFeaturesCase) body).getMultipartReplyGroupFeatures(); - final NodeGroupFeatures gFeature = NodeStaticReplyTranslatorUtil.nodeGroupFeatureTranslator(groupFeatures); - final InstanceIdentifier gFeatureII = nodeII.augmentation(NodeGroupFeatures.class); - dContext.writeToTransaction(LogicalDatastoreType.OPERATIONAL, gFeatureII, gFeature); - break; - - case OFPMPPORTDESC: - Preconditions.checkArgument(body instanceof MultipartReplyPortDescCase); - final MultipartReplyPortDesc portDesc = ((MultipartReplyPortDescCase) body).getMultipartReplyPortDesc(); - for (final PortGrouping port : portDesc.getPorts()) { - final short ofVersion = dContext.getDeviceState().getVersion(); - final TranslatorKey translatorKey = new TranslatorKey(ofVersion, PortGrouping.class.getName()); - final MessageTranslator translator = dContext.oook().lookupTranslator(translatorKey); - final FlowCapableNodeConnector fcNodeConnector = translator.translate(port, dContext, null); - - final BigInteger dataPathId = dContext.getPrimaryConnectionContext().getFeatures().getDatapathId(); - final NodeConnectorId nodeConnectorId = NodeStaticReplyTranslatorUtil.nodeConnectorId(dataPathId.toString(), port.getPortNo(), ofVersion); - final NodeConnectorBuilder ncBuilder = new NodeConnectorBuilder().setId(nodeConnectorId); - ncBuilder.addAugmentation(FlowCapableNodeConnector.class, fcNodeConnector); - - ncBuilder.addAugmentation(FlowCapableNodeConnectorStatisticsData.class, new FlowCapableNodeConnectorStatisticsDataBuilder().build()); - final NodeConnector connector = ncBuilder.build(); - - final InstanceIdentifier connectorII = nodeII.child(NodeConnector.class, connector.getKey()); - dContext.writeToTransaction(LogicalDatastoreType.OPERATIONAL, connectorII, connector); - } - - break; - - default: - throw new IllegalArgumentException("Unnexpected MultipartType " + type); - } - } + @Override + public void addDeviceSynchronizeListener(final DeviceSynchronizeListener deviceSynchronizeListener) { + this.deviceSynchronizedListeners.add(deviceSynchronizeListener); } @Override - public void setNotificationService(final NotificationService notificationServiceParam) { - notificationService = notificationServiceParam; + public void notifyDeviceSynchronizeListeners(final DeviceInfo deviceInfo, final boolean deviceSynchronized) { + for (DeviceSynchronizeListener listener : deviceSynchronizedListeners) { + listener.deviceIsSynchronized(deviceInfo, deviceSynchronized); + } } @Override - public void setNotificationPublishService(final NotificationPublishService notificationService) { - notificationPublishService = notificationService; + public void addDeviceValidListener(final DeviceValidListener deviceValidListener) { + this.deviceValidListeners.add(deviceValidListener); } @Override - public void close() throws Exception { - for (final DeviceContext deviceContext : deviceContexts) { - deviceContext.close(); + public void notifyDeviceValidListeners(final DeviceInfo deviceInfo, final boolean deviceValid) { + for (DeviceValidListener listener : deviceValidListeners) { + listener.deviceIsValid(deviceInfo, deviceValid); } } - private static void createEmptyFlowCapableNodeInDs(final DeviceContext deviceContext) { - final FlowCapableNodeBuilder flowCapableNodeBuilder = new FlowCapableNodeBuilder(); - final InstanceIdentifier fNodeII = deviceContext.getDeviceState().getNodeInstanceIdentifier().augmentation(FlowCapableNode.class); - deviceContext.writeToTransaction(LogicalDatastoreType.OPERATIONAL, fNodeII, flowCapableNodeBuilder.build()); + @Override + public void setIsNotificationFlowRemovedOff(boolean isNotificationFlowRemovedOff) { + this.isNotificationFlowRemovedOff = isNotificationFlowRemovedOff; } @Override - public void onDeviceContextClosed(final DeviceContext deviceContext) { - deviceContexts.remove(deviceContext); + public boolean getIsNotificationFlowRemovedOff() { + return this.isNotificationFlowRemovedOff; } - @Override - public void initialize() { - spyPool = new ScheduledThreadPoolExecutor(1); - spyPool.scheduleAtFixedRate(messageIntelligenceAgency, spyRate, spyRate, TimeUnit.SECONDS); + private ListenableFuture onDeviceTakeClusterLeadership(final DeviceInfo deviceInfo) { + LOG.trace("onDeviceTakeClusterLeadership for node: {}", deviceInfo.getNodeId()); + /* validation */ + StatisticsContext statisticsContext = conductor.getStatisticsContext(deviceInfo); + if (statisticsContext == null) { + final String errMsg = String.format("DeviceCtx %s is up but we are missing StatisticsContext", deviceInfo.getDatapathId()); + LOG.warn(errMsg); + return Futures.immediateFailedFuture(new IllegalStateException(errMsg)); + } + DeviceContext deviceContext = conductor.getDeviceContext(deviceInfo); + /* Prepare init info collecting */ + notifyDeviceSynchronizeListeners(deviceInfo, false); + ((DeviceContextImpl)deviceContext).getTransactionChainManager().activateTransactionManager(); + /* Init Collecting NodeInfo */ + final ListenableFuture initCollectingDeviceInfo = DeviceInitializationUtils.initializeNodeInformation( + deviceContext, switchFeaturesMandatory); + /* Init Collecting StatInfo */ + final ListenableFuture statPollFuture = Futures.transform(initCollectingDeviceInfo, + new AsyncFunction() { + + @Override + public ListenableFuture apply(@Nonnull final Void input) throws Exception { + statisticsContext.statListForCollectingInitialization(); + return statisticsContext.initialGatherDynamicData(); + } + }); + + return Futures.transform(statPollFuture, new Function() { + + @Override + public Void apply(final Boolean input) { + if (ConnectionContext.CONNECTION_STATE.RIP.equals(conductor.gainConnectionStateSafely(deviceInfo))) { + final String errMsg = String.format("We lost connection for Device %s, context has to be closed.", + deviceInfo.getNodeId()); + LOG.warn(errMsg); + throw new IllegalStateException(errMsg); + } + if (!input) { + final String errMsg = String.format("Get Initial Device %s information fails", + deviceInfo.getNodeId()); + LOG.warn(errMsg); + throw new IllegalStateException(errMsg); + } + LOG.debug("Get Initial Device {} information is successful", deviceInfo.getNodeId()); + notifyDeviceSynchronizeListeners(deviceInfo, true); + ((DeviceContextImpl)deviceContext).getTransactionChainManager().initialSubmitWriteTransaction(); + deviceContext.getDeviceState().setStatisticsPollingEnabledProp(true); + return null; + } + }); } + }