X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=openflowplugin-impl%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fopenflowplugin%2Fimpl%2Fdevice%2FDeviceManagerImpl.java;h=7fc078ccd4ec76b3feca7d9cd5cac18a0f970c85;hb=3814bf8d87a764c519c1a92cbb9da14651bf905a;hp=0560e945d5034c25fe9ad27e5103ae9e9b316f20;hpb=812ba2457c5a2699b62ae3c62db60a2b23cdcf09;p=openflowplugin.git diff --git a/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/device/DeviceManagerImpl.java b/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/device/DeviceManagerImpl.java index 0560e945d5..7fc078ccd4 100644 --- a/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/device/DeviceManagerImpl.java +++ b/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/device/DeviceManagerImpl.java @@ -8,18 +8,19 @@ package org.opendaylight.openflowplugin.impl.device; import com.google.common.base.Preconditions; +import com.google.common.base.Verify; +import com.google.common.collect.Iterators; import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; -import com.google.common.util.concurrent.SettableFuture; import io.netty.util.HashedWheelTimer; -import java.math.BigInteger; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; +import io.netty.util.Timeout; +import io.netty.util.TimerTask; import java.util.Collections; import java.util.Iterator; -import java.util.List; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ExecutionException; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; import javax.annotation.CheckForNull; @@ -29,286 +30,164 @@ import org.opendaylight.controller.md.sal.binding.api.NotificationPublishService import org.opendaylight.controller.md.sal.binding.api.NotificationService; import org.opendaylight.controller.md.sal.binding.api.WriteTransaction; import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType; -import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueue; -import org.opendaylight.openflowplugin.api.ConnectionException; -import org.opendaylight.openflowplugin.api.OFConstants; +import org.opendaylight.openflowjava.protocol.api.connection.ConnectionAdapter; +import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueueHandlerRegistration; import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionContext; import org.opendaylight.openflowplugin.api.openflow.connection.OutboundQueueProvider; -import org.opendaylight.openflowplugin.api.openflow.connection.ThrottledNotificationsOfferer; import org.opendaylight.openflowplugin.api.openflow.device.DeviceContext; import org.opendaylight.openflowplugin.api.openflow.device.DeviceManager; import org.opendaylight.openflowplugin.api.openflow.device.DeviceState; -import org.opendaylight.openflowplugin.api.openflow.device.MessageTranslator; -import org.opendaylight.openflowplugin.api.openflow.device.RequestContext; -import org.opendaylight.openflowplugin.api.openflow.device.RequestContextStack; import org.opendaylight.openflowplugin.api.openflow.device.TranslatorLibrary; -import org.opendaylight.openflowplugin.api.openflow.device.Xid; import org.opendaylight.openflowplugin.api.openflow.device.handlers.DeviceInitializationPhaseHandler; -import org.opendaylight.openflowplugin.api.openflow.device.handlers.MultiMsgCollector; -import org.opendaylight.openflowplugin.api.openflow.md.core.TranslatorKey; +import org.opendaylight.openflowplugin.api.openflow.device.handlers.DeviceTerminationPhaseHandler; import org.opendaylight.openflowplugin.api.openflow.statistics.ofpspecific.MessageIntelligenceAgency; -import org.opendaylight.openflowplugin.impl.common.MultipartRequestInputFactory; -import org.opendaylight.openflowplugin.impl.common.NodeStaticReplyTranslatorUtil; +import org.opendaylight.openflowplugin.extension.api.ExtensionConverterProviderKeeper; +import org.opendaylight.openflowplugin.extension.api.core.extension.ExtensionConverterProvider; import org.opendaylight.openflowplugin.impl.connection.OutboundQueueProviderImpl; -import org.opendaylight.openflowplugin.impl.connection.ThrottledNotificationsOffererImpl; import org.opendaylight.openflowplugin.impl.device.listener.OpenflowProtocolListenerFullImpl; -import org.opendaylight.openflowplugin.impl.rpc.RequestContextImpl; -import org.opendaylight.openflowplugin.impl.util.DeviceStateUtil; -import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode; -import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNodeBuilder; -import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNodeConnector; -import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.Table; -import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.TableBuilder; -import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.TableKey; -import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.table.statistics.rev131215.FlowTableStatisticsData; -import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.table.statistics.rev131215.FlowTableStatisticsDataBuilder; -import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.NodeGroupFeatures; -import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeConnectorId; +import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId; import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes; import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodesBuilder; -import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnector; -import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnectorBuilder; import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node; -import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeBuilder; -import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.NodeMeterFeatures; -import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.common.types.rev130731.Capabilities; -import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.common.types.rev130731.CapabilitiesV10; -import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.common.types.rev130731.MultipartType; -import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.MultipartReply; -import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader; -import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PacketInMessage; -import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PortGrouping; -import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.multipart.reply.MultipartReplyBody; -import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.multipart.reply.multipart.reply.body.MultipartReplyDescCase; -import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.multipart.reply.multipart.reply.body.MultipartReplyGroupFeaturesCase; -import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.multipart.reply.multipart.reply.body.MultipartReplyMeterFeaturesCase; -import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.multipart.reply.multipart.reply.body.MultipartReplyPortDescCase; -import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.multipart.reply.multipart.reply.body.MultipartReplyTableFeaturesCase; -import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.multipart.reply.multipart.reply.body.multipart.reply.desc._case.MultipartReplyDesc; -import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.multipart.reply.multipart.reply.body.multipart.reply.group.features._case.MultipartReplyGroupFeatures; -import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.multipart.reply.multipart.reply.body.multipart.reply.meter.features._case.MultipartReplyMeterFeatures; -import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.multipart.reply.multipart.reply.body.multipart.reply.port.desc._case.MultipartReplyPortDesc; -import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.multipart.reply.multipart.reply.body.multipart.reply.table.features._case.MultipartReplyTableFeatures; -import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.FlowCapableNodeConnectorStatisticsData; -import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.FlowCapableNodeConnectorStatisticsDataBuilder; -import org.opendaylight.yang.gen.v1.urn.opendaylight.table.types.rev131026.table.features.TableFeatures; import org.opendaylight.yangtools.yang.binding.InstanceIdentifier; -import org.opendaylight.yangtools.yang.common.RpcError; -import org.opendaylight.yangtools.yang.common.RpcResult; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** * */ -public class DeviceManagerImpl implements DeviceManager, AutoCloseable { +public class DeviceManagerImpl implements DeviceManager, ExtensionConverterProviderKeeper { private static final Logger LOG = LoggerFactory.getLogger(DeviceManagerImpl.class); private static final long TICK_DURATION = 10; // 0.5 sec. + private final long globalNotificationQuota; + private final boolean switchFeaturesMandatory; + private ScheduledThreadPoolExecutor spyPool; private final int spyRate = 10; private final DataBroker dataBroker; private final HashedWheelTimer hashedWheelTimer; - private RequestContextStack emptyRequestContextStack; private TranslatorLibrary translatorLibrary; private DeviceInitializationPhaseHandler deviceInitPhaseHandler; + private DeviceTerminationPhaseHandler deviceTerminPhaseHandler; private NotificationService notificationService; private NotificationPublishService notificationPublishService; - private ThrottledNotificationsOfferer throttledNotificationsOfferer; - private final List deviceContexts = new ArrayList(); + private final ConcurrentMap deviceContexts = new ConcurrentHashMap<>(); private final MessageIntelligenceAgency messageIntelligenceAgency; - private final long barrierNanos = TimeUnit.MILLISECONDS.toNanos(500); - private final int maxQueueDepth = 25600; + private final long barrierIntervalNanos; + private final int barrierCountLimit; + private ExtensionConverterProvider extensionConverterProvider; public DeviceManagerImpl(@Nonnull final DataBroker dataBroker, - @Nonnull final MessageIntelligenceAgency messageIntelligenceAgency) { + @Nonnull final MessageIntelligenceAgency messageIntelligenceAgency, + final long globalNotificationQuota, final boolean switchFeaturesMandatory, + final long barrierInterval, final int barrierCountLimit) { + this.switchFeaturesMandatory = switchFeaturesMandatory; + this.globalNotificationQuota = globalNotificationQuota; this.dataBroker = Preconditions.checkNotNull(dataBroker); hashedWheelTimer = new HashedWheelTimer(TICK_DURATION, TimeUnit.MILLISECONDS, 500); /* merge empty nodes to oper DS to predict any problems with missing parent for Node */ final WriteTransaction tx = dataBroker.newWriteOnlyTransaction(); - tx.merge(LogicalDatastoreType.OPERATIONAL, InstanceIdentifier.create(Nodes.class), new NodesBuilder().build()); - tx.submit(); - - this.messageIntelligenceAgency = messageIntelligenceAgency; - emptyRequestContextStack = new RequestContextStack() { - @Override - public void forgetRequestContext(final RequestContext requestContext) { - //NOOP - } - - @Override - public SettableFuture> storeOrFail(final RequestContext data) { - return data.getFuture(); - } + final NodesBuilder nodesBuilder = new NodesBuilder(); + nodesBuilder.setNode(Collections.emptyList()); + tx.merge(LogicalDatastoreType.OPERATIONAL, InstanceIdentifier.create(Nodes.class), nodesBuilder.build()); + try { + tx.submit().get(); + } catch (ExecutionException | InterruptedException e) { + LOG.error("Creation of node failed.", e); + throw new IllegalStateException(e); + } - @Override - public RequestContext createRequestContext() { - return new RequestContextImpl<>(this); - } - }; + this.messageIntelligenceAgency = messageIntelligenceAgency; + this.barrierIntervalNanos = TimeUnit.MILLISECONDS.toNanos(barrierInterval); + this.barrierCountLimit = barrierCountLimit; } + @Override public void setDeviceInitializationPhaseHandler(final DeviceInitializationPhaseHandler handler) { - deviceInitPhaseHandler = handler; + this.deviceInitPhaseHandler = handler; } @Override - public void onDeviceContextLevelUp(final DeviceContext deviceContext) { + public void onDeviceContextLevelUp(final DeviceContext deviceContext) throws Exception { // final phase - we have to add new Device to MD-SAL DataStore + LOG.debug("Final phase of DeviceContextLevelUp for Node: {} ", deviceContext.getDeviceState().getNodeId()); Preconditions.checkNotNull(deviceContext); - ((DeviceContextImpl) deviceContext).submitTransaction(); + ((DeviceContextImpl) deviceContext).initialSubmitTransaction(); + deviceContext.onPublished(); } @Override - public void deviceConnected(@CheckForNull final ConnectionContext connectionContext) { + public void deviceConnected(@CheckForNull final ConnectionContext connectionContext) throws Exception { Preconditions.checkArgument(connectionContext != null); + Preconditions.checkState(!deviceContexts.containsKey(connectionContext.getNodeId()), + "Rejecting connection from node which is already connected and there exist deviceContext for it: {}", + connectionContext.getNodeId() + ); + LOG.info("ConnectionEvent: Device connected to controller, Device:{}, NodeId:{}", + connectionContext.getConnectionAdapter().getRemoteAddress(), connectionContext.getNodeId()); + + // Add Disconnect handler + connectionContext.setDeviceDisconnectedHandler(DeviceManagerImpl.this); + // Cache this for clarity + final ConnectionAdapter connectionAdapter = connectionContext.getConnectionAdapter(); + + //FIXME: as soon as auxiliary connection are fully supported then this is needed only before device context published + connectionAdapter.setPacketInFiltering(true); final Short version = connectionContext.getFeatures().getVersion(); - OutboundQueueProvider outboundQueueProvider = new OutboundQueueProviderImpl(version); - connectionContext.getConnectionAdapter().registerOutboundQueueHandler(outboundQueueProvider, maxQueueDepth, barrierNanos); + final OutboundQueueProvider outboundQueueProvider = new OutboundQueueProviderImpl(version); + connectionContext.setOutboundQueueProvider(outboundQueueProvider); + final OutboundQueueHandlerRegistration outboundQueueHandlerRegistration = + connectionAdapter.registerOutboundQueueHandler(outboundQueueProvider, barrierCountLimit, barrierIntervalNanos); + connectionContext.setOutboundQueueHandleRegistration(outboundQueueHandlerRegistration); - final DeviceState deviceState = new DeviceStateImpl(connectionContext.getFeatures(), connectionContext.getNodeId()); + final DeviceState deviceState = createDeviceState(connectionContext); + final DeviceContext deviceContext = new DeviceContextImpl(connectionContext, deviceState, dataBroker, + hashedWheelTimer, messageIntelligenceAgency, outboundQueueProvider, translatorLibrary, switchFeaturesMandatory); - final DeviceContext deviceContext = new DeviceContextImpl(connectionContext, deviceState, dataBroker, hashedWheelTimer, messageIntelligenceAgency, throttledNotificationsOfferer); + Verify.verify(deviceContexts.putIfAbsent(connectionContext.getNodeId(), deviceContext) == null, "DeviceCtx still not closed."); + deviceContext.addDeviceContextClosedHandler(this); + ((ExtensionConverterProviderKeeper) deviceContext).setExtensionConverterProvider(extensionConverterProvider); deviceContext.setNotificationService(notificationService); deviceContext.setNotificationPublishService(notificationPublishService); - deviceContext.writeToTransaction(LogicalDatastoreType.OPERATIONAL, deviceState.getNodeInstanceIdentifier(), new NodeBuilder().setId(deviceState.getNodeId()).build()); - connectionContext.setDeviceDisconnectedHandler(deviceContext); - deviceContext.setTranslatorLibrary(translatorLibrary); - deviceContext.addDeviceContextClosedHandler(this); + updatePacketInRateLimiters(); final OpenflowProtocolListenerFullImpl messageListener = new OpenflowProtocolListenerFullImpl( - connectionContext.getConnectionAdapter(), deviceContext); - - final ListenableFuture>>> deviceFeaturesFuture; - - - - if (OFConstants.OFP_VERSION_1_0 == version) { - final CapabilitiesV10 capabilitiesV10 = connectionContext.getFeatures().getCapabilitiesV10(); - - DeviceStateUtil.setDeviceStateBasedOnV10Capabilities(deviceState, capabilitiesV10); - - deviceFeaturesFuture = createDeviceFeaturesForOF10(deviceContext, deviceState); - // create empty tables after device description is processed - chainTableTrunkWriteOF10(deviceContext, deviceFeaturesFuture); - - for (final PortGrouping port : connectionContext.getFeatures().getPhyPort()) { - final short ofVersion = deviceContext.getDeviceState().getVersion(); - final TranslatorKey translatorKey = new TranslatorKey(ofVersion, PortGrouping.class.getName()); - final MessageTranslator translator = deviceContext.oook().lookupTranslator(translatorKey); - final FlowCapableNodeConnector fcNodeConnector = translator.translate(port, deviceContext, null); - - final BigInteger dataPathId = deviceContext.getPrimaryConnectionContext().getFeatures().getDatapathId(); - final NodeConnectorId nodeConnectorId = NodeStaticReplyTranslatorUtil.nodeConnectorId(dataPathId.toString(), port.getPortNo(), ofVersion); - final NodeConnectorBuilder ncBuilder = new NodeConnectorBuilder().setId(nodeConnectorId); - ncBuilder.addAugmentation(FlowCapableNodeConnector.class, fcNodeConnector); - ncBuilder.addAugmentation(FlowCapableNodeConnectorStatisticsData.class, new FlowCapableNodeConnectorStatisticsDataBuilder().build()); - final NodeConnector connector = ncBuilder.build(); - final InstanceIdentifier connectorII = deviceState.getNodeInstanceIdentifier().child(NodeConnector.class, connector.getKey()); - deviceContext.writeToTransaction(LogicalDatastoreType.OPERATIONAL, connectorII, connector); - } - } else if (OFConstants.OFP_VERSION_1_3 == version) { - final Capabilities capabilities = connectionContext.getFeatures().getCapabilities(); - LOG.debug("Setting capabilities for device {}", deviceContext.getDeviceState().getNodeId()); - DeviceStateUtil.setDeviceStateBasedOnV13Capabilities(deviceState, capabilities); - deviceFeaturesFuture = createDeviceFeaturesForOF13(deviceContext, deviceState); - } else { - deviceFeaturesFuture = Futures.immediateFailedFuture(new ConnectionException("Unsupported version " + version)); - } + connectionAdapter, deviceContext); + connectionAdapter.setMessageListener(messageListener); + deviceState.setValid(true); - Futures.addCallback(deviceFeaturesFuture, new FutureCallback>>>() { - @Override - public void onSuccess(final List>> result) { - deviceContext.getDeviceState().setValid(true); - deviceInitPhaseHandler.onDeviceContextLevelUp(deviceContext); - } + deviceInitPhaseHandler.onDeviceContextLevelUp(deviceContext); + } - @Override - public void onFailure(final Throwable t) { - // FIXME : remove session - LOG.trace("Device capabilities gathering future failed."); - LOG.trace("more info in exploration failure..", t); - } - }); + private static DeviceStateImpl createDeviceState(final @Nonnull ConnectionContext connectionContext) { + return new DeviceStateImpl(connectionContext.getFeatures(), connectionContext.getNodeId()); } - private static void chainTableTrunkWriteOF10(final DeviceContext deviceContext, final ListenableFuture>>> deviceFeaturesFuture) { - Futures.addCallback(deviceFeaturesFuture, new FutureCallback>>>() { - @Override - public void onSuccess(final List>> results) { - boolean allSucceeded = true; - for (final RpcResult> rpcResult : results) { - allSucceeded &= rpcResult.isSuccessful(); + private void updatePacketInRateLimiters() { + synchronized (deviceContexts) { + final int deviceContextsSize = deviceContexts.size(); + if (deviceContextsSize > 0) { + long freshNotificationLimit = globalNotificationQuota / deviceContextsSize; + if (freshNotificationLimit < 100) { + freshNotificationLimit = 100; } - if (allSucceeded) { - createEmptyFlowCapableNodeInDs(deviceContext); - makeEmptyTables(deviceContext, deviceContext.getDeviceState().getNodeInstanceIdentifier(), - deviceContext.getDeviceState().getFeatures().getTables()); + LOG.debug("fresh notification limit = {}", freshNotificationLimit); + for (final DeviceContext deviceContext : deviceContexts.values()) { + deviceContext.updatePacketInRateLimit(freshNotificationLimit); } } - - @Override - public void onFailure(final Throwable t) { - //NOOP - } - }); - } - - private ListenableFuture>> processReplyDesc(final DeviceContext deviceContext, - final DeviceState deviceState) { - - final ListenableFuture>> replyDesc = getNodeStaticInfo(MultipartType.OFPMPDESC, - deviceContext, - deviceState.getNodeInstanceIdentifier(), - deviceState.getVersion()); - return replyDesc; - } - - private ListenableFuture>>> createDeviceFeaturesForOF10(final DeviceContext deviceContext, - final DeviceState deviceState) { - return Futures.allAsList(Arrays.asList(processReplyDesc(deviceContext, deviceState))); - } - - private ListenableFuture>>> createDeviceFeaturesForOF13(final DeviceContext deviceContext, - final DeviceState deviceState) { - final ListenableFuture>> replyDesc = processReplyDesc(deviceContext, deviceState); - - final ListenableFuture>> replyMeterFeature = getNodeStaticInfo(MultipartType.OFPMPMETERFEATURES, - deviceContext, - deviceState.getNodeInstanceIdentifier(), - deviceState.getVersion()); - - final ListenableFuture>> replyGroupFeatures = getNodeStaticInfo(MultipartType.OFPMPGROUPFEATURES, - deviceContext, - deviceState.getNodeInstanceIdentifier(), - deviceState.getVersion()); - - final ListenableFuture>> replyTableFeatures = getNodeStaticInfo(MultipartType.OFPMPTABLEFEATURES, - deviceContext, - deviceState.getNodeInstanceIdentifier(), - deviceState.getVersion()); - - final ListenableFuture>> replyPortDescription = getNodeStaticInfo(MultipartType.OFPMPPORTDESC, - deviceContext, - deviceState.getNodeInstanceIdentifier(), - deviceState.getVersion()); - - return Futures.allAsList(Arrays.asList(replyDesc, - replyMeterFeature, - replyGroupFeatures, -// replyTableFeatures, - replyPortDescription)); - + } } @Override @@ -321,204 +200,105 @@ public class DeviceManagerImpl implements DeviceManager, AutoCloseable { this.translatorLibrary = translatorLibrary; } - private ListenableFuture>> getNodeStaticInfo(final MultipartType type, final DeviceContext deviceContext, - final InstanceIdentifier nodeII, final short version) { - - final OutboundQueue queue = deviceContext.getPrimaryConnectionContext().getOutboundQueueProvider(); - Long reservedXid = queue.reserveEntry(); - final Xid xid = new Xid(reservedXid); - - final RequestContext> requestContext = emptyRequestContextStack.createRequestContext(); - requestContext.setXid(xid); - - LOG.trace("Hooking xid {} to device context - precaution.", requestContext.getXid().getValue()); - deviceContext.hookRequestCtx(requestContext.getXid(), requestContext); - - final ListenableFuture>> requestContextFuture = requestContext.getFuture(); - - final MultiMsgCollector multiMsgCollector = deviceContext.getMultiMsgCollector(); - multiMsgCollector.registerMultipartXid(xid.getValue()); - queue.commitEntry(reservedXid, MultipartRequestInputFactory.makeMultipartRequestInput(xid.getValue(), version, type), new FutureCallback() { - @Override - public void onSuccess(final OfHeader ofHeader) { - if (ofHeader instanceof MultipartReply) { - MultipartReply multipartReply = (MultipartReply) ofHeader; - multiMsgCollector.addMultipartMsg(multipartReply); - } else { - if (null != ofHeader) { - LOG.info("Unexpected response type received {}.", ofHeader.getClass()); - } else { - LOG.info("Response received is null."); - } - } - - } - - @Override - public void onFailure(final Throwable t) { - LOG.info("Fail response from OutboundQueue."); - } - }); - - Futures.addCallback(requestContextFuture, new FutureCallback>>() { - @Override - public void onSuccess(final RpcResult> rpcResult) { - final List result = rpcResult.getResult(); - if (result != null) { - LOG.info("Static node {} info: {} collected", nodeII.toString(), type); - translateAndWriteReply(type, deviceContext, nodeII, result); - } else { - final Iterator rpcErrorIterator = rpcResult.getErrors().iterator(); - while (rpcErrorIterator.hasNext()) { - final RpcError rpcError = rpcErrorIterator.next(); - LOG.info("Failed to retrieve static node {} info: {}", type, rpcError.getMessage()); - if (null != rpcError.getCause()) { - LOG.trace("Detailed error:", rpcError.getCause()); - } - } - if (MultipartType.OFPMPTABLEFEATURES.equals(type)) { - makeEmptyTables(deviceContext, nodeII, deviceContext.getPrimaryConnectionContext().getFeatures().getTables()); - } - } - } - - @Override - public void onFailure(final Throwable throwable) { - LOG.info("Request of type {} for static info of node {} failed.", type, nodeII); - } - }); - - -/* - final ListenableFuture> rpcFuture = JdkFutureAdapters.listenInPoolThread(deviceContext.getPrimaryConnectionContext().getConnectionAdapter() - .multipartRequest(MultipartRequestInputFactory.makeMultipartRequestInput(xid.getValue(), version, type))); - final OFJResult2RequestCtxFuture OFJResult2RequestCtxFuture = new OFJResult2RequestCtxFuture(requestContext, deviceContext); - OFJResult2RequestCtxFuture.processResultFromOfJava(rpcFuture); -*/ - - return requestContext.getFuture(); + @Override + public void setNotificationService(final NotificationService notificationServiceParam) { + notificationService = notificationServiceParam; } - // FIXME : remove after ovs tableFeatures fix - private static void makeEmptyTables(final DeviceContext dContext, final InstanceIdentifier nodeII, final Short nrOfTables) { - LOG.debug("About to create {} empty tables.", nrOfTables); - for (int i = 0; i < nrOfTables; i++) { - final short tId = (short) i; - final InstanceIdentifier tableII = nodeII.augmentation(FlowCapableNode.class).child(Table.class, new TableKey(tId)); - final TableBuilder tableBuilder = new TableBuilder().setId(tId).addAugmentation(FlowTableStatisticsData.class, new FlowTableStatisticsDataBuilder().build()); - dContext.writeToTransaction(LogicalDatastoreType.OPERATIONAL, tableII, tableBuilder.build()); - } + @Override + public void setNotificationPublishService(final NotificationPublishService notificationService) { + notificationPublishService = notificationService; } - private static void translateAndWriteReply(final MultipartType type, final DeviceContext dContext, - final InstanceIdentifier nodeII, final Collection result) { - for (final MultipartReply reply : result) { - final MultipartReplyBody body = reply.getMultipartReplyBody(); - switch (type) { - case OFPMPDESC: - Preconditions.checkArgument(body instanceof MultipartReplyDescCase); - final MultipartReplyDesc replyDesc = ((MultipartReplyDescCase) body).getMultipartReplyDesc(); - final FlowCapableNode fcNode = NodeStaticReplyTranslatorUtil.nodeDescTranslator(replyDesc); - final InstanceIdentifier fNodeII = nodeII.augmentation(FlowCapableNode.class); - dContext.writeToTransaction(LogicalDatastoreType.OPERATIONAL, fNodeII, fcNode); - break; - - case OFPMPTABLEFEATURES: - Preconditions.checkArgument(body instanceof MultipartReplyTableFeaturesCase); - final MultipartReplyTableFeatures tableFeatures = ((MultipartReplyTableFeaturesCase) body).getMultipartReplyTableFeatures(); - final List tables = NodeStaticReplyTranslatorUtil.nodeTableFeatureTranslator(tableFeatures); - for (final TableFeatures table : tables) { - final Short tableId = table.getTableId(); - final InstanceIdentifier
tableII = nodeII.augmentation(FlowCapableNode.class).child(Table.class, new TableKey(tableId)); - final TableBuilder tableBuilder = new TableBuilder().setId(tableId).setTableFeatures(Collections.singletonList(table)); - tableBuilder.addAugmentation(FlowTableStatisticsData.class, new FlowTableStatisticsDataBuilder().build()); - dContext.writeToTransaction(LogicalDatastoreType.OPERATIONAL, tableII, tableBuilder.build()); - } - break; - - case OFPMPMETERFEATURES: - Preconditions.checkArgument(body instanceof MultipartReplyMeterFeaturesCase); - final MultipartReplyMeterFeatures meterFeatures = ((MultipartReplyMeterFeaturesCase) body).getMultipartReplyMeterFeatures(); - final NodeMeterFeatures mFeature = NodeStaticReplyTranslatorUtil.nodeMeterFeatureTranslator(meterFeatures); - final InstanceIdentifier mFeatureII = nodeII.augmentation(NodeMeterFeatures.class); - dContext.writeToTransaction(LogicalDatastoreType.OPERATIONAL, mFeatureII, mFeature); - if (0L < mFeature.getMeterFeatures().getMaxMeter().getValue()) { - dContext.getDeviceState().setMeterAvailable(true); - } - break; - - case OFPMPGROUPFEATURES: - Preconditions.checkArgument(body instanceof MultipartReplyGroupFeaturesCase); - final MultipartReplyGroupFeatures groupFeatures = ((MultipartReplyGroupFeaturesCase) body).getMultipartReplyGroupFeatures(); - final NodeGroupFeatures gFeature = NodeStaticReplyTranslatorUtil.nodeGroupFeatureTranslator(groupFeatures); - final InstanceIdentifier gFeatureII = nodeII.augmentation(NodeGroupFeatures.class); - dContext.writeToTransaction(LogicalDatastoreType.OPERATIONAL, gFeatureII, gFeature); - break; - - case OFPMPPORTDESC: - Preconditions.checkArgument(body instanceof MultipartReplyPortDescCase); - final MultipartReplyPortDesc portDesc = ((MultipartReplyPortDescCase) body).getMultipartReplyPortDesc(); - for (final PortGrouping port : portDesc.getPorts()) { - final short ofVersion = dContext.getDeviceState().getVersion(); - final TranslatorKey translatorKey = new TranslatorKey(ofVersion, PortGrouping.class.getName()); - final MessageTranslator translator = dContext.oook().lookupTranslator(translatorKey); - final FlowCapableNodeConnector fcNodeConnector = translator.translate(port, dContext, null); - - final BigInteger dataPathId = dContext.getPrimaryConnectionContext().getFeatures().getDatapathId(); - final NodeConnectorId nodeConnectorId = NodeStaticReplyTranslatorUtil.nodeConnectorId(dataPathId.toString(), port.getPortNo(), ofVersion); - final NodeConnectorBuilder ncBuilder = new NodeConnectorBuilder().setId(nodeConnectorId); - ncBuilder.addAugmentation(FlowCapableNodeConnector.class, fcNodeConnector); - ncBuilder.addAugmentation(FlowCapableNodeConnectorStatisticsData.class, new FlowCapableNodeConnectorStatisticsDataBuilder().build()); - final NodeConnector connector = ncBuilder.build(); - - final InstanceIdentifier connectorII = nodeII.child(NodeConnector.class, connector.getKey()); - dContext.writeToTransaction(LogicalDatastoreType.OPERATIONAL, connectorII, connector); - } - - break; + @Override + public void close() { + for (final Iterator iterator = Iterators.consumingIterator(deviceContexts.values().iterator()); + iterator.hasNext();) { + final DeviceContext deviceCtx = iterator.next(); + deviceCtx.shutdownConnection(); + deviceCtx.shuttingDownDataStoreTransactions(); + } - default: - throw new IllegalArgumentException("Unnexpected MultipartType " + type); - } + if (spyPool != null) { + spyPool.shutdownNow(); + spyPool = null; } } @Override - public void setNotificationService(final NotificationService notificationServiceParam) { - notificationService = notificationServiceParam; + public void onDeviceContextLevelDown(final DeviceContext deviceContext) { + LOG.debug("onDeviceContextClosed for Node {}", deviceContext.getDeviceState().getNodeId()); + deviceContexts.remove(deviceContext.getPrimaryConnectionContext().getNodeId(), deviceContext); + updatePacketInRateLimiters(); } @Override - public void setNotificationPublishService(final NotificationPublishService notificationService) { - this.notificationPublishService = notificationService; + public void initialize() { + spyPool = new ScheduledThreadPoolExecutor(1); + spyPool.scheduleAtFixedRate(messageIntelligenceAgency, spyRate, spyRate, TimeUnit.SECONDS); } @Override - public void close() throws Exception { - for (final DeviceContext deviceContext : deviceContexts) { - deviceContext.close(); - } - if (throttledNotificationsOfferer != null) { - throttledNotificationsOfferer.close(); - } + public void setExtensionConverterProvider(final ExtensionConverterProvider extensionConverterProvider) { + this.extensionConverterProvider = extensionConverterProvider; } - private static void createEmptyFlowCapableNodeInDs(final DeviceContext deviceContext) { - final FlowCapableNodeBuilder flowCapableNodeBuilder = new FlowCapableNodeBuilder(); - final InstanceIdentifier fNodeII = deviceContext.getDeviceState().getNodeInstanceIdentifier().augmentation(FlowCapableNode.class); - deviceContext.writeToTransaction(LogicalDatastoreType.OPERATIONAL, fNodeII, flowCapableNodeBuilder.build()); + @Override + public ExtensionConverterProvider getExtensionConverterProvider() { + return extensionConverterProvider; } @Override - public void onDeviceContextClosed(final DeviceContext deviceContext) { - deviceContexts.remove(deviceContext); + public void setDeviceTerminationPhaseHandler(final DeviceTerminationPhaseHandler handler) { + this.deviceTerminPhaseHandler = handler; } @Override - public void initialize() { - spyPool = new ScheduledThreadPoolExecutor(1); - spyPool.scheduleAtFixedRate(messageIntelligenceAgency, spyRate, spyRate, TimeUnit.SECONDS); + public void onDeviceDisconnected(final ConnectionContext connectionContext) { + LOG.trace("onDeviceDisconnected method call for Node: {}", connectionContext.getNodeId()); + Preconditions.checkArgument(connectionContext != null); + final NodeId nodeId = connectionContext.getNodeId(); + final DeviceContext deviceCtx = this.deviceContexts.get(nodeId); + + if (null == deviceCtx) { + LOG.info("DeviceContext for Node {} was not found. Connection is terminated without OFP context suite.", + connectionContext.getNodeId()); + return; + } - throttledNotificationsOfferer = new ThrottledNotificationsOffererImpl<>(notificationPublishService, messageIntelligenceAgency); + if (!connectionContext.equals(deviceCtx.getPrimaryConnectionContext())) { + /* Connection is not PrimaryConnection so try to remove from Auxiliary Connections */ + deviceCtx.removeAuxiliaryConnectionContext(connectionContext); + } else { + /* Device is disconnected and so we need to close TxManager */ + final ListenableFuture future = deviceCtx.shuttingDownDataStoreTransactions(); + Futures.addCallback(future, new FutureCallback() { + + @Override + public void onSuccess(final Void result) { + LOG.debug("TxChainManager for device {} is closed successful.", deviceCtx.getDeviceState().getNodeId()); + deviceTerminPhaseHandler.onDeviceContextLevelDown(deviceCtx); + } + + @Override + public void onFailure(final Throwable t) { + LOG.warn("TxChainManager for device {} failed by closing.", deviceCtx.getDeviceState().getNodeId(), t); + deviceTerminPhaseHandler.onDeviceContextLevelDown(deviceCtx); + } + }); + /* Add timer for Close TxManager because it could fain ind cluster without notification */ + final TimerTask timerTask = new TimerTask() { + + @Override + public void run(final Timeout timeout) throws Exception { + if (!future.isDone()) { + LOG.info("Shutting down TxChain for node {} not completed during 10 sec. Continue anyway.", + deviceCtx.getDeviceState().getNodeId()); + future.cancel(false); + } + } + }; + deviceCtx.getTimer().newTimeout(timerTask, 10, TimeUnit.SECONDS); + } } }