connection adapter is set to autoread = false when notofications are not handled 18/19918/1
authorMartin Bobak <mbobak@cisco.com>
Fri, 8 May 2015 16:51:16 +0000 (18:51 +0200)
committerMartin Bobak <mbobak@cisco.com>
Fri, 8 May 2015 16:51:16 +0000 (18:51 +0200)
Change-Id: Ie97ed2237a862c64dbd7b7616dce37ef98f3673e
Signed-off-by: Martin Bobak <mbobak@cisco.com>
openflowplugin-api/src/main/java/org/opendaylight/openflowplugin/api/openflow/connection/ThrottledConnectionsHolder.java [new file with mode: 0644]
openflowplugin-api/src/main/java/org/opendaylight/openflowplugin/api/openflow/device/DeviceContext.java
openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/connection/ThrottledConnectionsHolderImpl.java [new file with mode: 0644]
openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/device/DeviceContextImpl.java
openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/device/DeviceManagerImpl.java
openflowplugin-impl/src/test/java/org/opendaylight/openflowplugin/impl/device/DeviceContextImplTest.java

diff --git a/openflowplugin-api/src/main/java/org/opendaylight/openflowplugin/api/openflow/connection/ThrottledConnectionsHolder.java b/openflowplugin-api/src/main/java/org/opendaylight/openflowplugin/api/openflow/connection/ThrottledConnectionsHolder.java
new file mode 100644 (file)
index 0000000..e1cbf02
--- /dev/null
@@ -0,0 +1,19 @@
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.openflowplugin.api.openflow.connection;
+
+import org.opendaylight.openflowjava.protocol.api.connection.ConnectionAdapter;
+
+/**
+ * Created by Martin Bobak &lt;mbobak@cisco.com&gt; on 8.5.2015.
+ */
+public interface ThrottledConnectionsHolder {
+
+    void storeThrottledConnection(ConnectionAdapter connectionAdapter);
+}
index 84a78324ea93300e8c9578018a6e503628d9dac3..13adcb344732e74b7406d42d2f41b0e4625c1133 100644 (file)
@@ -16,6 +16,7 @@ import org.opendaylight.controller.md.sal.binding.api.ReadTransaction;
 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
 import org.opendaylight.openflowplugin.api.openflow.OpenFlowPluginTimer;
 import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionContext;
+import org.opendaylight.openflowplugin.api.openflow.connection.ThrottledConnectionsHolder;
 import org.opendaylight.openflowplugin.api.openflow.device.handlers.DeviceContextClosedHandler;
 import org.opendaylight.openflowplugin.api.openflow.device.handlers.DeviceDisconnectedHandler;
 import org.opendaylight.openflowplugin.api.openflow.device.handlers.DeviceReplyProcessor;
diff --git a/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/connection/ThrottledConnectionsHolderImpl.java b/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/connection/ThrottledConnectionsHolderImpl.java
new file mode 100644 (file)
index 0000000..fd10ae1
--- /dev/null
@@ -0,0 +1,73 @@
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.openflowplugin.impl.connection;
+
+import io.netty.util.HashedWheelTimer;
+import io.netty.util.Timeout;
+import io.netty.util.TimerTask;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.LinkedHashSet;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import org.opendaylight.openflowjava.protocol.api.connection.ConnectionAdapter;
+import org.opendaylight.openflowplugin.api.openflow.connection.ThrottledConnectionsHolder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Created by Martin Bobak &lt;mbobak@cisco.com&gt; on 8.5.2015.
+ */
+public class ThrottledConnectionsHolderImpl implements ThrottledConnectionsHolder, TimerTask {
+
+    private final Set<ConnectionAdapter> throttledConnections = Collections.synchronizedSet(new LinkedHashSet<ConnectionAdapter>());
+    private final HashedWheelTimer hashedWheelTimer;
+    private Timeout timeout;
+    private long delay = 100L;
+    private static final Logger LOG = LoggerFactory.getLogger(ThrottledConnectionsHolderImpl.class);
+
+    public ThrottledConnectionsHolderImpl(final HashedWheelTimer hashedWheelTimer) {
+        this.hashedWheelTimer = hashedWheelTimer;
+    }
+
+    @Override
+    public void storeThrottledConnection(final ConnectionAdapter connectionAdapter) {
+        throttledConnections.add(connectionAdapter);
+        LOG.info("Adding piece of throttle for {}", connectionAdapter.getRemoteAddress());
+        synchronized (this) {
+            if (null == timeout) {
+                scheduleTimeout();
+            }
+        }
+    }
+
+    private void scheduleTimeout() {
+        this.timeout = hashedWheelTimer.newTimeout(this, delay, TimeUnit.MILLISECONDS);
+    }
+
+    @Override
+    public void run(final Timeout timeout) throws Exception {
+        synchronized (this) {
+            this.timeout = null;
+            if (throttledConnections.isEmpty()) {
+                return;
+            }
+
+            final Iterator<ConnectionAdapter> iterator = throttledConnections.iterator();
+            if (iterator.hasNext()) {
+                ConnectionAdapter connectionAdapter = iterator.next();
+                iterator.remove();
+                connectionAdapter.setAutoRead(true);
+                LOG.info("Un - throttling primary connection for {}", connectionAdapter.getRemoteAddress());
+            }
+
+            scheduleTimeout();
+        }
+    }
+}
index 9c907ee074523daa33a00fb7c3960912d5d01d0b..a92df046099138df778011f2ddbcc7236804bc92 100644 (file)
@@ -26,7 +26,9 @@ import org.opendaylight.controller.md.sal.binding.api.NotificationPublishService
 import org.opendaylight.controller.md.sal.binding.api.NotificationService;
 import org.opendaylight.controller.md.sal.binding.api.ReadTransaction;
 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.openflowjava.protocol.api.connection.ConnectionAdapter;
 import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionContext;
+import org.opendaylight.openflowplugin.api.openflow.connection.ThrottledConnectionsHolder;
 import org.opendaylight.openflowplugin.api.openflow.device.DeviceContext;
 import org.opendaylight.openflowplugin.api.openflow.device.DeviceState;
 import org.opendaylight.openflowplugin.api.openflow.device.MessageTranslator;
@@ -105,6 +107,7 @@ public class DeviceContextImpl implements DeviceContext {
     private DeviceDisconnectedHandler deviceDisconnectedHandler;
     private final List<DeviceContextClosedHandler> closeHandlers = new ArrayList<>();
     private NotificationPublishService notificationPublishService;
+    private final ThrottledConnectionsHolder throttledConnectionsHolder;
 
 
     @VisibleForTesting
@@ -112,7 +115,8 @@ public class DeviceContextImpl implements DeviceContext {
                       @Nonnull final DeviceState deviceState,
                       @Nonnull final DataBroker dataBroker,
                       @Nonnull final HashedWheelTimer hashedWheelTimer,
-                      @Nonnull final MessageSpy _messageSpy) {
+                      @Nonnull final MessageSpy _messageSpy,
+                      @Nonnull final ThrottledConnectionsHolder throttledConnectionsHolder) {
         this.primaryConnectionContext = Preconditions.checkNotNull(primaryConnectionContext);
         this.deviceState = Preconditions.checkNotNull(deviceState);
         this.dataBroker = Preconditions.checkNotNull(dataBroker);
@@ -124,6 +128,7 @@ public class DeviceContextImpl implements DeviceContext {
         deviceGroupRegistry = new DeviceGroupRegistryImpl();
         deviceMeterRegistry = new DeviceMeterRegistryImpl();
         messageSpy = _messageSpy;
+        this.throttledConnectionsHolder = throttledConnectionsHolder;
     }
 
     /**
@@ -376,8 +381,14 @@ public class DeviceContextImpl implements DeviceContext {
         final TranslatorKey translatorKey = new TranslatorKey(packetInMessage.getVersion(), PacketIn.class.getName());
         final MessageTranslator<PacketInMessage, PacketReceived> messageTranslator = translatorLibrary.lookupTranslator(translatorKey);
         final PacketReceived packetReceived = messageTranslator.translate(packetInMessage, this, null);
-        if (!notificationPublishService.offerNotification(packetReceived)) {
-            LOG.debug("Notification offer refused by notification service.");
+        final ConnectionAdapter connectionAdapter = this.getPrimaryConnectionContext().getConnectionAdapter();
+        if (connectionAdapter.isAutoRead()) {
+            if (!notificationPublishService.offerNotification(packetReceived)) {
+                LOG.debug("Notification offer refused by notification service.");
+                connectionAdapter.setAutoRead(false);
+                LOG.info("Throttling primary connection for {}", connectionAdapter.getRemoteAddress());
+                this.throttledConnectionsHolder.storeThrottledConnection(connectionAdapter);
+            }
         }
 
     }
index f50916481475f6de552dcb47e536dc4fc12bdb00..3589e71080f93b58ae1a0d9a68891ef33941e138 100644 (file)
@@ -33,6 +33,7 @@ import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
 import org.opendaylight.openflowplugin.api.ConnectionException;
 import org.opendaylight.openflowplugin.api.OFConstants;
 import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionContext;
+import org.opendaylight.openflowplugin.api.openflow.connection.ThrottledConnectionsHolder;
 import org.opendaylight.openflowplugin.api.openflow.device.DeviceContext;
 import org.opendaylight.openflowplugin.api.openflow.device.DeviceManager;
 import org.opendaylight.openflowplugin.api.openflow.device.DeviceState;
@@ -47,6 +48,7 @@ import org.opendaylight.openflowplugin.api.openflow.md.core.TranslatorKey;
 import org.opendaylight.openflowplugin.api.openflow.statistics.ofpspecific.MessageIntelligenceAgency;
 import org.opendaylight.openflowplugin.impl.common.MultipartRequestInputFactory;
 import org.opendaylight.openflowplugin.impl.common.NodeStaticReplyTranslatorUtil;
+import org.opendaylight.openflowplugin.impl.connection.ThrottledConnectionsHolderImpl;
 import org.opendaylight.openflowplugin.impl.device.listener.OpenflowProtocolListenerFullImpl;
 import org.opendaylight.openflowplugin.impl.rpc.RequestContextImpl;
 import org.opendaylight.openflowplugin.impl.services.OFJResult2RequestCtxFuture;
@@ -115,6 +117,7 @@ public class DeviceManagerImpl implements DeviceManager, AutoCloseable {
 
     private final List<DeviceContext> deviceContexts = new ArrayList<DeviceContext>();
     private final MessageIntelligenceAgency messageIntelligenceAgency = new MessageIntelligenceAgencyImpl();
+    private final ThrottledConnectionsHolder throttledConnectionsHolder;
 
     public DeviceManagerImpl(@Nonnull final DataBroker dataBroker) {
         this.dataBroker = Preconditions.checkNotNull(dataBroker);
@@ -142,6 +145,8 @@ public class DeviceManagerImpl implements DeviceManager, AutoCloseable {
         };
         spyPool = new ScheduledThreadPoolExecutor(1);
         spyPool.scheduleAtFixedRate(messageIntelligenceAgency, spyRate, spyRate, TimeUnit.SECONDS);
+
+        throttledConnectionsHolder = new ThrottledConnectionsHolderImpl(hashedWheelTimer);
     }
 
     @Override
@@ -163,7 +168,7 @@ public class DeviceManagerImpl implements DeviceManager, AutoCloseable {
 
         final DeviceState deviceState = new DeviceStateImpl(connectionContext.getFeatures(), connectionContext.getNodeId());
 
-        final DeviceContext deviceContext = new DeviceContextImpl(connectionContext, deviceState, dataBroker, hashedWheelTimer, messageIntelligenceAgency);
+        final DeviceContext deviceContext = new DeviceContextImpl(connectionContext, deviceState, dataBroker, hashedWheelTimer, messageIntelligenceAgency, throttledConnectionsHolder);
 
         deviceContext.setNotificationService(notificationService);
         deviceContext.setNotificationPublishService(notificationPublishService);
index 1fa33862c682f55c5e7d5c0fb96c40d66a4346d1..c7a38adcb3f63394b4cdac3a4f0c5b42151f5d18 100644 (file)
@@ -25,6 +25,7 @@ import org.opendaylight.controller.md.sal.binding.api.ReadTransaction;
 import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
 import org.opendaylight.openflowplugin.api.OFConstants;
 import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionContext;
+import org.opendaylight.openflowplugin.api.openflow.connection.ThrottledConnectionsHolder;
 import org.opendaylight.openflowplugin.api.openflow.device.DeviceState;
 import org.opendaylight.openflowplugin.api.openflow.device.RequestContext;
 import org.opendaylight.openflowplugin.api.openflow.device.Xid;
@@ -80,6 +81,8 @@ public class DeviceContextImplTest {
     HashedWheelTimer timer;
     @Mock
     MessageIntelligenceAgency messageIntelligenceAgency;
+    @Mock
+    ThrottledConnectionsHolder throttledConnectionsHolder;
 
     @Before
     public void setUp() {
@@ -92,29 +95,29 @@ public class DeviceContextImplTest {
         Mockito.when(txChainFactory.newWriteOnlyTransaction()).thenReturn(wTx);
         Mockito.when(dataBroker.newReadOnlyTransaction()).thenReturn(rTx);
 
-        deviceContext = new DeviceContextImpl(connectionContext, deviceState, dataBroker, timer, messageIntelligenceAgency);
+        deviceContext = new DeviceContextImpl(connectionContext, deviceState, dataBroker, timer, messageIntelligenceAgency,throttledConnectionsHolder);
         xid = deviceContext.getNextXid();
         xidMulti = deviceContext.getNextXid();
     }
 
     @Test(expected = NullPointerException.class)
     public void testDeviceContextImplConstructorNullConnectionContext() {
-        new DeviceContextImpl(null, deviceState, dataBroker, timer, messageIntelligenceAgency);
+        new DeviceContextImpl(null, deviceState, dataBroker, timer, messageIntelligenceAgency,throttledConnectionsHolder);
     }
 
     @Test(expected = NullPointerException.class)
     public void testDeviceContextImplConstructorNullDataBroker() {
-        new DeviceContextImpl(connectionContext, deviceState, null, timer, messageIntelligenceAgency);
+        new DeviceContextImpl(connectionContext, deviceState, null, timer, messageIntelligenceAgency,throttledConnectionsHolder);
     }
 
     @Test(expected = NullPointerException.class)
     public void testDeviceContextImplConstructorNullDeviceState() {
-        new DeviceContextImpl(connectionContext, null, dataBroker, timer, messageIntelligenceAgency);
+        new DeviceContextImpl(connectionContext, null, dataBroker, timer, messageIntelligenceAgency,throttledConnectionsHolder);
     }
 
     @Test(expected = NullPointerException.class)
     public void testDeviceContextImplConstructorNullTimer() {
-        new DeviceContextImpl(null, deviceState, dataBroker, null, messageIntelligenceAgency);
+        new DeviceContextImpl(null, deviceState, dataBroker, null, messageIntelligenceAgency,throttledConnectionsHolder);
     }
 
     @Test