X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=openflowplugin-impl%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fopenflowplugin%2Fimpl%2Fdevice%2FDeviceContextImpl.java;h=c19041aaeaac196cc7bc9d718422218d3681999f;hb=1a5b7b8a9b2e60ec70fd1048f80661b0553e55f2;hp=69a6e2f1788a4c87904154d8ab3512d7d7525f48;hpb=add6c892275f91a4b57d5c43c0f33e7b7ece5e20;p=openflowplugin.git diff --git a/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/device/DeviceContextImpl.java b/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/device/DeviceContextImpl.java index 69a6e2f178..c19041aaea 100644 --- a/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/device/DeviceContextImpl.java +++ b/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/device/DeviceContextImpl.java @@ -9,24 +9,33 @@ package org.opendaylight.openflowplugin.impl.device; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; -import com.google.common.util.concurrent.SettableFuture; +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; import io.netty.util.HashedWheelTimer; import io.netty.util.Timeout; import java.math.BigInteger; -import java.util.ArrayList; -import java.util.Collections; +import java.net.InetSocketAddress; +import java.util.Collection; import java.util.HashMap; +import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.TreeMap; -import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ExecutionException; import javax.annotation.Nonnull; import org.opendaylight.controller.md.sal.binding.api.DataBroker; +import org.opendaylight.controller.md.sal.binding.api.NotificationPublishService; +import org.opendaylight.controller.md.sal.binding.api.NotificationService; import org.opendaylight.controller.md.sal.binding.api.ReadTransaction; import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType; -import org.opendaylight.controller.sal.binding.api.NotificationProviderService; +import org.opendaylight.openflowjava.protocol.api.connection.ConnectionAdapter; +import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueue; import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionContext; +import org.opendaylight.openflowplugin.api.openflow.connection.ThrottledNotificationsOfferer; import org.opendaylight.openflowplugin.api.openflow.device.DeviceContext; import org.opendaylight.openflowplugin.api.openflow.device.DeviceState; import org.opendaylight.openflowplugin.api.openflow.device.MessageTranslator; @@ -36,7 +45,7 @@ import org.opendaylight.openflowplugin.api.openflow.device.Xid; import org.opendaylight.openflowplugin.api.openflow.device.exception.DeviceDataException; import org.opendaylight.openflowplugin.api.openflow.device.handlers.DeviceContextClosedHandler; import org.opendaylight.openflowplugin.api.openflow.device.handlers.DeviceDisconnectedHandler; -import org.opendaylight.openflowplugin.api.openflow.device.listener.OpenflowMessageListenerFacade; +import org.opendaylight.openflowplugin.api.openflow.device.handlers.MultiMsgCollector; import org.opendaylight.openflowplugin.api.openflow.md.core.SwitchConnectionDistinguisher; import org.opendaylight.openflowplugin.api.openflow.md.core.TranslatorKey; import org.opendaylight.openflowplugin.api.openflow.registry.flow.DeviceFlowRegistry; @@ -44,6 +53,7 @@ import org.opendaylight.openflowplugin.api.openflow.registry.group.DeviceGroupRe import org.opendaylight.openflowplugin.api.openflow.registry.meter.DeviceMeterRegistry; import org.opendaylight.openflowplugin.api.openflow.statistics.ofpspecific.MessageSpy; import org.opendaylight.openflowplugin.impl.common.NodeStaticReplyTranslatorUtil; +import org.opendaylight.openflowplugin.impl.device.listener.MultiMsgCollectorImpl; import org.opendaylight.openflowplugin.impl.registry.flow.DeviceFlowRegistryImpl; import org.opendaylight.openflowplugin.impl.registry.group.DeviceGroupRegistryImpl; import org.opendaylight.openflowplugin.impl.registry.meter.DeviceMeterRegistryImpl; @@ -88,22 +98,36 @@ public class DeviceContextImpl implements DeviceContext { private final ConnectionContext primaryConnectionContext; private final DeviceState deviceState; private final DataBroker dataBroker; - private final XidGenerator xidGenerator; private final HashedWheelTimer hashedWheelTimer; - private Map requests = Collections.synchronizedMap(new TreeMap()); + private final Map requests = new TreeMap<>(); private final Map auxiliaryConnectionContexts; private final TransactionChainManager txChainManager; private TranslatorLibrary translatorLibrary; - private OpenflowMessageListenerFacade openflowMessageListenerFacade; private final DeviceFlowRegistry deviceFlowRegistry; private final DeviceGroupRegistry deviceGroupRegistry; private final DeviceMeterRegistry deviceMeterRegistry; private Timeout barrierTaskTimeout; - private NotificationProviderService notificationService; - private final MessageSpy messageSpy; + private NotificationService notificationService; + private final MessageSpy> messageSpy; private DeviceDisconnectedHandler deviceDisconnectedHandler; - private List closeHandlers = new ArrayList<>(); + private final Collection closeHandlers = new HashSet<>(); + private NotificationPublishService notificationPublishService; + private final ThrottledNotificationsOfferer throttledConnectionsHolder; + private final BlockingQueue bumperQueue; + private final OutboundQueue outboundQueueProvider; + + @Override + public MultiMsgCollector getMultiMsgCollector() { + return multiMsgCollector; + } + + @Override + public Long getReservedXid() { + return outboundQueueProvider.reserveEntry(); + } + + private final MultiMsgCollector multiMsgCollector = new MultiMsgCollectorImpl(); @VisibleForTesting @@ -111,19 +135,22 @@ public class DeviceContextImpl implements DeviceContext { @Nonnull final DeviceState deviceState, @Nonnull final DataBroker dataBroker, @Nonnull final HashedWheelTimer hashedWheelTimer, - @Nonnull final MessageSpy _messageSpy) { + @Nonnull final MessageSpy _messageSpy, + @Nonnull final ThrottledNotificationsOfferer throttledConnectionsHolder) { this.primaryConnectionContext = Preconditions.checkNotNull(primaryConnectionContext); this.deviceState = Preconditions.checkNotNull(deviceState); this.dataBroker = Preconditions.checkNotNull(dataBroker); this.hashedWheelTimer = Preconditions.checkNotNull(hashedWheelTimer); - xidGenerator = new XidGenerator(); txChainManager = new TransactionChainManager(dataBroker, hashedWheelTimer, 500L, 500L); auxiliaryConnectionContexts = new HashMap<>(); - requests = new HashMap<>(); deviceFlowRegistry = new DeviceFlowRegistryImpl(); deviceGroupRegistry = new DeviceGroupRegistryImpl(); deviceMeterRegistry = new DeviceMeterRegistryImpl(); messageSpy = _messageSpy; + this.throttledConnectionsHolder = throttledConnectionsHolder; + bumperQueue = new ArrayBlockingQueue<>(5000); + multiMsgCollector.setDeviceReplyProcessor(this); + outboundQueueProvider = Preconditions.checkNotNull(primaryConnectionContext.getOutboundQueueProvider()); } /** @@ -131,8 +158,8 @@ public class DeviceContextImpl implements DeviceContext { * and we are able to set a scheduler for an automatic transaction submitting by time (0,5sec). */ void submitTransaction() { + txChainManager.enableSubmit(); txChainManager.submitTransaction(); - txChainManager.enableCounter(); } @Override @@ -143,10 +170,14 @@ public class DeviceContextImpl implements DeviceContext { @Override public void addAuxiliaryConenctionContext(final ConnectionContext connectionContext) { - final SwitchConnectionDistinguisher connectionDistinguisher = new SwitchConnectionCookieOFImpl(connectionContext.getFeatures().getAuxiliaryId()); + final SwitchConnectionDistinguisher connectionDistinguisher = createConnectionDistinguisher(connectionContext); auxiliaryConnectionContexts.put(connectionDistinguisher, connectionContext); } + private static SwitchConnectionDistinguisher createConnectionDistinguisher(final ConnectionContext connectionContext) { + return new SwitchConnectionCookieOFImpl(connectionContext.getFeatures().getAuxiliaryId()); + } + @Override public void removeAuxiliaryConenctionContext(final ConnectionContext connectionContext) { // TODO Auto-generated method stub @@ -184,39 +215,31 @@ public class DeviceContextImpl implements DeviceContext { } @Override - public Xid getNextXid() { - return xidGenerator.generate(); - } - - @Override - public RequestContext lookupRequest(Xid xid) { - return requests.get(xid.getValue()); + public RequestContext lookupRequest(final Xid xid) { + synchronized (requests) { + return requests.get(xid.getValue()); + } } @Override public int getNumberOfOutstandingRequests() { - return requests.size(); + synchronized (requests) { + return requests.size(); + } } @Override public void hookRequestCtx(final Xid xid, final RequestContext requestFutureContext) { - requests.put(xid.getValue(), requestFutureContext); - } - - @Override - public RequestContext unhookRequestCtx(Xid xid) { - return requests.remove(xid.getValue()); - } - - @Override - public void attachOpenflowMessageListener(final OpenflowMessageListenerFacade openflowMessageListenerFacade) { - this.openflowMessageListenerFacade = openflowMessageListenerFacade; - primaryConnectionContext.getConnectionAdapter().setMessageListener(openflowMessageListenerFacade); + synchronized (requests) { + requests.put(xid.getValue(), requestFutureContext); + } } @Override - public OpenflowMessageListenerFacade getOpenflowMessageListenerFacade() { - return openflowMessageListenerFacade; + public RequestContext unhookRequestCtx(final Xid xid) { + synchronized (requests) { + return requests.remove(xid.getValue()); + } } @Override @@ -236,16 +259,14 @@ public class DeviceContextImpl implements DeviceContext { @Override public void processReply(final OfHeader ofHeader) { - final RequestContext requestContext = requests.get(ofHeader.getXid()); + final RequestContext requestContext = requests.remove(ofHeader.getXid()); if (null != requestContext) { - final SettableFuture replyFuture = requestContext.getFuture(); - requests.remove(ofHeader.getXid()); RpcResult rpcResult; if (ofHeader instanceof Error) { //TODO : this is the point, where we can discover that add flow operation failed and where we should //TODO : remove this flow from deviceFlowRegistry final Error error = (Error) ofHeader; - final String message = "Operation on device failed"; + final String message = "Operation on device failed with xid " + ofHeader.getXid() + "."; rpcResult = RpcResultBuilder .failed() .withError(RpcError.ErrorType.APPLICATION, message, new DeviceDataException(message, error)) @@ -259,40 +280,44 @@ public class DeviceContextImpl implements DeviceContext { messageSpy.spyMessage(ofHeader.getImplementedInterface(), MessageSpy.STATISTIC_GROUP.FROM_SWITCH_PUBLISHED_SUCCESS); } - replyFuture.set(rpcResult); + requestContext.setResult(rpcResult); try { requestContext.close(); } catch (final Exception e) { - LOG.error("Closing RequestContext failed: ", e); + LOG.warn("Closing RequestContext failed: {}", e.getMessage()); + LOG.debug("Closing RequestContext failed.. ", e); } } else { - LOG.error("Can't find request context registered for xid : {}. Type of reply: {}. From address: {}", ofHeader.getXid(), ofHeader.getClass().getName(), + LOG.warn("Can't find request context registered for xid : {}. Type of reply: {}. From address: {}", ofHeader.getXid(), ofHeader.getClass().getName(), getPrimaryConnectionContext().getConnectionAdapter().getRemoteAddress()); } } @Override public void processReply(final Xid xid, final List ofHeaderList) { - final RequestContext requestContext = requests.get(xid.getValue()); + final RequestContext requestContext; + synchronized (requests) { + requestContext = requests.remove(xid.getValue()); + } if (null != requestContext) { - final SettableFuture replyFuture = requestContext.getFuture(); - requests.remove(xid.getValue()); final RpcResult> rpcResult = RpcResultBuilder .>success() .withResult(ofHeaderList) .build(); - replyFuture.set(rpcResult); - for (MultipartReply multipartReply : ofHeaderList) { + requestContext.setResult(rpcResult); + for (final MultipartReply multipartReply : ofHeaderList) { messageSpy.spyMessage(multipartReply.getImplementedInterface(), MessageSpy.STATISTIC_GROUP.FROM_SWITCH_PUBLISHED_FAILURE); } + unhookRequestCtx(xid); try { requestContext.close(); } catch (final Exception e) { - LOG.error("Closing RequestContext failed: ", e); + LOG.warn("Closing RequestContext failed: {}", e.getMessage()); + LOG.debug("Closing RequestContext failed.. ", e); } } else { - LOG.error("Can't find request context registered for xid : {}. Type of reply: MULTIPART. From address: {}", xid.getValue(), + LOG.warn("Can't find request context registered for xid : {}. Type of reply: MULTIPART. From address: {}", xid.getValue(), getPrimaryConnectionContext().getConnectionAdapter().getRemoteAddress()); } } @@ -302,24 +327,24 @@ public class DeviceContextImpl implements DeviceContext { LOG.trace("Processing exception for xid : {}", xid.getValue()); - final RequestContext requestContext = requests.get(xid.getValue()); + final RequestContext requestContext = requests.remove(xid.getValue()); if (null != requestContext) { - final SettableFuture replyFuture = requestContext.getFuture(); - requests.remove(xid.getValue()); final RpcResult> rpcResult = RpcResultBuilder .>failed() .withError(RpcError.ErrorType.APPLICATION, String.format("Message processing failed : %s", deviceDataException.getError()), deviceDataException) .build(); - replyFuture.set(rpcResult); + requestContext.setResult(rpcResult); messageSpy.spyMessage(deviceDataException.getClass(), MessageSpy.STATISTIC_GROUP.FROM_SWITCH_PUBLISHED_FAILURE); try { requestContext.close(); } catch (final Exception e) { - LOG.error("Closing RequestContext failed: ", e); + LOG.warn("Closing RequestContext failed: ", e); + LOG.debug("Closing RequestContext failed..", e); } } else { - LOG.error("Can't find request context registered for xid : {}", xid.getValue()); + LOG.warn("Can't find request context registered for xid : {}. Exception message {}", + xid.getValue(), deviceDataException.getMessage()); } } @@ -329,7 +354,7 @@ public class DeviceContextImpl implements DeviceContext { } @Override - public synchronized void processPortStatusMessage(final PortStatusMessage portStatus) { + public void processPortStatusMessage(final PortStatusMessage portStatus) { messageSpy.spyMessage(portStatus.getImplementedInterface(), MessageSpy.STATISTIC_GROUP.FROM_SWITCH_PUBLISHED_SUCCESS); final TranslatorKey translatorKey = new TranslatorKey(portStatus.getVersion(), PortGrouping.class.getName()); final MessageTranslator messageTranslator = translatorLibrary.lookupTranslator(translatorKey); @@ -356,11 +381,71 @@ public class DeviceContextImpl implements DeviceContext { @Override public void processPacketInMessage(final PacketInMessage packetInMessage) { - messageSpy.spyMessage(packetInMessage.getImplementedInterface(), MessageSpy.STATISTIC_GROUP.FROM_SWITCH_PUBLISHED_SUCCESS); + messageSpy.spyMessage(packetInMessage.getImplementedInterface(), MessageSpy.STATISTIC_GROUP.FROM_SWITCH); + final TranslatorKey translatorKey = new TranslatorKey(packetInMessage.getVersion(), PacketIn.class.getName()); final MessageTranslator messageTranslator = translatorLibrary.lookupTranslator(translatorKey); final PacketReceived packetReceived = messageTranslator.translate(packetInMessage, this, null); - notificationService.publish(packetReceived); + + if (packetReceived != null) { + messageSpy.spyMessage(packetInMessage.getImplementedInterface(), MessageSpy.STATISTIC_GROUP.FROM_SWITCH_TRANSLATE_OUT_SUCCESS); + } else { + messageSpy.spyMessage(packetInMessage.getImplementedInterface(), MessageSpy.STATISTIC_GROUP.FROM_SWITCH_TRANSLATE_SRC_FAILURE); + return; + } + + ListenableFuture listenableFuture = notificationPublishService.offerNotification(packetReceived); + if (NotificationPublishService.REJECTED.equals(listenableFuture)) { + LOG.debug("notification offer rejected"); + messageSpy.spyMessage(packetInMessage.getImplementedInterface(), MessageSpy.STATISTIC_GROUP.FROM_SWITCH_NOTIFICATION_REJECTED); + } else if (listenableFuture.isDone()) { + Object x = null; + try { + x = listenableFuture.get(); + messageSpy.spyMessage(packetInMessage.getImplementedInterface(), MessageSpy.STATISTIC_GROUP.FROM_SWITCH_PUBLISHED_SUCCESS); + } catch (InterruptedException e) { + LOG.debug("notification offer interrupted: {}", e.getMessage()); + LOG.trace("notification offer interrupted..", e); + } catch (ExecutionException e) { + LOG.debug("notification offer failed: {}", e.getMessage()); + LOG.trace("notification offer failed..", e); + } finally { + if (null == x) { + messageSpy.spyMessage(packetInMessage.getImplementedInterface(), MessageSpy.STATISTIC_GROUP.FROM_SWITCH_NOTIFICATION_REJECTED); + } + } + } else { + messageSpy.spyMessage(packetInMessage.getImplementedInterface(), MessageSpy.STATISTIC_GROUP.FROM_SWITCH_PUBLISHED_SUCCESS); + } + } + + private void applyThrottling(final PacketReceived packetReceived, final ConnectionAdapter connectionAdapter) { + final InetSocketAddress remoteAddress = connectionAdapter.getRemoteAddress(); + LOG.debug("Notification offer refused by notification service."); + messageSpy.spyMessage(packetReceived.getImplementedInterface(), MessageSpy.STATISTIC_GROUP.FROM_SWITCH_PUBLISHED_FAILURE); + connectionAdapter.setAutoRead(false); + + LOG.debug("Throttling ingress for {}", remoteAddress); + final ListenableFuture queueDone; + + // adding first notification + bumperQueue.offer(packetReceived); + synchronized (bumperQueue) { + queueDone = throttledConnectionsHolder.applyThrottlingOnConnection(bumperQueue); + } + Futures.addCallback(queueDone, new FutureCallback() { + @Override + public void onSuccess(final Void result) { + LOG.debug("Un - throttling ingress for {}", remoteAddress); + connectionAdapter.setAutoRead(true); + } + + @Override + public void onFailure(final Throwable t) { + LOG.warn("failed to offer queued notification for {}: {}", remoteAddress, t.getMessage()); + LOG.debug("failed to offer queued notification for {}.. ", remoteAddress, t); + } + }); } @Override @@ -374,7 +459,7 @@ public class DeviceContextImpl implements DeviceContext { } @Override - public synchronized HashedWheelTimer getTimer() { + public HashedWheelTimer getTimer() { return hashedWheelTimer; } @@ -393,54 +478,47 @@ public class DeviceContextImpl implements DeviceContext { primaryConnectionContext.setConnectionState(ConnectionContext.CONNECTION_STATE.RIP); primaryConnectionContext.getConnectionAdapter().disconnect(); } - for (Map.Entry entry : requests.entrySet()) { + for (final Map.Entry entry : requests.entrySet()) { RequestContextUtil.closeRequestContextWithRpcError(entry.getValue(), DEVICE_DISCONNECTED); } - for (ConnectionContext connectionContext : auxiliaryConnectionContexts.values()) { + for (final ConnectionContext connectionContext : auxiliaryConnectionContexts.values()) { if (connectionContext.getConnectionAdapter().isAlive()) { connectionContext.getConnectionAdapter().disconnect(); } } - for (DeviceContextClosedHandler deviceContextClosedHandler : closeHandlers) { + for (final DeviceContextClosedHandler deviceContextClosedHandler : closeHandlers) { deviceContextClosedHandler.onDeviceContextClosed(this); } } @Override - public synchronized void onDeviceDisconnected(final ConnectionContext connectionContext) { + public void onDeviceDisconnected(final ConnectionContext connectionContext) { if (this.getPrimaryConnectionContext().equals(connectionContext)) { try { close(); - } catch (Exception e) { + } catch (final Exception e) { LOG.trace("Error closing device context."); } if (null != deviceDisconnectedHandler) { deviceDisconnectedHandler.onDeviceDisconnected(connectionContext); } } else { - auxiliaryConnectionContexts.remove(connectionContext); - } - } - - - private class XidGenerator { - - private final AtomicLong xid = new AtomicLong(0); - - public Xid generate() { - return new Xid(xid.incrementAndGet()); + final SwitchConnectionDistinguisher connectionDistinguisher = createConnectionDistinguisher(connectionContext); + auxiliaryConnectionContexts.remove(connectionDistinguisher); } } @Override public RequestContext extractNextOutstandingMessage(final long barrierXid) { RequestContext nextMessage = null; - final Iterator keyIterator = requests.keySet().iterator(); - if (keyIterator.hasNext()) { - final Long oldestXid = keyIterator.next(); - if (oldestXid < barrierXid) { - nextMessage = requests.remove(oldestXid); + synchronized (requests) { + final Iterator keyIterator = requests.keySet().iterator(); + if (keyIterator.hasNext()) { + final Long oldestXid = keyIterator.next(); + if (oldestXid < barrierXid) { + nextMessage = requests.remove(oldestXid); + } } } return nextMessage; @@ -457,10 +535,15 @@ public class DeviceContextImpl implements DeviceContext { } @Override - public void setNotificationService(final NotificationProviderService notificationServiceParam) { + public void setNotificationService(final NotificationService notificationServiceParam) { notificationService = notificationServiceParam; } + @Override + public void setNotificationPublishService(final NotificationPublishService notificationPublishService) { + this.notificationPublishService = notificationPublishService; + } + @Override public MessageSpy getMessageSpy() { return messageSpy; @@ -475,4 +558,15 @@ public class DeviceContextImpl implements DeviceContext { public void addDeviceContextClosedHandler(final DeviceContextClosedHandler deviceContextClosedHandler) { this.closeHandlers.add(deviceContextClosedHandler); } + + @Override + public void startGatheringOperationsToOneTransaction() { + txChainManager.startGatheringOperationsToOneTransaction(); + } + + @Override + public void commitOperationsGatheredInOneTransaction() { + txChainManager.commitOperationsGatheredInOneTransaction(); + } + }