/**
- * Copyright (c) 2013-2014 Cisco Systems, Inc. and others. All rights reserved.
+ * Copyright (c) 2013, 2014 Cisco Systems, Inc. and others. All rights reserved.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License v1.0 which accompanies this distribution,
import java.util.List;
import java.util.Map;
import java.util.Stack;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.opendaylight.openflowplugin.openflow.md.core.plan.ConnectionAdapterStackImpl;
import org.opendaylight.openflowplugin.openflow.md.core.plan.EventFactory;
import org.opendaylight.openflowplugin.openflow.md.core.plan.SwitchTestEvent;
+import org.opendaylight.openflowplugin.openflow.md.core.sal.convertor.ConvertorManager;
+import org.opendaylight.openflowplugin.openflow.md.core.sal.convertor.ConvertorManagerFactory;
import org.opendaylight.openflowplugin.openflow.md.queue.QueueProcessorLightImpl;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.common.types.rev130731.Capabilities;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.common.types.rev130731.PortFeatures;
private Stack<SwitchTestEvent> eventPlan;
private Thread libSimulation;
- private ScheduledThreadPoolExecutor pool = new ScheduledThreadPoolExecutor(
+ private final ScheduledThreadPoolExecutor pool = new ScheduledThreadPoolExecutor(
8);
protected QueueProcessorLightImpl queueProcessor;
- private PopListener<DataObject> popListener;
+ private PopListenerCountingImpl<DataObject> popListener;
private int experimenterMessageCounter;
private int packetinMessageCounter;
@Mock
private ErrorHandlerSimpleImpl errorHandler;
- private int expectedErrors = 0;
+ private final int expectedErrors = 0;
@Mock
private MessageSpy<DataContainer> messageSpy;
@Mock
HandshakeContext handshakeContext;
+ private ErrorMessageService objErms;
public void incrExperimenterMessageCounter() {
this.experimenterMessageCounter++;
popListener = new PopListenerCountingImpl<>();
- controller = new MDController();
+ final ConvertorManager convertorManager = ConvertorManagerFactory.createDefaultManager();
+ controller = new MDController(convertorManager);
controller.init();
controller.getMessageTranslators().putAll(assembleTranslatorMapping());
if (eventPlan.size() > 0) {
LOG.debug("eventPlan size: " + eventPlan.size());
for (SwitchTestEvent event : eventPlan) {
- LOG.debug(" # EVENT:: " + event.toString());
+ LOG.debug(" # EVENT:: " + event);
}
}
}
EventFactory.DEFAULT_VERSION, getFeatureResponseMsg()));
int i = 1;
- eventPlan.add(0, EventFactory.createDefaultWaitForRpcEvent(i++, "multipartRequestInput"));
- eventPlan.add(0, EventFactory.createDefaultWaitForRpcEvent(i++, "multipartRequestInput"));
- eventPlan.add(0, EventFactory.createDefaultWaitForRpcEvent(i++, "multipartRequestInput"));
- eventPlan.add(0, EventFactory.createDefaultWaitForRpcEvent(i++, "multipartRequestInput"));
executeNow();
Assert.assertEquals(ConnectionConductor.CONDUCTOR_STATE.WORKING,
eventPlan.add(0, EventFactory.createDefaultRpcResponseEvent(43,
EventFactory.DEFAULT_VERSION, getFeatureResponseMsg()));
- int i = 1;
- eventPlan.add(0, EventFactory.createDefaultWaitForRpcEvent(i++, "multipartRequestInput"));
- eventPlan.add(0, EventFactory.createDefaultWaitForRpcEvent(i++, "multipartRequestInput"));
- eventPlan.add(0, EventFactory.createDefaultWaitForRpcEvent(i++, "multipartRequestInput"));
- eventPlan.add(0, EventFactory.createDefaultWaitForRpcEvent(i++, "multipartRequestInput"));
-
executeNow();
Assert.assertEquals(ConnectionConductor.CONDUCTOR_STATE.WORKING,
eventPlan.add(0, EventFactory.createDefaultRpcResponseEvent(45,
(short) 0x01, getFeatureResponseMsg()));
- int i = 1;
- eventPlan.add(0, EventFactory.createDefaultWaitForRpcEvent(i++, "multipartRequestInput"));
-
executeNow();
Assert.assertEquals(ConnectionConductor.CONDUCTOR_STATE.WORKING,
eventPlan.add(0, EventFactory.createDefaultRpcResponseEvent(45,
(short) 0x01, getFeatureResponseMsg()));
- int i = 1;
- eventPlan.add(0, EventFactory.createDefaultWaitForRpcEvent(i++, "multipartRequestInput"));
-
executeNow();
Assert.assertEquals(ConnectionConductor.CONDUCTOR_STATE.WORKING,
*/
@Test
public void testOnFlowRemovedMessage() throws InterruptedException {
- IMDMessageTranslator<OfHeader, List<DataObject>> objFms = new FlowRemovedMessageService();
+ FlowRemovedMessageService objFms = new FlowRemovedMessageService();
controller.addMessageTranslator(FlowRemovedMessage.class, 4, objFms);
simulateV13PostHandshakeState(connectionConductor);
+ objFms.resetLatch(2);
// Now send Flow Removed messages
FlowRemovedMessageBuilder builder1 = new FlowRemovedMessageBuilder();
builder1.setVersion((short) 4);
builder1.setXid(1L);
connectionConductor.onFlowRemovedMessage(builder1.build());
- synchronized (popListener) {
LOG.debug("about to wait for popListener");
- popListener.wait(maxProcessingTimeout);
- }
- Assert.assertEquals(1, flowremovedMessageCounter);
builder1.setXid(2L);
connectionConductor.onFlowRemovedMessage(builder1.build());
- synchronized (popListener) {
- popListener.wait(maxProcessingTimeout);
- }
+
+ flushMessageProcessing();
+ Assert.assertTrue(objFms.await(maxProcessingTimeout, TimeUnit.MILLISECONDS));
Assert.assertEquals(2, flowremovedMessageCounter);
}
*/
@Test
public void testOnPacketInMessage() throws InterruptedException {
- IMDMessageTranslator<OfHeader, List<DataObject>> objPms = new PacketInMessageService();
+ PacketInMessageService objPms = new PacketInMessageService();
controller.addMessageTranslator(PacketInMessage.class, 4, objPms);
simulateV13PostHandshakeState(connectionConductor);
builder1.setVersion((short) 4);
builder1.setBufferId((long) 1);
connectionConductor.onPacketInMessage(builder1.build());
- synchronized (popListener) {
- popListener.wait(maxProcessingTimeout);
- }
- Assert.assertEquals(1, packetinMessageCounter);
builder1.setBufferId((long) 2);
connectionConductor.onPacketInMessage(builder1.build());
- synchronized (popListener) {
- popListener.wait(maxProcessingTimeout);
- }
+
+ flushMessageProcessing();
+ Assert.assertTrue(objPms.await(maxProcessingTimeout, TimeUnit.MILLISECONDS));
Assert.assertEquals(2, packetinMessageCounter);
}
*/
@Test
public void testOnPortStatusMessage() throws InterruptedException {
- IMDMessageTranslator<OfHeader, List<DataObject>> objPSms = new PortStatusMessageService();
+ PortStatusMessageService objPSms = new PortStatusMessageService();
controller.addMessageTranslator(PortStatusMessage.class, 4, objPSms);
simulateV13PostHandshakeState(connectionConductor);
builder1.setVersion((short) 4);
PortFeatures features = new PortFeatures(true, false, false, false, false, false, false, false, false, false, false, false, false, false, false, false);
builder1.setPortNo(90L).setReason(PortReason.OFPPRADD).setCurrentFeatures(features);
+ objPSms.resetLatch(3);
+ LOG.debug("sending port message");
connectionConductor.onPortStatusMessage(builder1.build());
- synchronized (popListener) {
- popListener.wait(maxProcessingTimeout);
- }
- Assert.assertEquals(1, portstatusAddMessageCounter);
builder1.setPortNo(90L).setReason(PortReason.OFPPRMODIFY).setCurrentFeatures(features);
connectionConductor.onPortStatusMessage(builder1.build());
- synchronized (popListener) {
- popListener.wait(maxProcessingTimeout);
- }
- Assert.assertEquals(1, portstatusModifyMessageCounter);
builder1.setPortNo(90L).setReason(PortReason.OFPPRDELETE).setCurrentFeatures(features);
connectionConductor.onPortStatusMessage(builder1.build());
- synchronized (popListener) {
- popListener.wait(maxProcessingTimeout);
- }
+
+ flushMessageProcessing();
+ Assert.assertTrue(objPSms.await(maxProcessingTimeout, TimeUnit.MILLISECONDS));
+ Assert.assertEquals(1, portstatusModifyMessageCounter);
+ Assert.assertEquals(1, portstatusAddMessageCounter);
Assert.assertEquals(1, portstatusDeleteMessageCounter);
}
+ private void flushMessageProcessing() throws InterruptedException {
+ // make sure that harvester sleeps deeply
+ Thread.sleep(maxProcessingTimeout);
+ // flushing messages
+ queueProcessor.getHarvesterHandle().ping();
+ }
+
/**
* @throws InterruptedException
*/
/**
* @throws InterruptedException
*/
- private void execute(boolean join) throws InterruptedException {
+ private void execute(final boolean join) throws InterruptedException {
libSimulation = new Thread(adapter, "junit-adapter");
libSimulation.start();
if (join) {
/**
* @return
*/
- private static Capabilities createCapabilities(long input) {
+ private static Capabilities createCapabilities(final long input) {
final Boolean FLOW_STATS = (input & (1 << 0)) != 0;
final Boolean TABLE_STATS = (input & (1 << 1)) != 0;
final Boolean PORT_STATS = (input & (1 << 2)) != 0;
return capabilities;
}
+ public abstract class ProcessingLatchService {
+ private CountDownLatch processingLatch = new CountDownLatch(0);
+
+ public void resetLatch(int passAmount) {
+ processingLatch = new CountDownLatch(passAmount);
+ }
+
+ protected void countDown() {
+ processingLatch.countDown();
+ }
+
+ public boolean await(long timeout, TimeUnit unit) throws InterruptedException {
+ return processingLatch.await(timeout, unit);
+ }
+ }
+
public class ExperimenterMessageService implements IMDMessageTranslator<OfHeader, List<DataObject>> {
@Override
- public List<DataObject> translate(SwitchConnectionDistinguisher cookie, SessionContext sw, OfHeader msg) {
+ public List<DataObject> translate(final SwitchConnectionDistinguisher cookie, final SessionContext sw, final OfHeader msg) {
LOG.debug("Received a packet in Experimenter Service");
ConnectionConductorImplTest.this.incrExperimenterMessageCounter();
return null;
}
}
- public class PacketInMessageService implements IMDMessageTranslator<OfHeader, List<DataObject>> {
+ public class PacketInMessageService extends ProcessingLatchService implements IMDMessageTranslator<OfHeader, List<DataObject>> {
@Override
- public List<DataObject> translate(SwitchConnectionDistinguisher cookie, SessionContext sw, OfHeader msg) {
+ public List<DataObject> translate(final SwitchConnectionDistinguisher cookie, final SessionContext sw, final OfHeader msg) {
LOG.debug("Received a packet in PacketIn Service");
ConnectionConductorImplTest.this.incrPacketinMessageCounter();
+ countDown();
return null;
}
}
- public class FlowRemovedMessageService implements IMDMessageTranslator<OfHeader, List<DataObject>> {
+ public class FlowRemovedMessageService extends ProcessingLatchService implements IMDMessageTranslator<OfHeader, List<DataObject>> {
@Override
- public List<DataObject> translate(SwitchConnectionDistinguisher cookie, SessionContext sw, OfHeader msg) {
+ public List<DataObject> translate(final SwitchConnectionDistinguisher cookie, final SessionContext sw, final OfHeader msg) {
LOG.debug("Received a packet in FlowRemoved Service");
ConnectionConductorImplTest.this.incrFlowremovedMessageCounter();
+ countDown();
return null;
}
}
- public class PortStatusMessageService implements IMDMessageTranslator<OfHeader, List<DataObject>> {
+ public class PortStatusMessageService extends ProcessingLatchService implements IMDMessageTranslator<OfHeader, List<DataObject>> {
@Override
- public List<DataObject> translate(SwitchConnectionDistinguisher cookie, SessionContext sw, OfHeader msg) {
+ public List<DataObject> translate(final SwitchConnectionDistinguisher cookie, final SessionContext sw, final OfHeader msg) {
LOG.debug("Received a packet in PortStatus Service");
if ((((PortStatusMessage) msg).getReason().equals(PortReason.OFPPRADD))) {
ConnectionConductorImplTest.this.incrPortstatusAddMessageCounter();
} else if (((PortStatusMessage) msg).getReason().equals(PortReason.OFPPRMODIFY)) {
ConnectionConductorImplTest.this.incrPortstatusModifyMessageCounter();
}
+ countDown();
return null;
}
}
- public class ErrorMessageService implements IMDMessageTranslator<OfHeader, List<DataObject>> {
+ public class ErrorMessageService extends ProcessingLatchService implements IMDMessageTranslator<OfHeader, List<DataObject>> {
@Override
- public List<DataObject> translate(SwitchConnectionDistinguisher cookie, SessionContext sw, OfHeader msg) {
+ public List<DataObject> translate(final SwitchConnectionDistinguisher cookie, final SessionContext sw, final OfHeader msg) {
LOG.debug("Received a packet in Experimenter Service");
ConnectionConductorImplTest.this.incrErrorMessageCounter();
+ countDown();
return null;
}
}
@Test
public void testOnErrorMessage() throws InterruptedException {
simulateV13PostHandshakeState(connectionConductor);
+ final int messageAmount = 2;
+ objErms.resetLatch(messageAmount);
ErrorMessageBuilder builder1 = new ErrorMessageBuilder();
builder1.setVersion((short) 4);
builder1.setCode(100);
connectionConductor.onErrorMessage(builder1.build());
- synchronized (popListener) {
- popListener.wait(maxProcessingTimeout);
- }
- Assert.assertEquals(1, errorMessageCounter);
builder1.setCode(200);
connectionConductor.onErrorMessage(builder1.build());
- synchronized (popListener) {
- popListener.wait(maxProcessingTimeout);
- }
- Assert.assertEquals(2, errorMessageCounter);
+
+ flushMessageProcessing();
+ Assert.assertTrue(objErms.await(maxProcessingTimeout, TimeUnit.MILLISECONDS));
+ Assert.assertEquals(messageAmount, errorMessageCounter);
}
/**
existingValues.add(objEms);
tKey = new TranslatorKey(4, ExperimenterMessage.class.getName());
translatorMapping.put(tKey, existingValues);
- IMDMessageTranslator<OfHeader, List<DataObject>> objErms = new ErrorMessageService();
+ objErms = new ErrorMessageService();
existingValues.add(objErms);
tKey = new TranslatorKey(4, ErrorMessage.class.getName());
translatorMapping.put(tKey, existingValues);
connectionConductor.onHandshakeFailure();
connectionConductor.checkState(ConnectionConductor.CONDUCTOR_STATE.RIP);
}
- private void simulateV13PostHandshakeState(ConnectionConductorImpl conductor) {
+ private static void simulateV13PostHandshakeState(final ConnectionConductorImpl conductor) {
GetFeaturesOutputBuilder featureOutput = getFeatureResponseMsg();
conductor.postHandshakeBasic(featureOutput.build(), OFConstants.OFP_VERSION_1_3);
+ LOG.debug("simulating post handshake event done");
}
}