--- /dev/null
+package org.opendaylight.openflowplugin.api.openflow.md.queue;
+
+public interface WaterMarkListener {
+
+ /**
+ * When HighWaterMark reached and currently not flooded
+ */
+ void onHighWaterMark();
+
+ /**
+ * When LowWaterMark reached and currently flooded
+ */
+ void onLowWaterMark();
+}
--- /dev/null
+package org.opendaylight.openflowplugin.api.openflow.md.queue;
+
+import org.opendaylight.openflowjava.protocol.api.connection.ConnectionAdapter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+
+public class WaterMarkListenerImpl implements WaterMarkListener {
+
+ private static final Logger LOG = LoggerFactory
+ .getLogger(WaterMarkListenerImpl.class);
+
+ private ConnectionAdapter connectionAdapter;
+
+ public WaterMarkListenerImpl(ConnectionAdapter connectionAdapter) {
+ this.connectionAdapter = Preconditions.checkNotNull(connectionAdapter);
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.opendaylight.openflowplugin.api.openflow.md.queue.QueueListener#
+ * onHighWaterMark
+ * (org.opendaylight.openflowjava.protocol.api.connection.ConnectionAdapter)
+ */
+ @Override
+ public void onHighWaterMark() {
+ connectionAdapter.setAutoRead(false);
+ LOG.debug("AutoRead is set on false.");
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.opendaylight.openflowplugin.api.openflow.md.queue.QueueListener#
+ * onLowWaterMark
+ * (org.opendaylight.openflowjava.protocol.api.connection.ConnectionAdapter)
+ */
+ @Override
+ public void onLowWaterMark() {
+ connectionAdapter.setAutoRead(true);
+ LOG.debug("AutoRead is set on true.");
+ }
+}
package org.opendaylight.openflowplugin.openflow.md.core;
-import com.google.common.util.concurrent.Futures;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
+
import org.opendaylight.openflowjava.protocol.api.connection.ConnectionAdapter;
import org.opendaylight.openflowjava.protocol.api.connection.ConnectionReadyListener;
import org.opendaylight.openflowplugin.api.OFConstants;
import org.opendaylight.openflowplugin.api.openflow.md.queue.QueueKeeper;
import org.opendaylight.openflowplugin.api.openflow.md.queue.QueueKeeper.QueueType;
import org.opendaylight.openflowplugin.api.openflow.md.queue.QueueProcessor;
+import org.opendaylight.openflowplugin.api.openflow.md.queue.WaterMarkListener;
+import org.opendaylight.openflowplugin.api.openflow.md.queue.WaterMarkListenerImpl;
import org.opendaylight.openflowplugin.openflow.md.core.session.OFSessionUtil;
import org.opendaylight.openflowplugin.openflow.md.core.session.PortFeaturesUtil;
import org.opendaylight.openflowplugin.openflow.md.queue.QueueKeeperFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.google.common.util.concurrent.Futures;
+
/**
* @author mirehak
*/
public class ConnectionConductorImpl implements OpenflowProtocolListener,
- SystemNotificationsListener, ConnectionConductor, ConnectionReadyListener, HandshakeListener, NotificationEnqueuer, AutoCloseable {
+ SystemNotificationsListener, ConnectionConductor,
+ ConnectionReadyListener, HandshakeListener, NotificationEnqueuer,
+ AutoCloseable {
/**
* ingress queue limit
protected static final Logger LOG = LoggerFactory
.getLogger(ConnectionConductorImpl.class);
- /* variable to make BitMap-based negotiation enabled / disabled.
- * it will help while testing and isolating issues related to processing of
- * BitMaps from switches.
+ /*
+ * variable to make BitMap-based negotiation enabled / disabled. it will
+ * help while testing and isolating issues related to processing of BitMaps
+ * from switches.
*/
private boolean isBitmapNegotiationEnable = true;
protected ErrorHandler errorHandler;
private int ingressMaxQueueSize;
-
/**
* @param connectionAdapter
*/
/**
* @param connectionAdapter
- * @param ingressMaxQueueSize ingress queue limit (blocking)
+ * @param ingressMaxQueueSize
+ * ingress queue limit (blocking)
*/
- public ConnectionConductorImpl(ConnectionAdapter connectionAdapter, int ingressMaxQueueSize) {
+ public ConnectionConductorImpl(ConnectionAdapter connectionAdapter,
+ int ingressMaxQueueSize) {
this.connectionAdapter = connectionAdapter;
this.ingressMaxQueueSize = ingressMaxQueueSize;
conductorState = CONDUCTOR_STATE.HANDSHAKING;
firstHelloProcessed = false;
handshakeManager = new HandshakeManagerImpl(connectionAdapter,
- ConnectionConductor.versionOrder.get(0), ConnectionConductor.versionOrder);
+ ConnectionConductor.versionOrder.get(0),
+ ConnectionConductor.versionOrder);
handshakeManager.setUseVersionBitmap(isBitmapNegotiationEnable);
handshakeManager.setHandshakeListener(this);
portFeaturesUtils = PortFeaturesUtil.getInstance();
@Override
public void init() {
int handshakeThreadLimit = 1;
- hsPool = new ThreadPoolLoggingExecutor(handshakeThreadLimit, handshakeThreadLimit, 0L,
- TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(),
- "OFHandshake-" + conductorId);
+ hsPool = new ThreadPoolLoggingExecutor(handshakeThreadLimit,
+ handshakeThreadLimit, 0L, TimeUnit.MILLISECONDS,
+ new LinkedBlockingQueue<Runnable>(), "OFHandshake-"
+ + conductorId);
connectionAdapter.setMessageListener(this);
connectionAdapter.setSystemListener(this);
connectionAdapter.setConnectionReadyListener(this);
- queue = QueueKeeperFactory.createFairQueueKeeper(queueProcessor, ingressMaxQueueSize);
+ WaterMarkListener waterMarkListener = new WaterMarkListenerImpl(
+ connectionAdapter);
+ queue = QueueKeeperFactory.createFairQueueKeeper(queueProcessor,
+ ingressMaxQueueSize, waterMarkListener);
}
@Override
- public void setQueueProcessor(QueueProcessor<OfHeader, DataObject> queueProcessor) {
+ public void setQueueProcessor(
+ QueueProcessor<OfHeader, DataObject> queueProcessor) {
this.queueProcessor = queueProcessor;
}
/**
- * @param errorHandler the errorHandler to set
+ * @param errorHandler
+ * the errorHandler to set
*/
@Override
public void setErrorHandler(ErrorHandler errorHandler) {
new Thread(new Runnable() {
@Override
public void run() {
- LOG.debug("echo request received: " + echoRequestMessage.getXid());
+ LOG.debug("echo request received: "
+ + echoRequestMessage.getXid());
EchoReplyInputBuilder builder = new EchoReplyInputBuilder();
builder.setVersion(echoRequestMessage.getVersion());
builder.setXid(echoRequestMessage.getXid());
enqueueMessage(errorMessage);
}
-
/**
* @param message
*/
/**
* @param message
- * @param queueType enqueue type
+ * @param queueType
+ * enqueue type
*/
private void enqueueMessage(OfHeader message, QueueType queueType) {
queue.push(message, this, queueType);
enqueueMessage(message);
}
-
/**
- * version negotiation happened as per following steps:
- * 1. If HelloMessage version field has same version, continue connection processing.
- * If HelloMessage version is lower than supported versions, just disconnect.
+ * version negotiation happened as per following steps: 1. If HelloMessage
+ * version field has same version, continue connection processing. If
+ * HelloMessage version is lower than supported versions, just disconnect.
* 2. If HelloMessage contains bitmap and common version found in bitmap
- * then continue connection processing. if no common version found, just disconnect.
- * 3. If HelloMessage version is not supported, send HelloMessage with lower supported version.
- * 4. If Hello message received again with not supported version, just disconnect.
+ * then continue connection processing. if no common version found, just
+ * disconnect. 3. If HelloMessage version is not supported, send
+ * HelloMessage with lower supported version. 4. If Hello message received
+ * again with not supported version, just disconnect.
*/
@Override
public void onHelloMessage(final HelloMessage hello) {
Boolean portBandwidth = portFeaturesUtils.getPortBandwidth(msg);
if (portBandwidth == null) {
- LOG.debug("can't get bandwidth info from port: {}, aborting port update", msg.toString());
+ LOG.debug(
+ "can't get bandwidth info from port: {}, aborting port update",
+ msg.toString());
} else {
this.getSessionContext().getPhysicalPorts().put(portNumber, msg);
- this.getSessionContext().getPortsBandwidth().put(portNumber, portBandwidth);
+ this.getSessionContext().getPortsBandwidth()
+ .put(portNumber, portBandwidth);
}
}
@Override
public void run() {
if (!CONDUCTOR_STATE.WORKING.equals(getConductorState())) {
- // idle state in any other conductorState than WORKING means real
- // problem and wont be handled by echoReply, but disconnection
+ // idle state in any other conductorState than WORKING means
+ // real
+ // problem and wont be handled by echoReply, but
+ // disconnection
disconnect();
- OFSessionUtil.getSessionManager().invalidateOnDisconnect(ConnectionConductorImpl.this);
+ OFSessionUtil.getSessionManager().invalidateOnDisconnect(
+ ConnectionConductorImpl.this);
} else {
- LOG.debug("first idle state occured, sessionCtx={}|auxId={}", sessionContext, auxiliaryKey);
+ LOG.debug(
+ "first idle state occured, sessionCtx={}|auxId={}",
+ sessionContext, auxiliaryKey);
EchoInputBuilder builder = new EchoInputBuilder();
builder.setVersion(getVersion());
builder.setXid(getSessionContext().getNextXid());
.echo(builder.build());
try {
- RpcResult<EchoOutput> echoReplyValue = echoReplyFuture.get(getMaxTimeout(),
- getMaxTimeoutUnit());
+ RpcResult<EchoOutput> echoReplyValue = echoReplyFuture
+ .get(getMaxTimeout(), getMaxTimeoutUnit());
if (echoReplyValue.isSuccessful()) {
setConductorState(CONDUCTOR_STATE.WORKING);
} else {
- for (RpcError replyError : echoReplyValue.getErrors()) {
+ for (RpcError replyError : echoReplyValue
+ .getErrors()) {
Throwable cause = replyError.getCause();
LOG.error(
"while receiving echoReply in TIMEOUTING state: "
+ cause.getMessage(), cause);
}
- //switch issue occurred
+ // switch issue occurred
throw new Exception("switch issue occurred");
}
} catch (Exception e) {
LOG.error("while waiting for echoReply in TIMEOUTING state: "
+ e.getMessage());
errorHandler.handleException(e, sessionContext);
- //switch is not responding
+ // switch is not responding
disconnect();
- OFSessionUtil.getSessionManager().invalidateOnDisconnect(ConnectionConductorImpl.this);
+ OFSessionUtil.getSessionManager()
+ .invalidateOnDisconnect(
+ ConnectionConductorImpl.this);
}
}
}
}
/**
- * @param conductorState the connectionState to set
+ * @param conductorState
+ * the connectionState to set
*/
@Override
public void setConductorState(CONDUCTOR_STATE conductorState) {
@Override
public Future<Boolean> disconnect() {
- LOG.trace("disconnecting: sessionCtx={}|auxId={}", sessionContext, auxiliaryKey);
+ LOG.trace("disconnecting: sessionCtx={}|auxId={}", sessionContext,
+ auxiliaryKey);
Future<Boolean> result = null;
if (connectionAdapter.isAlive()) {
@Override
public void onHandshakeSuccessfull(GetFeaturesOutput featureOutput,
- Short negotiatedVersion) {
+ Short negotiatedVersion) {
postHandshakeBasic(featureOutput, negotiatedVersion);
// post-handshake actions
/**
* used by tests
- *
+ *
* @param featureOutput
* @param negotiatedVersion
*/
protected void postHandshakeBasic(GetFeaturesOutput featureOutput,
- Short negotiatedVersion) {
+ Short negotiatedVersion) {
version = negotiatedVersion;
if (version == OFConstants.OFP_VERSION_1_0) {
- // Because the GetFeaturesOutput contains information about the port
- // in OF1.0 (that we would otherwise get from the PortDesc) we have to pass
- // it up for parsing to convert into a NodeConnectorUpdate
+ // Because the GetFeaturesOutput contains information about the port
+ // in OF1.0 (that we would otherwise get from the PortDesc) we have
+ // to pass
+ // it up for parsing to convert into a NodeConnectorUpdate
//
- // BUG-1988 - this must be the first item in queue in order not to get behind link-up message
+ // BUG-1988 - this must be the first item in queue in order not to
+ // get behind link-up message
enqueueMessage(featureOutput);
}
}
/*
- * Send an OFPMP_DESC request message to the switch
+ * Send an OFPMP_DESC request message to the switch
*/
private void requestDesc() {
MultipartRequestInputBuilder builder = new MultipartRequestInputBuilder();
builder.setType(MultipartType.OFPMPDESC);
builder.setVersion(getVersion());
builder.setFlags(new MultipartRequestFlags(false));
- builder.setMultipartRequestBody(new MultipartRequestDescCaseBuilder().build());
+ builder.setMultipartRequestBody(new MultipartRequestDescCaseBuilder()
+ .build());
builder.setXid(getSessionContext().getNextXid());
getConnectionAdapter().multipartRequest(builder.build());
}
builder.setType(MultipartType.OFPMPPORTDESC);
builder.setVersion(getVersion());
builder.setFlags(new MultipartRequestFlags(false));
- builder.setMultipartRequestBody(new MultipartRequestPortDescCaseBuilder().build());
+ builder.setMultipartRequestBody(new MultipartRequestPortDescCaseBuilder()
+ .build());
builder.setXid(getSessionContext().getNextXid());
getConnectionAdapter().multipartRequest(builder.build());
}
mprInput.setFlags(new MultipartRequestFlags(false));
mprInput.setXid(getSessionContext().getNextXid());
- MultipartRequestGroupFeaturesCaseBuilder mprGroupFeaturesBuild =
- new MultipartRequestGroupFeaturesCaseBuilder();
+ MultipartRequestGroupFeaturesCaseBuilder mprGroupFeaturesBuild = new MultipartRequestGroupFeaturesCaseBuilder();
mprInput.setMultipartRequestBody(mprGroupFeaturesBuild.build());
- LOG.debug("Send group features statistics request :{}", mprGroupFeaturesBuild);
+ LOG.debug("Send group features statistics request :{}",
+ mprGroupFeaturesBuild);
getConnectionAdapter().multipartRequest(mprInput.build());
}
mprInput.setFlags(new MultipartRequestFlags(false));
mprInput.setXid(getSessionContext().getNextXid());
- MultipartRequestMeterFeaturesCaseBuilder mprMeterFeaturesBuild =
- new MultipartRequestMeterFeaturesCaseBuilder();
+ MultipartRequestMeterFeaturesCaseBuilder mprMeterFeaturesBuild = new MultipartRequestMeterFeaturesCaseBuilder();
mprInput.setMultipartRequestBody(mprMeterFeaturesBuild.build());
- LOG.debug("Send meter features statistics request :{}", mprMeterFeaturesBuild);
+ LOG.debug("Send meter features statistics request :{}",
+ mprMeterFeaturesBuild);
getConnectionAdapter().multipartRequest(mprInput.build());
}
/**
- * @param isBitmapNegotiationEnable the isBitmapNegotiationEnable to set
+ * @param isBitmapNegotiationEnable
+ * the isBitmapNegotiationEnable to set
*/
- public void setBitmapNegotiationEnable(
- boolean isBitmapNegotiationEnable) {
+ public void setBitmapNegotiationEnable(boolean isBitmapNegotiationEnable) {
this.isBitmapNegotiationEnable = isBitmapNegotiationEnable;
}
import org.opendaylight.openflowplugin.api.openflow.md.queue.MessageSourcePollRegistrator;
import org.opendaylight.openflowplugin.api.openflow.md.queue.QueueKeeper;
+import org.opendaylight.openflowplugin.api.openflow.md.queue.WaterMarkListener;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
/**
- * factory for {@link org.opendaylight.openflowplugin.api.openflow.md.queue.QueueKeeper} implementations
+ * factory for
+ * {@link org.opendaylight.openflowplugin.api.openflow.md.queue.QueueKeeper}
+ * implementations
*/
public abstract class QueueKeeperFactory {
-
+
/**
- * @param sourceRegistrator
- * @param capacity blocking queue capacity
- * @return fair reading implementation of {@link org.opendaylight.openflowplugin.api.openflow.md.queue.QueueKeeper} (not registered = not started yet)
+ * @param sourceRegistrator
+ * @param capacity
+ * blocking queue capacity
+ * @param waterMarkListener
+ * @return fair reading implementation of
+ * {@link org.opendaylight.openflowplugin.api.openflow.md.queue.QueueKeeper}
+ * (not registered = not started yet)
*/
public static QueueKeeper<OfHeader> createFairQueueKeeper(
- MessageSourcePollRegistrator<QueueKeeper<OfHeader>> sourceRegistrator, int capacity) {
+ MessageSourcePollRegistrator<QueueKeeper<OfHeader>> sourceRegistrator,
+ int capacity, WaterMarkListener waterMarkListener) {
QueueKeeperFairImpl queueKeeper = new QueueKeeperFairImpl();
queueKeeper.setCapacity(capacity);
queueKeeper.setHarvesterHandle(sourceRegistrator.getHarvesterHandle());
+ queueKeeper.setWaterMarkListener(waterMarkListener);
queueKeeper.init();
-
+
return queueKeeper;
}
/**
- * register queue by harvester, start processing it. Use {@link QueueKeeperFairImpl#close()} to kill the queue and stop processing.
+ * register queue by harvester, start processing it. Use
+ * {@link QueueKeeperFairImpl#close()} to kill the queue and stop
+ * processing.
+ *
* @param sourceRegistrator
* @param queueKeeper
*/
public static <V> void plugQueue(
MessageSourcePollRegistrator<QueueKeeper<V>> sourceRegistrator,
QueueKeeper<V> queueKeeper) {
- AutoCloseable registration = sourceRegistrator.registerMessageSource(queueKeeper);
+ AutoCloseable registration = sourceRegistrator
+ .registerMessageSource(queueKeeper);
queueKeeper.setPollRegistration(registration);
sourceRegistrator.getHarvesterHandle().ping();
}
import org.opendaylight.openflowplugin.api.openflow.md.queue.HarvesterHandle;
import org.opendaylight.openflowplugin.api.openflow.md.queue.QueueItem;
import org.opendaylight.openflowplugin.api.openflow.md.queue.QueueKeeper;
+import org.opendaylight.openflowplugin.api.openflow.md.queue.WaterMarkListener;
import org.opendaylight.openflowplugin.api.openflow.md.util.PollableQueuesPriorityZipper;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
import org.slf4j.Logger;
private HarvesterHandle harvesterHandle;
private PollableQueuesPriorityZipper<QueueItem<OfHeader>> queueZipper;
+ private WaterMarkListener waterMarkListener;
+
@Override
public void close() throws Exception {
Preconditions.checkNotNull(pollRegistration,
*/
@Override
public QueueItem<OfHeader> poll() {
- QueueItem<OfHeader> nextQueueItem = queueZipper.poll();
- return nextQueueItem;
+ return queueZipper.poll();
}
/**
* init blocking queue
*/
public void init() {
+ Preconditions.checkNotNull(waterMarkListener);
queueUnordered = new ArrayBlockingQueue<>(capacity);
queueDefault = new ArrayBlockingQueue<>(capacity);
+ WrapperQueueImpl<QueueItem<OfHeader>> wrapperQueue = new WrapperQueueImpl<>(
+ capacity, queueDefault, waterMarkListener);
queueZipper = new PollableQueuesPriorityZipper<>();
queueZipper.addSource(queueUnordered);
- queueZipper.setPrioritizedSource(queueDefault);
+ queueZipper.setPrioritizedSource(wrapperQueue);
+ }
+
+ public void setWaterMarkListener(WaterMarkListener waterMarkListener) {
+ this.waterMarkListener = waterMarkListener;
}
/**
--- /dev/null
+package org.opendaylight.openflowplugin.openflow.md.queue;
+
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.Queue;
+
+import org.opendaylight.openflowplugin.api.openflow.md.queue.QueueItem;
+import org.opendaylight.openflowplugin.api.openflow.md.queue.WaterMarkListener;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+
+public class WrapperQueueImpl<E> implements Queue<E> {
+
+ protected static final Logger LOG = LoggerFactory
+ .getLogger(WrapperQueueImpl.class);
+
+ private int lowWaterMark;
+ private int highWaterMark;
+
+ private WaterMarkListener queueListenerMark;
+
+ private Queue<E> queueDefault;
+
+ private boolean flooded;
+
+ /**
+ * @param capacity
+ * @param queueZipper
+ */
+ public WrapperQueueImpl(int capacity, Queue<E> queueDefault,
+ WaterMarkListener queueListenerMark) {
+ this.queueListenerMark = queueListenerMark;
+ this.queueDefault = Preconditions.checkNotNull(queueDefault);
+
+ this.highWaterMark = (int) (capacity * 0.8);
+ this.lowWaterMark = (int) (capacity * 0.65);
+ }
+
+ /**
+ * Marking checks size of {@link #queueDefault} and on the basis of this is
+ * set autoRead
+ */
+ private void marking() {
+ if (queueDefault.size() >= highWaterMark && !flooded) {
+ queueListenerMark.onHighWaterMark();
+ flooded = true;
+ } else if (queueDefault.size() <= lowWaterMark && flooded) {
+ queueListenerMark.onLowWaterMark();
+ flooded = false;
+ }
+ }
+
+ /**
+ * @return true if flooded
+ */
+ public boolean isFlooded() {
+ return flooded;
+ }
+
+ /**
+ * poll {@link QueueItem} and call {@link #marking()} for check marks and
+ * set autoRead if it need it
+ *
+ * @return polled item
+ */
+ public E poll() {
+ E nextQueueItem = queueDefault.poll();
+ marking();
+ return nextQueueItem;
+ }
+
+ public boolean add(E e) {
+ return queueDefault.add(e);
+ }
+
+ public int size() {
+ return queueDefault.size();
+ }
+
+ public boolean isEmpty() {
+ return queueDefault.isEmpty();
+ }
+
+ public boolean contains(Object o) {
+ return queueDefault.contains(o);
+ }
+
+ public boolean offer(E e) {
+ boolean enqueueResult = queueDefault.offer(e);
+ marking();
+ return enqueueResult;
+ }
+
+ public Iterator<E> iterator() {
+ return queueDefault.iterator();
+ }
+
+ public E remove() {
+ return queueDefault.remove();
+ }
+
+ public Object[] toArray() {
+ return queueDefault.toArray();
+ }
+
+ public E element() {
+ return queueDefault.element();
+ }
+
+ public E peek() {
+ return queueDefault.peek();
+ }
+
+ public <T> T[] toArray(T[] a) {
+ return queueDefault.toArray(a);
+ }
+
+ public boolean remove(Object o) {
+ return queueDefault.remove(o);
+ }
+
+ public boolean containsAll(Collection<?> c) {
+ return queueDefault.containsAll(c);
+ }
+
+ public boolean addAll(Collection<? extends E> c) {
+ return queueDefault.addAll(c);
+ }
+
+ public boolean removeAll(Collection<?> c) {
+ return queueDefault.removeAll(c);
+ }
+
+ public boolean retainAll(Collection<?> c) {
+ return queueDefault.retainAll(c);
+ }
+
+ public void clear() {
+ queueDefault.clear();
+ }
+
+ public boolean equals(Object o) {
+ return queueDefault.equals(o);
+ }
+
+ public int hashCode() {
+ return queueDefault.hashCode();
+ }
+
+}
private int planItemCounter;
+ private boolean autoRead = true;
+
/**
* default ctor
*/
}
@Override
- public synchronized Future<RpcResult<BarrierOutput>> barrier(BarrierInput arg0) {
+ public synchronized Future<RpcResult<BarrierOutput>> barrier(
+ BarrierInput arg0) {
checkRpcAndNext(arg0, "barrier");
SettableFuture<RpcResult<BarrierOutput>> result = createAndRegisterRpcResult(arg0);
return result;
}
@Override
- public synchronized Future<RpcResult<GetAsyncOutput>> getAsync(GetAsyncInput arg0) {
+ public synchronized Future<RpcResult<GetAsyncOutput>> getAsync(
+ GetAsyncInput arg0) {
checkRpcAndNext(arg0, "echo");
Future<RpcResult<GetAsyncOutput>> result = createAndRegisterRpcResult(arg0);
return result;
}
@Override
- public synchronized Future<RpcResult<GetConfigOutput>> getConfig(GetConfigInput arg0) {
+ public synchronized Future<RpcResult<GetConfigOutput>> getConfig(
+ GetConfigInput arg0) {
checkRpcAndNext(arg0, "echo");
Future<RpcResult<GetConfigOutput>> result = createAndRegisterRpcResult(arg0);
return result;
@Override
public void checkListeners() {
- if (ofListener == null || systemListener == null || connectionReadyListener == null) {
+ if (ofListener == null || systemListener == null
+ || connectionReadyListener == null) {
occuredExceptions
.add(new IllegalStateException("missing listeners"));
}
throw new IllegalStateException("eventPlan already depleted");
}
- LOG.debug("checking rpc: name={}, ver={}, xid={}", rpcName, rpcInput.getVersion(), rpcInput.getXid());
+ LOG.debug("checking rpc: name={}, ver={}, xid={}", rpcName,
+ rpcInput.getVersion(), rpcInput.getXid());
if (!(eventPlan.peek() instanceof SwitchTestWaitForRpcEvent)
&& !(eventPlan.peek() instanceof SwitchTestWaitForAllEvent)) {
if (eventPlan.peek() instanceof SwitchTestNotificationEvent) {
- SwitchTestNotificationEvent notifEvent = (SwitchTestNotificationEvent) (eventPlan.peek());
- msg = "expected [notification: " +notifEvent.getPlannedNotification()+ "], got [" + rpcInput.getClass().getSimpleName()
- + "]";
+ SwitchTestNotificationEvent notifEvent = (SwitchTestNotificationEvent) (eventPlan
+ .peek());
+ msg = "expected [notification: "
+ + notifEvent.getPlannedNotification() + "], got ["
+ + rpcInput.getClass().getSimpleName() + "]";
} else if (eventPlan.peek() instanceof SwitchTestRcpResponseEvent) {
- SwitchTestRcpResponseEvent rpcEvent = (SwitchTestRcpResponseEvent) (eventPlan.peek());
- msg = "expected [rpc: " +rpcEvent.getPlannedRpcResponse()+ "], got [" + rpcInput.getClass().getSimpleName()
+ SwitchTestRcpResponseEvent rpcEvent = (SwitchTestRcpResponseEvent) (eventPlan
+ .peek());
+ msg = "expected [rpc: " + rpcEvent.getPlannedRpcResponse()
+ + "], got [" + rpcInput.getClass().getSimpleName()
+ "]";
}
} else {
if (eventPlan.peek() instanceof SwitchTestWaitForAllEvent) {
SwitchTestWaitForAllEvent switchTestWaitForAll = (SwitchTestWaitForAllEvent) eventPlan
.peek();
- Set<SwitchTestWaitForRpcEvent> eventBag = switchTestWaitForAll.getWaitEventBag();
+ Set<SwitchTestWaitForRpcEvent> eventBag = switchTestWaitForAll
+ .getWaitEventBag();
List<String> msgLot = new ArrayList<>();
if (eventBag == null || eventBag.isEmpty()) {
} else {
finished = false;
for (SwitchTestWaitForRpcEvent switchTestWaitForRpc : eventBag) {
- String msgPart = checkSingleRpcContent(rpcInput, rpcName, switchTestWaitForRpc);
+ String msgPart = checkSingleRpcContent(rpcInput,
+ rpcName, switchTestWaitForRpc);
if (msgPart != null) {
msgLot.add(msgPart);
} else if (eventPlan.peek() instanceof SwitchTestWaitForRpcEvent) {
SwitchTestWaitForRpcEvent switchTestRpcEvent = (SwitchTestWaitForRpcEvent) eventPlan
.peek();
- msg = checkSingleRpcContent(rpcInput, rpcName, switchTestRpcEvent);
+ msg = checkSingleRpcContent(rpcInput, rpcName,
+ switchTestRpcEvent);
}
}
if (msg != null) {
LOG.debug("rpc check .. FAILED: " + msg);
- occuredExceptions.add(new IllegalArgumentException("step:"+planItemCounter+" | "+msg));
+ occuredExceptions.add(new IllegalArgumentException("step:"
+ + planItemCounter + " | " + msg));
} else {
LOG.debug("rpc check .. OK");
}
* @param switchTestWaitForRpc
* @return
*/
- private static String checkSingleRpcContent(OfHeader rpcInput, String rpcName,
- SwitchTestWaitForRpcEvent switchTestWaitForRpc) {
+ private static String checkSingleRpcContent(OfHeader rpcInput,
+ String rpcName, SwitchTestWaitForRpcEvent switchTestWaitForRpc) {
String failureMsg = null;
if (!rpcName.equals(switchTestWaitForRpc.getRpcName())) {
- failureMsg = "expected rpc name [" + switchTestWaitForRpc.getRpcName()
- + "], got [" + rpcName + "]";
+ failureMsg = "expected rpc name ["
+ + switchTestWaitForRpc.getRpcName() + "], got [" + rpcName
+ + "]";
} else if (!rpcInput.getXid().equals(switchTestWaitForRpc.getXid())) {
- failureMsg = "expected "+rpcName+".xid [" + switchTestWaitForRpc.getXid()
- + "], got [" + rpcInput.getXid() + "]";
+ failureMsg = "expected " + rpcName + ".xid ["
+ + switchTestWaitForRpc.getXid() + "], got ["
+ + rpcInput.getXid() + "]";
}
return failureMsg;
* discard current event, execute next, if possible
*/
private void next() {
- LOG.debug("<---> STEPPING TO NEXT event in plan (leaving [{}] {})", planItemCounter, eventPlan.peek());
+ LOG.debug("<---> STEPPING TO NEXT event in plan (leaving [{}] {})",
+ planItemCounter, eventPlan.peek());
eventPlan.pop();
- planItemCounter ++;
+ planItemCounter++;
planTouched = true;
try {
Thread.sleep(JOB_DELAY);
*/
private synchronized void proceed() {
boolean processed = false;
- LOG.debug("proceeding plan item[{}]: {}", planItemCounter, eventPlan.peek());
+ LOG.debug("proceeding plan item[{}]: {}", planItemCounter,
+ eventPlan.peek());
if (eventPlan.peek() instanceof SwitchTestNotificationEvent) {
SwitchTestNotificationEvent notification = (SwitchTestNotificationEvent) eventPlan
.peek();
processRpcResponse(rpcResponse);
processed = true;
} else if (eventPlan.peek() instanceof SwitchTestCallbackEvent) {
- SwitchTestCallbackEvent callbackEvent = (SwitchTestCallbackEvent) eventPlan.peek();
+ SwitchTestCallbackEvent callbackEvent = (SwitchTestCallbackEvent) eventPlan
+ .peek();
try {
callbackEvent.getCallback().call();
} catch (Exception e) {
LOG.error(e.getMessage(), e);
- occuredExceptions.add(e);
+ occuredExceptions.add(e);
}
processed = true;
}
planTouched = false;
proceed();
if (!planTouched) {
- occuredExceptions.add(new IllegalStateException(
- "eventPlan STALLED, planItemCounter="+planItemCounter));
+ occuredExceptions
+ .add(new IllegalStateException(
+ "eventPlan STALLED, planItemCounter="
+ + planItemCounter));
break;
}
}
private synchronized void processNotification(
final SwitchTestNotificationEvent notificationEvent) {
- Notification notification = notificationEvent
- .getPlannedNotification();
+ Notification notification = notificationEvent.getPlannedNotification();
LOG.debug("notificating OF_LISTENER: "
+ notification.getClass().getSimpleName());
// system events
if (notification instanceof DisconnectEvent) {
- systemListener
- .onDisconnectEvent((DisconnectEvent) notification);
+ systemListener.onDisconnectEvent((DisconnectEvent) notification);
}
// of notifications
else if (notification instanceof EchoRequestMessage) {
- ofListener
- .onEchoRequestMessage((EchoRequestMessage) notification);
+ ofListener.onEchoRequestMessage((EchoRequestMessage) notification);
} else if (notification instanceof ErrorMessage) {
ofListener.onErrorMessage((ErrorMessage) notification);
} else if (notification instanceof ExperimenterMessage) {
ofListener
- .onExperimenterMessage((ExperimenterMessage) notification);
+ .onExperimenterMessage((ExperimenterMessage) notification);
} else if (notification instanceof FlowRemovedMessage) {
- ofListener
- .onFlowRemovedMessage((FlowRemovedMessage) notification);
+ ofListener.onFlowRemovedMessage((FlowRemovedMessage) notification);
} else if (notification instanceof HelloMessage) {
ofListener.onHelloMessage((HelloMessage) notification);
} else if (notification instanceof MultipartReplyMessage) {
ofListener
- .onMultipartReplyMessage((MultipartReplyMessage) notification);
+ .onMultipartReplyMessage((MultipartReplyMessage) notification);
} else if (notification instanceof PacketInMessage) {
- ofListener
- .onPacketInMessage((PacketInMessage) notification);
+ ofListener.onPacketInMessage((PacketInMessage) notification);
} else if (notification instanceof PortStatusMessage) {
- ofListener
- .onPortStatusMessage((PortStatusMessage) notification);
+ ofListener.onPortStatusMessage((PortStatusMessage) notification);
}
// default
else {
- occuredExceptions.add(new IllegalStateException("step:"+planItemCounter+" | " +
- "message listening not supported for type: "
- + notification.getClass()));
+ occuredExceptions.add(new IllegalStateException("step:"
+ + planItemCounter + " | "
+ + "message listening not supported for type: "
+ + notification.getClass()));
}
- LOG.debug("notification ["+notification.getClass().getSimpleName()+"] .. done");
+ LOG.debug("notification [" + notification.getClass().getSimpleName()
+ + "] .. done");
}
/**
* @param rpcResponse
*/
- private synchronized void processRpcResponse(final SwitchTestRcpResponseEvent rpcResponse) {
- OfHeader plannedRpcResponseValue = rpcResponse
- .getPlannedRpcResponse();
+ private synchronized void processRpcResponse(
+ final SwitchTestRcpResponseEvent rpcResponse) {
+ OfHeader plannedRpcResponseValue = rpcResponse.getPlannedRpcResponse();
LOG.debug("rpc-responding to OF_LISTENER: " + rpcResponse.getXid());
@SuppressWarnings("unchecked")
SettableFuture<RpcResult<?>> response = (SettableFuture<RpcResult<?>>) rpcResults
- .get(rpcResponse.getXid());
+ .get(rpcResponse.getXid());
if (response != null) {
boolean successful = plannedRpcResponseValue != null;
if (successful) {
errors = Collections.emptyList();
} else {
- errors = Lists
- .newArrayList(RpcErrors
- .getRpcError(
- "unit",
- "unit",
- "not requested",
- ErrorSeverity.ERROR,
- "planned response to RPC.id = "
- + rpcResponse.getXid(),
- ErrorType.RPC,
- new Exception(
- "rpc response failed (planned behavior)")));
+ errors = Lists.newArrayList(RpcErrors.getRpcError("unit",
+ "unit", "not requested", ErrorSeverity.ERROR,
+ "planned response to RPC.id = " + rpcResponse.getXid(),
+ ErrorType.RPC, new Exception(
+ "rpc response failed (planned behavior)")));
}
RpcResult<?> result = Rpcs.getRpcResult(successful,
plannedRpcResponseValue, errors);
response.set(result);
} else {
String msg = "RpcResponse not expected: xid="
- + rpcResponse.getXid()
- + ", "
- + plannedRpcResponseValue.getClass()
- .getSimpleName();
+ + rpcResponse.getXid() + ", "
+ + plannedRpcResponseValue.getClass().getSimpleName();
LOG.error(msg);
- occuredExceptions.add(new IllegalStateException("step:"+planItemCounter+" | " + msg));
+ occuredExceptions.add(new IllegalStateException("step:"
+ + planItemCounter + " | " + msg));
}
- LOG.debug("rpc ["+rpcResponse.getXid()+"] .. done");
+ LOG.debug("rpc [" + rpcResponse.getXid() + "] .. done");
}
/**
@Override
public void fireConnectionReadyNotification() {
- connectionReadyListener.onConnectionReady();
+ connectionReadyListener.onConnectionReady();
}
@Override
}
@Override
- public Future<RpcResult<Void>> multipartRequest(
- MultipartRequestInput arg0) {
+ public Future<RpcResult<Void>> multipartRequest(MultipartRequestInput arg0) {
checkRpcAndNext(arg0, "multipartRequestInput");
SettableFuture<RpcResult<Void>> result = createOneWayRpcResult();
return result;
return null;
}
+ @Override
+ public boolean isAutoRead() {
+ return autoRead;
+ }
+
+ @Override
+ public void setAutoRead(boolean autoRead) {
+ this.autoRead = autoRead;
+ }
+
}
import org.junit.Test;
import org.opendaylight.openflowjava.protocol.api.connection.ConnectionAdapter;
import org.opendaylight.openflowjava.protocol.api.connection.ConnectionReadyListener;
-import org.opendaylight.openflowplugin.api.openflow.md.core.session.IMessageDispatchService;
-import org.opendaylight.openflowplugin.api.openflow.md.core.session.SessionContext;
-import org.opendaylight.openflowplugin.api.openflow.md.core.session.SwitchSessionKeyOF;
-import org.opendaylight.openflowplugin.api.openflow.md.ModelDrivenSwitch;
import org.opendaylight.openflowplugin.api.OFConstants;
+import org.opendaylight.openflowplugin.api.openflow.md.ModelDrivenSwitch;
import org.opendaylight.openflowplugin.api.openflow.md.core.ConnectionConductor;
import org.opendaylight.openflowplugin.api.openflow.md.core.ErrorHandler;
import org.opendaylight.openflowplugin.api.openflow.md.core.NotificationEnqueuer;
import org.opendaylight.openflowplugin.api.openflow.md.core.NotificationQueueWrapper;
import org.opendaylight.openflowplugin.api.openflow.md.core.SwitchConnectionDistinguisher;
+import org.opendaylight.openflowplugin.api.openflow.md.core.session.IMessageDispatchService;
+import org.opendaylight.openflowplugin.api.openflow.md.core.session.SessionContext;
+import org.opendaylight.openflowplugin.api.openflow.md.core.session.SwitchSessionKeyOF;
import org.opendaylight.openflowplugin.api.openflow.md.queue.QueueProcessor;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.BarrierInput;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.BarrierInputBuilder;
/**
* Test barrier message for null cookie
- *
+ *
* @throws Exception
*/
@Test
SwitchConnectionDistinguisher cookie = conductor.getAuxiliaryKey();
BarrierInputBuilder barrierMsg = new BarrierInputBuilder();
session.getMessageDispatchService().barrier(barrierMsg.build(), cookie);
- Assert.assertEquals(MessageType.BARRIER, session.getPrimaryConductor().getMessageType());
+ Assert.assertEquals(MessageType.BARRIER, session.getPrimaryConductor()
+ .getMessageType());
}
/**
MockConnectionConductor conductor = new MockConnectionConductor(1);
SwitchConnectionDistinguisher cookie = conductor.getAuxiliaryKey();
ExperimenterInputBuilder experimenterInputBuilder = new ExperimenterInputBuilder();
- session.getMessageDispatchService().experimenter(experimenterInputBuilder.build(), cookie);
- Assert.assertEquals(MessageType.NONE, session.getPrimaryConductor().getMessageType());
+ session.getMessageDispatchService().experimenter(
+ experimenterInputBuilder.build(), cookie);
+ Assert.assertEquals(MessageType.NONE, session.getPrimaryConductor()
+ .getMessageType());
}
/**
MockConnectionConductor conductor = new MockConnectionConductor(1);
SwitchConnectionDistinguisher cookie = conductor.getAuxiliaryKey();
GetAsyncInputBuilder getAsyncInputBuilder = new GetAsyncInputBuilder();
- session.getMessageDispatchService().getAsync(getAsyncInputBuilder.build(), cookie);
- Assert.assertEquals(MessageType.NONE, session.getPrimaryConductor().getMessageType());
+ session.getMessageDispatchService().getAsync(
+ getAsyncInputBuilder.build(), cookie);
+ Assert.assertEquals(MessageType.NONE, session.getPrimaryConductor()
+ .getMessageType());
}
/**
MockConnectionConductor conductor = new MockConnectionConductor(1);
SwitchConnectionDistinguisher cookie = conductor.getAuxiliaryKey();
GetConfigInputBuilder getConfigInputBuilder = new GetConfigInputBuilder();
- session.getMessageDispatchService().getConfig(getConfigInputBuilder.build(), cookie);
- Assert.assertEquals(MessageType.NONE, session.getPrimaryConductor().getMessageType());
+ session.getMessageDispatchService().getConfig(
+ getConfigInputBuilder.build(), cookie);
+ Assert.assertEquals(MessageType.NONE, session.getPrimaryConductor()
+ .getMessageType());
}
/**
MockConnectionConductor conductor = new MockConnectionConductor(1);
SwitchConnectionDistinguisher cookie = conductor.getAuxiliaryKey();
GetFeaturesInputBuilder getFeaturesInputBuilder = new GetFeaturesInputBuilder();
- session.getMessageDispatchService().getFeatures(getFeaturesInputBuilder.build(), cookie);
- Assert.assertEquals(MessageType.NONE, session.getPrimaryConductor().getMessageType());
+ session.getMessageDispatchService().getFeatures(
+ getFeaturesInputBuilder.build(), cookie);
+ Assert.assertEquals(MessageType.NONE, session.getPrimaryConductor()
+ .getMessageType());
}
/**
MockConnectionConductor conductor = new MockConnectionConductor(1);
SwitchConnectionDistinguisher cookie = conductor.getAuxiliaryKey();
GetQueueConfigInputBuilder getQueueConfigInputBuilder = new GetQueueConfigInputBuilder();
- session.getMessageDispatchService().getQueueConfig(getQueueConfigInputBuilder.build(), cookie);
- Assert.assertEquals(MessageType.NONE, session.getPrimaryConductor().getMessageType());
+ session.getMessageDispatchService().getQueueConfig(
+ getQueueConfigInputBuilder.build(), cookie);
+ Assert.assertEquals(MessageType.NONE, session.getPrimaryConductor()
+ .getMessageType());
}
/**
MockConnectionConductor conductor = new MockConnectionConductor(1);
SwitchConnectionDistinguisher cookie = conductor.getAuxiliaryKey();
MultipartRequestInputBuilder multipartRequestInputBuilder = new MultipartRequestInputBuilder();
- session.getMessageDispatchService().multipartRequest(multipartRequestInputBuilder.build(), cookie);
- Assert.assertEquals(MessageType.NONE, session.getPrimaryConductor().getMessageType());
+ session.getMessageDispatchService().multipartRequest(
+ multipartRequestInputBuilder.build(), cookie);
+ Assert.assertEquals(MessageType.NONE, session.getPrimaryConductor()
+ .getMessageType());
}
/**
MockConnectionConductor conductor = new MockConnectionConductor(1);
SwitchConnectionDistinguisher cookie = conductor.getAuxiliaryKey();
RoleRequestInputBuilder roleRequestInputBuilder = new RoleRequestInputBuilder();
- session.getMessageDispatchService().roleRequest(roleRequestInputBuilder.build(), cookie);
- Assert.assertEquals(MessageType.NONE, session.getPrimaryConductor().getMessageType());
+ session.getMessageDispatchService().roleRequest(
+ roleRequestInputBuilder.build(), cookie);
+ Assert.assertEquals(MessageType.NONE, session.getPrimaryConductor()
+ .getMessageType());
}
/**
MockConnectionConductor conductor = new MockConnectionConductor(1);
SwitchConnectionDistinguisher cookie = conductor.getAuxiliaryKey();
TableModInputBuilder tableModInputBuilder = new TableModInputBuilder();
- session.getMessageDispatchService().tableMod(tableModInputBuilder.build(), cookie);
- Assert.assertEquals(MessageType.TABLEMOD, session.getPrimaryConductor().getMessageType());
+ session.getMessageDispatchService().tableMod(
+ tableModInputBuilder.build(), cookie);
+ Assert.assertEquals(MessageType.TABLEMOD, session.getPrimaryConductor()
+ .getMessageType());
}
-
/**
* Test packet out message for primary connection
- *
+ *
* @throws Exception
*/
@Test
public void testPacketOutMessageForPrimary() throws Exception {
session.getMessageDispatchService().packetOut(null, null);
- Assert.assertEquals(MessageType.PACKETOUT, session.getPrimaryConductor().getMessageType());
+ Assert.assertEquals(MessageType.PACKETOUT, session
+ .getPrimaryConductor().getMessageType());
}
/**
* Test packet out message for auxiliary connection
- *
+ *
* @throws Exception
*/
@Test
SwitchConnectionDistinguisher cookie = conductor.getAuxiliaryKey();
session.addAuxiliaryConductor(cookie, conductor);
session.getMessageDispatchService().packetOut(null, cookie);
- Assert.assertEquals(MessageType.NONE, session.getPrimaryConductor().getMessageType());
- conductor = (MockConnectionConductor) session.getAuxiliaryConductor(cookie);
+ Assert.assertEquals(MessageType.NONE, session.getPrimaryConductor()
+ .getMessageType());
+ conductor = (MockConnectionConductor) session
+ .getAuxiliaryConductor(cookie);
Assert.assertEquals(MessageType.PACKETOUT, conductor.getMessageType());
}
/**
* Test packet out message when multiple auxiliary connection exist
- *
+ *
* @throws Exception
*/
@Test
// send message
session.getMessageDispatchService().packetOut(builder.build(), cookie2);
- Assert.assertEquals(MessageType.NONE, session.getPrimaryConductor().getMessageType());
+ Assert.assertEquals(MessageType.NONE, session.getPrimaryConductor()
+ .getMessageType());
- conductor3 = (MockConnectionConductor) session.getAuxiliaryConductor(cookie3);
+ conductor3 = (MockConnectionConductor) session
+ .getAuxiliaryConductor(cookie3);
Assert.assertEquals(MessageType.NONE, conductor3.getMessageType());
- conductor2 = (MockConnectionConductor) session.getAuxiliaryConductor(cookie2);
+ conductor2 = (MockConnectionConductor) session
+ .getAuxiliaryConductor(cookie2);
Assert.assertEquals(MessageType.PACKETOUT, conductor2.getMessageType());
- conductor1 = (MockConnectionConductor) session.getAuxiliaryConductor(cookie1);
+ conductor1 = (MockConnectionConductor) session
+ .getAuxiliaryConductor(cookie1);
Assert.assertEquals(MessageType.NONE, conductor1.getMessageType());
}
/**
* Test for invalid session
- *
+ *
* @throws Exception
*/
@Test
public void testInvalidSession() throws Exception {
session.setValid(false);
- Future<RpcResult<Void>> resultFuture = session.getMessageDispatchService().packetOut(null, null);
+ Future<RpcResult<Void>> resultFuture = session
+ .getMessageDispatchService().packetOut(null, null);
if (resultFuture.isDone()) {
RpcResult<Void> rpcResult = resultFuture.get();
Assert.assertTrue(!rpcResult.getErrors().isEmpty());
Iterator<RpcError> it = rpcResult.getErrors().iterator();
RpcError rpcError = it.next();
- Assert.assertTrue(rpcError.getApplicationTag().equals(OFConstants.APPLICATION_TAG));
- Assert.assertTrue(rpcError.getTag().equals(OFConstants.ERROR_TAG_TIMEOUT));
- Assert.assertTrue(rpcError.getErrorType().equals(RpcError.ErrorType.TRANSPORT));
+ Assert.assertTrue(rpcError.getApplicationTag().equals(
+ OFConstants.APPLICATION_TAG));
+ Assert.assertTrue(rpcError.getTag().equals(
+ OFConstants.ERROR_TAG_TIMEOUT));
+ Assert.assertTrue(rpcError.getErrorType().equals(
+ RpcError.ErrorType.TRANSPORT));
}
}
}
@Override
- public ConnectionConductor getAuxiliaryConductor(SwitchConnectionDistinguisher auxiliaryKey) {
+ public ConnectionConductor getAuxiliaryConductor(
+ SwitchConnectionDistinguisher auxiliaryKey) {
return map.get(auxiliaryKey);
}
}
@Override
- public void addAuxiliaryConductor(SwitchConnectionDistinguisher auxiliaryKey, ConnectionConductor conductorArg) {
+ public void addAuxiliaryConductor(
+ SwitchConnectionDistinguisher auxiliaryKey,
+ ConnectionConductor conductorArg) {
map.put(auxiliaryKey, conductorArg);
}
@Override
- public ConnectionConductor removeAuxiliaryConductor(SwitchConnectionDistinguisher connectionCookie) {
+ public ConnectionConductor removeAuxiliaryConductor(
+ SwitchConnectionDistinguisher connectionCookie) {
return map.remove(connectionCookie);
}
}
/**
- * @param seed the seed to set
+ * @param seed
+ * the seed to set
*/
public void setSeed(int seed) {
this.seed = seed;
}
-
+
@Override
public NotificationEnqueuer getNotificationEnqueuer() {
return conductor;
}
}
-class MockConnectionConductor implements ConnectionConductor, NotificationEnqueuer {
+class MockConnectionConductor implements ConnectionConductor,
+ NotificationEnqueuer {
private int conductorNum;
private MockConnectionAdapter adapter;
}
@Override
- public void setQueueProcessor(QueueProcessor<OfHeader, DataObject> queueKeeper) {
+ public void setQueueProcessor(
+ QueueProcessor<OfHeader, DataObject> queueKeeper) {
// NOOP
}
public void setId(int conductorId) {
// NOOP
}
-
+
@Override
public void enqueueNotification(NotificationQueueWrapper notification) {
// NOOP
}
@Override
- public Future<RpcResult<GetFeaturesOutput>> getFeatures(GetFeaturesInput input) {
+ public Future<RpcResult<GetFeaturesOutput>> getFeatures(
+ GetFeaturesInput input) {
// TODO Auto-generated method stub
return null;
}
@Override
- public Future<RpcResult<GetQueueConfigOutput>> getQueueConfig(GetQueueConfigInput input) {
+ public Future<RpcResult<GetQueueConfigOutput>> getQueueConfig(
+ GetQueueConfigInput input) {
// TODO Auto-generated method stub
return null;
}
}
@Override
- public Future<RpcResult<RoleRequestOutput>> roleRequest(RoleRequestInput input) {
+ public Future<RpcResult<RoleRequestOutput>> roleRequest(
+ RoleRequestInput input) {
// TODO Auto-generated method stub
return null;
}
}
/**
- * @param messageType the messageType to set
+ * @param messageType
+ * the messageType to set
*/
public void setMessageType(MessageType messageType) {
this.messageType = messageType;
}
@Override
- public Future<RpcResult<Void>> multipartRequest(
- MultipartRequestInput input) {
+ public Future<RpcResult<Void>> multipartRequest(MultipartRequestInput input) {
// TODO Auto-generated method stub
return null;
}
- /* (non-Javadoc)
- * @see org.opendaylight.openflowjava.protocol.api.connection.ConnectionAdapter#getRemoteAddress()
+ /*
+ * (non-Javadoc)
+ *
+ * @see
+ * org.opendaylight.openflowjava.protocol.api.connection.ConnectionAdapter
+ * #getRemoteAddress()
*/
@Override
public InetSocketAddress getRemoteAddress() {
// TODO Auto-generated method stub
return null;
}
+
+ @Override
+ public boolean isAutoRead() {
+ // TODO Auto-generated method stub
+ return false;
+ }
+
+ @Override
+ public void setAutoRead(boolean arg0) {
+ // TODO Auto-generated method stub
+
+ }
}
--- /dev/null
+package org.opendaylight.openflowplugin.openflow.md.util;
+
+import java.util.Queue;
+import java.util.concurrent.ArrayBlockingQueue;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.runners.MockitoJUnitRunner;
+import org.opendaylight.openflowplugin.api.openflow.md.core.ConnectionConductor;
+import org.opendaylight.openflowplugin.api.openflow.md.queue.WaterMarkListener;
+import org.opendaylight.openflowplugin.openflow.md.queue.WrapperQueueImpl;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@RunWith(MockitoJUnitRunner.class)
+public class WrapperQueueImplTest {
+
+ protected static final Logger LOG = LoggerFactory
+ .getLogger(WrapperQueueImplTest.class);
+
+ @Mock
+ private ConnectionConductor connectionConductor;
+
+ @Mock
+ private WaterMarkListener waterMarkListener;
+
+ private WrapperQueueImpl<Integer> wrapperQueueImpl;
+ private final int capacity = 100;
+ private Queue<Integer> queueDefault;
+ private int highWaterMark = 80;
+ private int lowWaterMark = 65;
+
+ /**
+ * Setup before tests
+ */
+ @Before
+ public void setUp() {
+ queueDefault = new ArrayBlockingQueue<>(capacity);
+
+ wrapperQueueImpl = new WrapperQueueImpl<>(capacity, queueDefault,
+ waterMarkListener);
+ }
+
+ /**
+ * Test for check if wrapper is not null
+ */
+ @Test
+ public void testWrapperQueueImpl() {
+ Assert.assertNotNull("Wrapper can not be null.", wrapperQueueImpl);
+ }
+
+ /**
+ * Test for set setAutoRead on false on high water mark
+ */
+ @Test
+ public void testReadOnHighWaterMark() {
+
+ Assert.assertFalse("Wrapper must be not flooded at the start.",
+ wrapperQueueImpl.isFlooded());
+
+ push(79);
+ Assert.assertFalse("Wrapper should not be flooded.",
+ wrapperQueueImpl.isFlooded());
+ Mockito.verify(waterMarkListener, Mockito.times(0)).onHighWaterMark();
+
+ push(1);
+ Assert.assertTrue("Wrapper should be flooded.",
+ wrapperQueueImpl.isFlooded());
+ Mockito.verify(waterMarkListener, Mockito.times(1)).onHighWaterMark();
+
+ Assert.assertEquals(
+ "Size of queue has to be equals to 80% of capacity of queue",
+ highWaterMark, queueDefault.size());
+ }
+
+ /**
+ *
+ */
+ private void push(int size) {
+ for (int i = 0; i < size; i++) {
+ try {
+ wrapperQueueImpl.offer(i);
+ } catch (Exception e) {
+ LOG.error("Failed to offer item to queue.", e);
+ }
+ }
+ }
+
+ /**
+ * Test for setAutoRead on true on low water mark
+ */
+ @Test
+ public void testReadOnLowWaterMark() {
+ Mockito.verify(waterMarkListener, Mockito.times(0)).onHighWaterMark();
+ push(80);
+ Assert.assertTrue("Wrapper should be flooded.",
+ wrapperQueueImpl.isFlooded());
+ Mockito.verify(waterMarkListener, Mockito.times(1)).onHighWaterMark();
+
+ Assert.assertEquals(
+ "Size of queue has to be equals to 80% of capacity of queue",
+ highWaterMark, queueDefault.size());
+
+ poll(14);
+ Mockito.verify(waterMarkListener, Mockito.times(0)).onLowWaterMark();
+ Assert.assertTrue("Wrapper should be still flooded.",
+ wrapperQueueImpl.isFlooded());
+
+ poll(1);
+ Mockito.verify(waterMarkListener, Mockito.times(1)).onLowWaterMark();
+
+ Assert.assertEquals(
+ "Size of queue has to be equals to 65% on lowWaterMark.",
+ lowWaterMark, queueDefault.size());
+ Assert.assertFalse("Wrapped should be not flooded.",
+ wrapperQueueImpl.isFlooded());
+ }
+
+ /**
+ * Polling messages
+ */
+ private void poll(int size) {
+
+ for (int i = 0; i < size; i++) {
+ wrapperQueueImpl.poll();
+ }
+ }
+
+ /**
+ * Test for one cycle.
+ */
+ @Test
+ public void testEndReadOnHWMStartOnLWM() {
+
+ Assert.assertFalse("Wrapper should not be flooded",
+ wrapperQueueImpl.isFlooded());
+ Mockito.verify(waterMarkListener, Mockito.times(0)).onLowWaterMark();
+ Mockito.verify(waterMarkListener, Mockito.times(0)).onHighWaterMark();
+
+ push(81);
+ Assert.assertTrue("Wrapper should be flooded",
+ wrapperQueueImpl.isFlooded());
+ Mockito.verify(waterMarkListener, Mockito.times(0)).onLowWaterMark();
+ Mockito.verify(waterMarkListener, Mockito.times(1)).onHighWaterMark();
+
+ poll(17);
+ Assert.assertFalse("Wrapper should not be flooded",
+ wrapperQueueImpl.isFlooded());
+ Mockito.verify(waterMarkListener, Mockito.times(1)).onLowWaterMark();
+ Mockito.verify(waterMarkListener, Mockito.times(1)).onHighWaterMark();
+
+ push(18);
+ Assert.assertTrue("Wrapper should be flooded",
+ wrapperQueueImpl.isFlooded());
+
+ Mockito.verify(waterMarkListener, Mockito.times(1)).onLowWaterMark();
+ Mockito.verify(waterMarkListener, Mockito.times(2)).onHighWaterMark();
+ }
+}