--- /dev/null
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.openflowplugin.api.openflow.connection;
+
+import org.opendaylight.openflowjava.protocol.api.connection.ConnectionAdapter;
+
+/**
+ * Created by Martin Bobak <mbobak@cisco.com> on 8.5.2015.
+ */
+public interface ThrottledConnectionsHolder {
+
+ void storeThrottledConnection(ConnectionAdapter connectionAdapter);
+}
import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
import org.opendaylight.openflowplugin.api.openflow.OpenFlowPluginTimer;
import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionContext;
+import org.opendaylight.openflowplugin.api.openflow.connection.ThrottledConnectionsHolder;
import org.opendaylight.openflowplugin.api.openflow.device.handlers.DeviceContextClosedHandler;
import org.opendaylight.openflowplugin.api.openflow.device.handlers.DeviceDisconnectedHandler;
import org.opendaylight.openflowplugin.api.openflow.device.handlers.DeviceReplyProcessor;
--- /dev/null
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.openflowplugin.impl.connection;
+
+import io.netty.util.HashedWheelTimer;
+import io.netty.util.Timeout;
+import io.netty.util.TimerTask;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.LinkedHashSet;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import org.opendaylight.openflowjava.protocol.api.connection.ConnectionAdapter;
+import org.opendaylight.openflowplugin.api.openflow.connection.ThrottledConnectionsHolder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Created by Martin Bobak <mbobak@cisco.com> on 8.5.2015.
+ */
+public class ThrottledConnectionsHolderImpl implements ThrottledConnectionsHolder, TimerTask {
+
+ private final Set<ConnectionAdapter> throttledConnections = Collections.synchronizedSet(new LinkedHashSet<ConnectionAdapter>());
+ private final HashedWheelTimer hashedWheelTimer;
+ private Timeout timeout;
+ private long delay = 100L;
+ private static final Logger LOG = LoggerFactory.getLogger(ThrottledConnectionsHolderImpl.class);
+
+ public ThrottledConnectionsHolderImpl(final HashedWheelTimer hashedWheelTimer) {
+ this.hashedWheelTimer = hashedWheelTimer;
+ }
+
+ @Override
+ public void storeThrottledConnection(final ConnectionAdapter connectionAdapter) {
+ throttledConnections.add(connectionAdapter);
+ LOG.info("Adding piece of throttle for {}", connectionAdapter.getRemoteAddress());
+ synchronized (this) {
+ if (null == timeout) {
+ scheduleTimeout();
+ }
+ }
+ }
+
+ private void scheduleTimeout() {
+ this.timeout = hashedWheelTimer.newTimeout(this, delay, TimeUnit.MILLISECONDS);
+ }
+
+ @Override
+ public void run(final Timeout timeout) throws Exception {
+ synchronized (this) {
+ this.timeout = null;
+ if (throttledConnections.isEmpty()) {
+ return;
+ }
+
+ final Iterator<ConnectionAdapter> iterator = throttledConnections.iterator();
+ if (iterator.hasNext()) {
+ ConnectionAdapter connectionAdapter = iterator.next();
+ iterator.remove();
+ connectionAdapter.setAutoRead(true);
+ LOG.info("Un - throttling primary connection for {}", connectionAdapter.getRemoteAddress());
+ }
+
+ scheduleTimeout();
+ }
+ }
+}
import org.opendaylight.controller.md.sal.binding.api.NotificationService;
import org.opendaylight.controller.md.sal.binding.api.ReadTransaction;
import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.openflowjava.protocol.api.connection.ConnectionAdapter;
import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionContext;
+import org.opendaylight.openflowplugin.api.openflow.connection.ThrottledConnectionsHolder;
import org.opendaylight.openflowplugin.api.openflow.device.DeviceContext;
import org.opendaylight.openflowplugin.api.openflow.device.DeviceState;
import org.opendaylight.openflowplugin.api.openflow.device.MessageTranslator;
private DeviceDisconnectedHandler deviceDisconnectedHandler;
private final List<DeviceContextClosedHandler> closeHandlers = new ArrayList<>();
private NotificationPublishService notificationPublishService;
+ private final ThrottledConnectionsHolder throttledConnectionsHolder;
@VisibleForTesting
@Nonnull final DeviceState deviceState,
@Nonnull final DataBroker dataBroker,
@Nonnull final HashedWheelTimer hashedWheelTimer,
- @Nonnull final MessageSpy _messageSpy) {
+ @Nonnull final MessageSpy _messageSpy,
+ @Nonnull final ThrottledConnectionsHolder throttledConnectionsHolder) {
this.primaryConnectionContext = Preconditions.checkNotNull(primaryConnectionContext);
this.deviceState = Preconditions.checkNotNull(deviceState);
this.dataBroker = Preconditions.checkNotNull(dataBroker);
deviceGroupRegistry = new DeviceGroupRegistryImpl();
deviceMeterRegistry = new DeviceMeterRegistryImpl();
messageSpy = _messageSpy;
+ this.throttledConnectionsHolder = throttledConnectionsHolder;
}
/**
final TranslatorKey translatorKey = new TranslatorKey(packetInMessage.getVersion(), PacketIn.class.getName());
final MessageTranslator<PacketInMessage, PacketReceived> messageTranslator = translatorLibrary.lookupTranslator(translatorKey);
final PacketReceived packetReceived = messageTranslator.translate(packetInMessage, this, null);
- if (!notificationPublishService.offerNotification(packetReceived)) {
- LOG.debug("Notification offer refused by notification service.");
+ final ConnectionAdapter connectionAdapter = this.getPrimaryConnectionContext().getConnectionAdapter();
+ if (connectionAdapter.isAutoRead()) {
+ if (!notificationPublishService.offerNotification(packetReceived)) {
+ LOG.debug("Notification offer refused by notification service.");
+ connectionAdapter.setAutoRead(false);
+ LOG.info("Throttling primary connection for {}", connectionAdapter.getRemoteAddress());
+ this.throttledConnectionsHolder.storeThrottledConnection(connectionAdapter);
+ }
}
}
import org.opendaylight.openflowplugin.api.ConnectionException;
import org.opendaylight.openflowplugin.api.OFConstants;
import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionContext;
+import org.opendaylight.openflowplugin.api.openflow.connection.ThrottledConnectionsHolder;
import org.opendaylight.openflowplugin.api.openflow.device.DeviceContext;
import org.opendaylight.openflowplugin.api.openflow.device.DeviceManager;
import org.opendaylight.openflowplugin.api.openflow.device.DeviceState;
import org.opendaylight.openflowplugin.api.openflow.statistics.ofpspecific.MessageIntelligenceAgency;
import org.opendaylight.openflowplugin.impl.common.MultipartRequestInputFactory;
import org.opendaylight.openflowplugin.impl.common.NodeStaticReplyTranslatorUtil;
+import org.opendaylight.openflowplugin.impl.connection.ThrottledConnectionsHolderImpl;
import org.opendaylight.openflowplugin.impl.device.listener.OpenflowProtocolListenerFullImpl;
import org.opendaylight.openflowplugin.impl.rpc.RequestContextImpl;
import org.opendaylight.openflowplugin.impl.services.OFJResult2RequestCtxFuture;
private final List<DeviceContext> deviceContexts = new ArrayList<DeviceContext>();
private final MessageIntelligenceAgency messageIntelligenceAgency = new MessageIntelligenceAgencyImpl();
+ private final ThrottledConnectionsHolder throttledConnectionsHolder;
public DeviceManagerImpl(@Nonnull final DataBroker dataBroker) {
this.dataBroker = Preconditions.checkNotNull(dataBroker);
};
spyPool = new ScheduledThreadPoolExecutor(1);
spyPool.scheduleAtFixedRate(messageIntelligenceAgency, spyRate, spyRate, TimeUnit.SECONDS);
+
+ throttledConnectionsHolder = new ThrottledConnectionsHolderImpl(hashedWheelTimer);
}
@Override
final DeviceState deviceState = new DeviceStateImpl(connectionContext.getFeatures(), connectionContext.getNodeId());
- final DeviceContext deviceContext = new DeviceContextImpl(connectionContext, deviceState, dataBroker, hashedWheelTimer, messageIntelligenceAgency);
+ final DeviceContext deviceContext = new DeviceContextImpl(connectionContext, deviceState, dataBroker, hashedWheelTimer, messageIntelligenceAgency, throttledConnectionsHolder);
deviceContext.setNotificationService(notificationService);
deviceContext.setNotificationPublishService(notificationPublishService);
import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
import org.opendaylight.openflowplugin.api.OFConstants;
import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionContext;
+import org.opendaylight.openflowplugin.api.openflow.connection.ThrottledConnectionsHolder;
import org.opendaylight.openflowplugin.api.openflow.device.DeviceState;
import org.opendaylight.openflowplugin.api.openflow.device.RequestContext;
import org.opendaylight.openflowplugin.api.openflow.device.Xid;
HashedWheelTimer timer;
@Mock
MessageIntelligenceAgency messageIntelligenceAgency;
+ @Mock
+ ThrottledConnectionsHolder throttledConnectionsHolder;
@Before
public void setUp() {
Mockito.when(txChainFactory.newWriteOnlyTransaction()).thenReturn(wTx);
Mockito.when(dataBroker.newReadOnlyTransaction()).thenReturn(rTx);
- deviceContext = new DeviceContextImpl(connectionContext, deviceState, dataBroker, timer, messageIntelligenceAgency);
+ deviceContext = new DeviceContextImpl(connectionContext, deviceState, dataBroker, timer, messageIntelligenceAgency,throttledConnectionsHolder);
xid = deviceContext.getNextXid();
xidMulti = deviceContext.getNextXid();
}
@Test(expected = NullPointerException.class)
public void testDeviceContextImplConstructorNullConnectionContext() {
- new DeviceContextImpl(null, deviceState, dataBroker, timer, messageIntelligenceAgency);
+ new DeviceContextImpl(null, deviceState, dataBroker, timer, messageIntelligenceAgency,throttledConnectionsHolder);
}
@Test(expected = NullPointerException.class)
public void testDeviceContextImplConstructorNullDataBroker() {
- new DeviceContextImpl(connectionContext, deviceState, null, timer, messageIntelligenceAgency);
+ new DeviceContextImpl(connectionContext, deviceState, null, timer, messageIntelligenceAgency,throttledConnectionsHolder);
}
@Test(expected = NullPointerException.class)
public void testDeviceContextImplConstructorNullDeviceState() {
- new DeviceContextImpl(connectionContext, null, dataBroker, timer, messageIntelligenceAgency);
+ new DeviceContextImpl(connectionContext, null, dataBroker, timer, messageIntelligenceAgency,throttledConnectionsHolder);
}
@Test(expected = NullPointerException.class)
public void testDeviceContextImplConstructorNullTimer() {
- new DeviceContextImpl(null, deviceState, dataBroker, null, messageIntelligenceAgency);
+ new DeviceContextImpl(null, deviceState, dataBroker, null, messageIntelligenceAgency,throttledConnectionsHolder);
}
@Test