X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=openflowplugin-impl%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fopenflowplugin%2Fimpl%2Fdevice%2FDeviceContextImpl.java;h=d24e4c8426d33ee4bf2f4a8fca02b75980eaee2a;hb=4b34ce62d369b80533e57e0bc0e8bc0ac7c496b3;hp=f7a73b4112fca8c02b8051496f2b54ac16f35297;hpb=c1da4a885ca995216d7f95b755bc7bf4147b09f9;p=openflowplugin.git diff --git a/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/device/DeviceContextImpl.java b/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/device/DeviceContextImpl.java index f7a73b4112..d24e4c8426 100644 --- a/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/device/DeviceContextImpl.java +++ b/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/device/DeviceContextImpl.java @@ -9,32 +9,36 @@ package org.opendaylight.openflowplugin.impl.device; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; -import com.google.common.util.concurrent.SettableFuture; +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; import io.netty.util.HashedWheelTimer; import io.netty.util.Timeout; import java.math.BigInteger; +import java.util.Collection; import java.util.HashMap; -import java.util.Iterator; +import java.util.HashSet; import java.util.List; import java.util.Map; -import java.util.TreeMap; -import java.util.concurrent.atomic.AtomicLong; import javax.annotation.Nonnull; import org.opendaylight.controller.md.sal.binding.api.DataBroker; +import org.opendaylight.controller.md.sal.binding.api.NotificationPublishService; +import org.opendaylight.controller.md.sal.binding.api.NotificationService; import org.opendaylight.controller.md.sal.binding.api.ReadTransaction; import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType; -import org.opendaylight.controller.sal.binding.api.NotificationProviderService; +import org.opendaylight.openflowjava.protocol.api.connection.ConnectionAdapter; +import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueue; import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionContext; +import org.opendaylight.openflowplugin.api.openflow.connection.OutboundQueueProvider; import org.opendaylight.openflowplugin.api.openflow.device.DeviceContext; import org.opendaylight.openflowplugin.api.openflow.device.DeviceState; import org.opendaylight.openflowplugin.api.openflow.device.MessageTranslator; import org.opendaylight.openflowplugin.api.openflow.device.RequestContext; import org.opendaylight.openflowplugin.api.openflow.device.TranslatorLibrary; import org.opendaylight.openflowplugin.api.openflow.device.Xid; -import org.opendaylight.openflowplugin.api.openflow.device.exception.DeviceDataException; import org.opendaylight.openflowplugin.api.openflow.device.handlers.DeviceContextClosedHandler; import org.opendaylight.openflowplugin.api.openflow.device.handlers.DeviceDisconnectedHandler; -import org.opendaylight.openflowplugin.api.openflow.device.listener.OpenflowMessageListenerFacade; +import org.opendaylight.openflowplugin.api.openflow.device.handlers.MultiMsgCollector; import org.opendaylight.openflowplugin.api.openflow.md.core.SwitchConnectionDistinguisher; import org.opendaylight.openflowplugin.api.openflow.md.core.TranslatorKey; import org.opendaylight.openflowplugin.api.openflow.registry.flow.DeviceFlowRegistry; @@ -42,6 +46,7 @@ import org.opendaylight.openflowplugin.api.openflow.registry.group.DeviceGroupRe import org.opendaylight.openflowplugin.api.openflow.registry.meter.DeviceMeterRegistry; import org.opendaylight.openflowplugin.api.openflow.statistics.ofpspecific.MessageSpy; import org.opendaylight.openflowplugin.impl.common.NodeStaticReplyTranslatorUtil; +import org.opendaylight.openflowplugin.impl.device.listener.MultiMsgCollectorImpl; import org.opendaylight.openflowplugin.impl.registry.flow.DeviceFlowRegistryImpl; import org.opendaylight.openflowplugin.impl.registry.group.DeviceGroupRegistryImpl; import org.opendaylight.openflowplugin.impl.registry.meter.DeviceMeterRegistryImpl; @@ -64,14 +69,10 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731 import org.opendaylight.yang.gen.v1.urn.opendaylight.packet.service.rev130709.PacketReceived; import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.FlowCapableNodeConnectorStatisticsData; import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.FlowCapableNodeConnectorStatisticsDataBuilder; -import org.opendaylight.yang.gen.v1.urn.opendaylight.table.types.rev131026.TableFeatures; import org.opendaylight.yangtools.yang.binding.ChildOf; import org.opendaylight.yangtools.yang.binding.DataObject; import org.opendaylight.yangtools.yang.binding.InstanceIdentifier; import org.opendaylight.yangtools.yang.binding.KeyedInstanceIdentifier; -import org.opendaylight.yangtools.yang.common.RpcError; -import org.opendaylight.yangtools.yang.common.RpcResult; -import org.opendaylight.yangtools.yang.common.RpcResultBuilder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -82,68 +83,81 @@ public class DeviceContextImpl implements DeviceContext { private static final Logger LOG = LoggerFactory.getLogger(DeviceContextImpl.class); + // TODO: watermarks should be derived from effective rpc limit (75%|95%) + private static final int PACKETIN_LOW_WATERMARK = 15000; + private static final int PACKETIN_HIGH_WATERMARK = 19000; + // TODO: drain factor should be parametrized + public static final float REJECTED_DRAIN_FACTOR = 0.25f; + private final ConnectionContext primaryConnectionContext; private final DeviceState deviceState; private final DataBroker dataBroker; - private final XidGenerator xidGenerator; private final HashedWheelTimer hashedWheelTimer; - private Map requests = new TreeMap<>(); - private final Map auxiliaryConnectionContexts; private final TransactionChainManager txChainManager; - private TranslatorLibrary translatorLibrary; - private OpenflowMessageListenerFacade openflowMessageListenerFacade; private final DeviceFlowRegistry deviceFlowRegistry; private final DeviceGroupRegistry deviceGroupRegistry; private final DeviceMeterRegistry deviceMeterRegistry; - private Timeout barrierTaskTimeout; - private NotificationProviderService notificationService; - private final MessageSpy messageSpy; + private final Collection closeHandlers = new HashSet<>(); + private final PacketInRateLimiter packetInLimiter; + private final MessageSpy messageSpy; + private NotificationPublishService notificationPublishService; private DeviceDisconnectedHandler deviceDisconnectedHandler; - private DeviceContextClosedHandler deviceContextClosedHandler; - + private NotificationService notificationService; + private TranslatorLibrary translatorLibrary; + private OutboundQueue outboundQueueProvider; + private Timeout barrierTaskTimeout; @VisibleForTesting DeviceContextImpl(@Nonnull final ConnectionContext primaryConnectionContext, @Nonnull final DeviceState deviceState, @Nonnull final DataBroker dataBroker, @Nonnull final HashedWheelTimer hashedWheelTimer, - @Nonnull final MessageSpy _messageSpy) { + @Nonnull final MessageSpy _messageSpy, OutboundQueueProvider outboundQueueProvider) { this.primaryConnectionContext = Preconditions.checkNotNull(primaryConnectionContext); this.deviceState = Preconditions.checkNotNull(deviceState); this.dataBroker = Preconditions.checkNotNull(dataBroker); this.hashedWheelTimer = Preconditions.checkNotNull(hashedWheelTimer); - xidGenerator = new XidGenerator(); - txChainManager = new TransactionChainManager(dataBroker, hashedWheelTimer, 500L); + this.outboundQueueProvider = Preconditions.checkNotNull(outboundQueueProvider); + txChainManager = new TransactionChainManager(dataBroker, deviceState); auxiliaryConnectionContexts = new HashMap<>(); - requests = new HashMap<>(); deviceFlowRegistry = new DeviceFlowRegistryImpl(); deviceGroupRegistry = new DeviceGroupRegistryImpl(); deviceMeterRegistry = new DeviceMeterRegistryImpl(); messageSpy = _messageSpy; + + this.packetInLimiter = new PacketInRateLimiter(primaryConnectionContext.getConnectionAdapter(), + PACKETIN_LOW_WATERMARK, PACKETIN_HIGH_WATERMARK, messageSpy, REJECTED_DRAIN_FACTOR); } /** * This method is called from {@link DeviceManagerImpl} only. So we could say "posthandshake process finish" * and we are able to set a scheduler for an automatic transaction submitting by time (0,5sec). */ - void submitTransaction() { - txChainManager.submitTransaction(); - txChainManager.enableCounter(); + void initialSubmitTransaction() { + txChainManager.initialSubmitWriteTransaction(); } @Override - public > void onMessage(final M message, final RequestContext requestContext) { - // TODO Auto-generated method stub + public Long getReservedXid() { + return outboundQueueProvider.reserveEntry(); + } + @Override + public > void onMessage(final M message, final RequestContext requestContext) { + // TODO Auto-generated method stub } @Override public void addAuxiliaryConenctionContext(final ConnectionContext connectionContext) { - final SwitchConnectionDistinguisher connectionDistinguisher = new SwitchConnectionCookieOFImpl(connectionContext.getFeatures().getAuxiliaryId()); + final SwitchConnectionDistinguisher connectionDistinguisher = createConnectionDistinguisher(connectionContext); auxiliaryConnectionContexts.put(connectionDistinguisher, connectionContext); } + private static SwitchConnectionDistinguisher createConnectionDistinguisher(final ConnectionContext connectionContext) { + return new SwitchConnectionCookieOFImpl(connectionContext.getFeatures().getAuxiliaryId()); + } + @Override public void removeAuxiliaryConenctionContext(final ConnectionContext connectionContext) { // TODO Auto-generated method stub @@ -171,9 +185,8 @@ public class DeviceContextImpl implements DeviceContext { } @Override - public TableFeatures getCapabilities() { - // TODO Auto-generated method stub - return null; + public boolean submitTransaction() { + return txChainManager.submitWriteTransaction(); } @Override @@ -186,42 +199,6 @@ public class DeviceContextImpl implements DeviceContext { return auxiliaryConnectionContexts.get(new SwitchConnectionCookieOFImpl(cookie.longValue())); } - @Override - public Xid getNextXid() { - return xidGenerator.generate(); - } - - @Override - public RequestContext lookupRequest(Xid xid) { - return requests.get(xid.getValue()); - } - - @Override - public int getNumberOfOutstandingRequests() { - return requests.size(); - } - - @Override - public void hookRequestCtx(final Xid xid, final RequestContext requestFutureContext) { - requests.put(xid.getValue(), requestFutureContext); - } - - @Override - public RequestContext unhookRequestCtx(Xid xid) { - return requests.remove(xid.getValue()); - } - - @Override - public void attachOpenflowMessageListener(final OpenflowMessageListenerFacade openflowMessageListenerFacade) { - this.openflowMessageListenerFacade = openflowMessageListenerFacade; - primaryConnectionContext.getConnectionAdapter().setMessageListener(openflowMessageListenerFacade); - } - - @Override - public OpenflowMessageListenerFacade getOpenflowMessageListenerFacade() { - return openflowMessageListenerFacade; - } - @Override public DeviceFlowRegistry getDeviceFlowRegistry() { return deviceFlowRegistry; @@ -239,88 +216,17 @@ public class DeviceContextImpl implements DeviceContext { @Override public void processReply(final OfHeader ofHeader) { - final RequestContext requestContext = requests.get(ofHeader.getXid()); - if (null != requestContext) { - final SettableFuture replyFuture = requestContext.getFuture(); - requests.remove(ofHeader.getXid()); - RpcResult rpcResult; - if (ofHeader instanceof Error) { - final Error error = (Error) ofHeader; - final String message = "Operation on device failed"; - rpcResult = RpcResultBuilder - .failed() - .withError(RpcError.ErrorType.APPLICATION, message, new DeviceDataException(message, error)) - .build(); - messageSpy.spyMessage(ofHeader.getImplementedInterface(), MessageSpy.STATISTIC_GROUP.FROM_SWITCH_PUBLISHED_FAILURE); - } else { - rpcResult = RpcResultBuilder - .success() - .withResult(ofHeader) - .build(); - messageSpy.spyMessage(ofHeader.getImplementedInterface(), MessageSpy.STATISTIC_GROUP.FROM_SWITCH_PUBLISHED_SUCCESS); - } - - replyFuture.set(rpcResult); - try { - requestContext.close(); - } catch (final Exception e) { - LOG.error("Closing RequestContext failed: ", e); - } + if (ofHeader instanceof Error) { + messageSpy.spyMessage(ofHeader.getImplementedInterface(), MessageSpy.STATISTIC_GROUP.FROM_SWITCH_PUBLISHED_FAILURE); } else { - LOG.error("Can't find request context registered for xid : {}. Type of reply: {}. From address: {}", ofHeader.getXid(), ofHeader.getClass().getName(), - getPrimaryConnectionContext().getConnectionAdapter().getRemoteAddress()); + messageSpy.spyMessage(ofHeader.getImplementedInterface(), MessageSpy.STATISTIC_GROUP.FROM_SWITCH_PUBLISHED_SUCCESS); } } @Override public void processReply(final Xid xid, final List ofHeaderList) { - final RequestContext requestContext = requests.get(xid.getValue()); - if (null != requestContext) { - final SettableFuture replyFuture = requestContext.getFuture(); - requests.remove(xid.getValue()); - final RpcResult> rpcResult = RpcResultBuilder - .>success() - .withResult(ofHeaderList) - .build(); - replyFuture.set(rpcResult); - for (MultipartReply multipartReply : ofHeaderList) { - messageSpy.spyMessage(multipartReply.getImplementedInterface(), MessageSpy.STATISTIC_GROUP.FROM_SWITCH_PUBLISHED_FAILURE); - } - - try { - requestContext.close(); - } catch (final Exception e) { - LOG.error("Closing RequestContext failed: ", e); - } - } else { - LOG.error("Can't find request context registered for xid : {}. Type of reply: MULTIPART. From address: {}", xid.getValue(), - getPrimaryConnectionContext().getConnectionAdapter().getRemoteAddress()); - } - } - - @Override - public void processException(final Xid xid, final DeviceDataException deviceDataException) { - - LOG.trace("Processing exception for xid : {}", xid.getValue()); - - final RequestContext requestContext = requests.get(xid.getValue()); - - if (null != requestContext) { - final SettableFuture replyFuture = requestContext.getFuture(); - requests.remove(xid.getValue()); - final RpcResult> rpcResult = RpcResultBuilder - .>failed() - .withError(RpcError.ErrorType.APPLICATION, String.format("Message processing failed : %s", deviceDataException.getError()), deviceDataException) - .build(); - replyFuture.set(rpcResult); - messageSpy.spyMessage(deviceDataException.getClass(), MessageSpy.STATISTIC_GROUP.FROM_SWITCH_PUBLISHED_FAILURE); - try { - requestContext.close(); - } catch (final Exception e) { - LOG.error("Closing RequestContext failed: ", e); - } - } else { - LOG.error("Can't find request context registered for xid : {}", xid.getValue()); + for (final MultipartReply multipartReply : ofHeaderList) { + messageSpy.spyMessage(multipartReply.getImplementedInterface(), MessageSpy.STATISTIC_GROUP.FROM_SWITCH_PUBLISHED_FAILURE); } } @@ -346,6 +252,7 @@ public class DeviceContextImpl implements DeviceContext { } else if (portStatus.getReason().equals(PortReason.OFPPRDELETE)) { addDeleteToTxChain(LogicalDatastoreType.OPERATIONAL, iiToNodeConnector); } + submitTransaction(); } private KeyedInstanceIdentifier provideIIToNodeConnector(final long portNo, final short version) { @@ -357,11 +264,50 @@ public class DeviceContextImpl implements DeviceContext { @Override public void processPacketInMessage(final PacketInMessage packetInMessage) { - messageSpy.spyMessage(packetInMessage.getImplementedInterface(), MessageSpy.STATISTIC_GROUP.FROM_SWITCH_PUBLISHED_SUCCESS); + messageSpy.spyMessage(packetInMessage.getImplementedInterface(), MessageSpy.STATISTIC_GROUP.FROM_SWITCH); + final ConnectionAdapter connectionAdapter = getPrimaryConnectionContext().getConnectionAdapter(); + final TranslatorKey translatorKey = new TranslatorKey(packetInMessage.getVersion(), PacketIn.class.getName()); final MessageTranslator messageTranslator = translatorLibrary.lookupTranslator(translatorKey); final PacketReceived packetReceived = messageTranslator.translate(packetInMessage, this, null); - notificationService.publish(packetReceived); + + if (packetReceived == null) { + LOG.debug("Received a null packet from switch {}", connectionAdapter.getRemoteAddress()); + return; + } + + if (!packetInLimiter.acquirePermit()) { + LOG.debug("Packet limited"); + // TODO: save packet into emergency slot if possible + // FIXME: some other counter + messageSpy.spyMessage(packetReceived.getImplementedInterface(), MessageSpy.STATISTIC_GROUP.FROM_SWITCH_TRANSLATE_SRC_FAILURE); + return; + } + + final ListenableFuture offerNotification = notificationPublishService.offerNotification(packetReceived); + if (NotificationPublishService.REJECTED.equals(offerNotification)) { + LOG.debug("notification offer rejected"); + messageSpy.spyMessage(packetReceived.getImplementedInterface(), MessageSpy.STATISTIC_GROUP.FROM_SWITCH_TRANSLATE_SRC_FAILURE); + packetInLimiter.drainLowWaterMark(); + packetInLimiter.releasePermit(); + return; + } + + Futures.addCallback(offerNotification, new FutureCallback() { + @Override + public void onSuccess(final Object result) { + messageSpy.spyMessage(packetReceived.getImplementedInterface(), MessageSpy.STATISTIC_GROUP.FROM_SWITCH_PUBLISHED_SUCCESS); + packetInLimiter.releasePermit(); + } + + @Override + public void onFailure(final Throwable t) { + messageSpy.spyMessage(packetReceived.getImplementedInterface(), MessageSpy.STATISTIC_GROUP.FROM_SWITCH_NOTIFICATION_REJECTED); + LOG.debug("notification offer failed: {}", t.getMessage()); + LOG.trace("notification offer failed..", t); + packetInLimiter.releasePermit(); + } + }); } @Override @@ -381,49 +327,39 @@ public class DeviceContextImpl implements DeviceContext { @Override public void close() throws Exception { - for (Map.Entry entry : requests.entrySet()) { - entry.getValue().close(); + deviceState.setValid(false); + + deviceGroupRegistry.close(); + deviceFlowRegistry.close(); + deviceMeterRegistry.close(); + + primaryConnectionContext.close(); + for (final ConnectionContext connectionContext : auxiliaryConnectionContexts.values()) { + connectionContext.close(); + } + + for (final DeviceContextClosedHandler deviceContextClosedHandler : closeHandlers) { + deviceContextClosedHandler.onDeviceContextClosed(this); } - deviceContextClosedHandler.onDeviceContextClosed(this); + + txChainManager.close(); } @Override public void onDeviceDisconnected(final ConnectionContext connectionContext) { - if (this.getPrimaryConnectionContext().equals(connectionContext)) { + if (getPrimaryConnectionContext().equals(connectionContext)) { try { close(); - } catch (Exception e) { + } catch (final Exception e) { LOG.trace("Error closing device context."); } if (null != deviceDisconnectedHandler) { deviceDisconnectedHandler.onDeviceDisconnected(connectionContext); } } else { - auxiliaryConnectionContexts.remove(connectionContext); - } - } - - - private class XidGenerator { - - private final AtomicLong xid = new AtomicLong(0); - - public Xid generate() { - return new Xid(xid.incrementAndGet()); - } - } - - @Override - public RequestContext extractNextOutstandingMessage(final long barrierXid) { - RequestContext nextMessage = null; - final Iterator keyIterator = requests.keySet().iterator(); - if (keyIterator.hasNext()) { - final Long oldestXid = keyIterator.next(); - if (oldestXid < barrierXid) { - nextMessage = requests.remove(oldestXid); - } + final SwitchConnectionDistinguisher connectionDistinguisher = createConnectionDistinguisher(connectionContext); + auxiliaryConnectionContexts.remove(connectionDistinguisher); } - return nextMessage; } @Override @@ -437,10 +373,15 @@ public class DeviceContextImpl implements DeviceContext { } @Override - public void setNotificationService(final NotificationProviderService notificationServiceParam) { + public void setNotificationService(final NotificationService notificationServiceParam) { notificationService = notificationServiceParam; } + @Override + public void setNotificationPublishService(final NotificationPublishService notificationPublishService) { + this.notificationPublishService = notificationPublishService; + } + @Override public MessageSpy getMessageSpy() { return messageSpy; @@ -452,7 +393,20 @@ public class DeviceContextImpl implements DeviceContext { } @Override - public void setDeviceContextClosedHandler(final DeviceContextClosedHandler deviceContextClosedHandler) { - this.deviceContextClosedHandler = deviceContextClosedHandler; + public void addDeviceContextClosedHandler(final DeviceContextClosedHandler deviceContextClosedHandler) { + closeHandlers.add(deviceContextClosedHandler); + } + + @Override + public void onPublished() { + primaryConnectionContext.getConnectionAdapter().setPacketInFiltering(false); + for (final ConnectionContext switchAuxConnectionContext : auxiliaryConnectionContexts.values()) { + switchAuxConnectionContext.getConnectionAdapter().setPacketInFiltering(false); + } + } + + @Override + public MultiMsgCollector getMultiMsgCollector(final RequestContext> requestContext) { + return new MultiMsgCollectorImpl(this, requestContext); } }