import org.opendaylight.openflowjava.protocol.impl.clients.SendEvent;
import org.opendaylight.openflowjava.protocol.impl.clients.SimpleClient;
import org.opendaylight.openflowjava.protocol.impl.clients.SleepEvent;
+import org.opendaylight.openflowjava.protocol.impl.clients.WaitForMessageEvent;
+import org.opendaylight.openflowjava.protocol.impl.util.ByteBufUtils;
import org.opendaylight.openflowjava.protocol.spi.connection.SwitchConnectionProvider;
import org.ops4j.pax.exam.Configuration;
import org.ops4j.pax.exam.Option;
private SimpleClient switchSim;
+ /**
+ * test tear down
+ */
@After
public void tearDown() {
try {
* @throws Exception
*/
@Test
- public void handshakeOk() throws Exception {
+ public void handshakeOk1() throws Exception {
LOG.debug("handshake integration test");
LOG.debug("switchConnectionProvider: "+switchConnectionProvider);
//TODO: dump errors of plugin
}
+ /**
+ * test basic integration with OFLib running the handshake (with version bitmap)
+ * @throws Exception
+ */
+ @Test
+ public void handshakeOk2() throws Exception {
+ LOG.debug("handshake integration test");
+ LOG.debug("switchConnectionProvider: "+switchConnectionProvider);
+
+ switchSim = new SimpleClient("localhost", 6653);
+ switchSim.setSecuredClient(false);
+ Stack<ClientEvent> handshakeScenario = new Stack<>();
+ // handshake with versionbitmap
+ handshakeScenario.add(0, new SendEvent(ByteBufUtils.hexStringToBytes("04 00 00 10 00 00 00 01 00 01 00 08 00 00 00 10")));
+ handshakeScenario.add(0, new WaitForMessageEvent(ByteBufUtils.hexStringToBytes("04 00 00 10 00 00 00 15 00 01 00 08 00 00 00 12")));
+ handshakeScenario.add(0, new WaitForMessageEvent(ByteBufUtils.hexStringToBytes("04 05 00 08 00 00 00 03")));
+ handshakeScenario.add(0, new SendEvent(ByteBufUtils.hexStringToBytes("04 06 00 20 00 00 00 03 "
+ + "00 01 02 03 04 05 06 07 00 01 02 03 01 00 00 00 00 01 02 03 00 01 02 03")));
+
+ ScenarioHandler scenario = new ScenarioHandler(handshakeScenario);
+ switchSim.setScenarioHandler(scenario);
+ switchSim.start();
+ try {
+ switchSim.getScenarioDone().get(getFailSafeTimeout(), TimeUnit.MILLISECONDS);
+ } catch (Exception e) {
+ String msg = "waiting for scenario to finish failed: "+e.getMessage();
+ LOG.error(msg, e);
+ Assert.fail(msg);
+ }
+ //TODO: dump errors of plugin
+ }
+
/**
* test basic integration with OFLib running the handshake
* @throws Exception
package org.opendaylight.openflowplugin.openflow.md.core;
+import java.util.List;
import java.util.concurrent.Future;
import org.opendaylight.openflowjava.protocol.api.connection.ConnectionAdapter;
import org.opendaylight.openflowplugin.openflow.md.core.session.SessionContext;
+import com.google.common.collect.Lists;
+
/**
* @author mirehak
RIP
}
+ /** supported version ordered by height (highest version is at the beginning) */
+ public static final List<Short> versionOrder = Lists.newArrayList((short) 0x04, (short) 0x01);
+
/**
* initialize wiring around {@link #connectionAdapter}
*/
package org.opendaylight.openflowplugin.openflow.md.core;
-import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import org.opendaylight.openflowjava.protocol.api.connection.ConnectionAdapter;
import org.opendaylight.openflowjava.protocol.api.connection.ConnectionReadyListener;
import org.opendaylight.openflowplugin.openflow.md.core.session.OFSessionUtil;
import org.opendaylight.openflowplugin.openflow.md.core.session.SessionContext;
import org.opendaylight.openflowplugin.openflow.md.core.session.SessionManager;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.common.types.rev130731.HelloElementType;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.EchoInputBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.EchoOutput;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.EchoReplyInputBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.FlowRemovedMessage;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GetFeaturesInputBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GetFeaturesOutput;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.HelloInputBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.HelloInput;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.HelloMessage;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.MultipartReplyMessage;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.MultipartRequestMessage;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PacketInMessage;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PortStatusMessage;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.hello.Elements;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.hello.ElementsBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.system.rev130927.DisconnectEvent;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.system.rev130927.SwitchIdleEvent;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.system.rev130927.SystemNotificationsListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.google.common.collect.Lists;
import com.google.common.util.concurrent.Futures;
/**
public class ConnectionConductorImpl implements OpenflowProtocolListener,
SystemNotificationsListener, ConnectionConductor, ConnectionReadyListener {
- private static final Logger LOG = LoggerFactory
+ protected static final Logger LOG = LoggerFactory
.getLogger(ConnectionConductorImpl.class);
/* variable to make BitMap-based negotiation enabled / disabled.
private static final boolean isBitmapNegotiationEnable = true;
private LinkedBlockingQueue<Exception> errorQueue = new LinkedBlockingQueue<>();
- private final ConnectionAdapter connectionAdapter;
- private final List<Short> versionOrder;
+ protected final ConnectionAdapter connectionAdapter;
private ConnectionConductor.CONDUCTOR_STATE conductorState;
private Short version;
private Map<Class<? extends DataObject>, Collection<IMDMessageListener>> listenerMapping;
- private boolean isFirstHelloNegotiation = true;
+ protected boolean isFirstHelloNegotiation = true;
public ConnectionConductorImpl(ConnectionAdapter connectionAdapter) {
this.connectionAdapter = connectionAdapter;
conductorState = CONDUCTOR_STATE.HANDSHAKING;
- versionOrder = Lists.newArrayList((short) 0x04, (short) 0x01);
- // TODO: add a thread pool to handle ErrorQueueHandler
new Thread(new ErrorQueueHandler(errorQueue)).start();
}
/**
* send first hello message to switch
*/
- private void sendFirstHelloMessage() {
- short highestVersion = versionOrder.get(0);
- Long helloXid = 1L;
- HelloInputBuilder helloInputbuilder = new HelloInputBuilder();
- helloInputbuilder.setVersion(highestVersion);
- helloInputbuilder.setXid(helloXid);
+ protected void sendFirstHelloMessage() {
+ Short highestVersion = ConnectionConductor.versionOrder.get(0);
+ Long helloXid = 21L;
+ HelloInput helloInput = null;
+
if (isBitmapNegotiationEnable) {
- int elementsCount = highestVersion / Integer.SIZE;
- ElementsBuilder elementsBuilder = new ElementsBuilder();
-
- List<Elements> elementList = new ArrayList<Elements>();
- int orderIndex = versionOrder.size();
- int value = versionOrder.get(--orderIndex);
- for (int index = 0; index <= elementsCount; index++) {
- List<Boolean> booleanList = new ArrayList<Boolean>();
- for (int i = 0; i < Integer.SIZE; i++) {
- if (value == ((index * Integer.SIZE) + i)) {
- booleanList.add(true);
- value = (orderIndex == 0) ? highestVersion : versionOrder.get(--orderIndex);
- } else {
- booleanList.add(false);
- }
- }
- elementsBuilder.setType(HelloElementType.forValue(1));
- elementsBuilder.setVersionBitmap(booleanList);
- elementList.add(elementsBuilder.build());
- }
- helloInputbuilder.setElements(elementList);
- LOG.debug("sending first hello message: version header={} , version bitmap={}", highestVersion, elementList);
+ helloInput = MessageFactory.createHelloInput(highestVersion, helloXid, ConnectionConductor.versionOrder);
+ LOG.debug("sending first hello message: vertsion header={} , version bitmap={}",
+ highestVersion, helloInput.getElements());
} else {
+ helloInput = MessageFactory.createHelloInput(highestVersion, helloXid);
LOG.debug("sending first hello message: version header={} ", highestVersion);
}
- connectionAdapter.hello(helloInputbuilder.build());
-
+
+ try {
+ RpcResult<Void> helloResult = connectionAdapter.hello(helloInput).get(getMaxTimeout(), getMaxTimeoutUnit());
+ smokeRpc(helloResult);
+ LOG.debug("FIRST HELLO sent.");
+ } catch (Throwable e) {
+ LOG.debug("FIRST HELLO sending failed.");
+ handleException(e);
+ }
}
@Override
- public void onEchoRequestMessage(EchoRequestMessage echoRequestMessage) {
- LOG.debug("echo request received: " + echoRequestMessage.getXid());
- EchoReplyInputBuilder builder = new EchoReplyInputBuilder();
- builder.setVersion(echoRequestMessage.getVersion());
- builder.setXid(echoRequestMessage.getXid());
- builder.setData(echoRequestMessage.getData());
-
- connectionAdapter.echoReply(builder.build());
+ public void onEchoRequestMessage(final EchoRequestMessage echoRequestMessage) {
+ new Thread(new Runnable() {
+ @Override
+ public void run() {
+ LOG.debug("echo request received: " + echoRequestMessage.getXid());
+ EchoReplyInputBuilder builder = new EchoReplyInputBuilder();
+ builder.setVersion(echoRequestMessage.getVersion());
+ builder.setXid(echoRequestMessage.getXid());
+ builder.setData(echoRequestMessage.getData());
+
+ connectionAdapter.echoReply(builder.build());
+ }
+ }).start();
}
@Override
@Override
public void onHelloMessage(final HelloMessage hello) {
// do handshake
- LOG.info("handshake STARTED");
- checkState(CONDUCTOR_STATE.HANDSHAKING);
new Thread(new Runnable() {
-
@Override
public void run() {
+ LOG.info("handshake STARTED");
+ checkState(CONDUCTOR_STATE.HANDSHAKING);
+
Short remoteVersion = hello.getVersion();
List<Elements> elements = hello.getElements();
- long xid = hello.getXid();
- short proposedVersion;
+ Long xid = hello.getXid();
+ Short proposedVersion;
LOG.debug("Hello message version={} and bitmap={}", remoteVersion, elements);
try {
// find the version from header version field
} catch (IllegalArgumentException e) {
handleException(e);
connectionAdapter.disconnect();
- throw e;
+ return;
}
// sent version is equal to remote --> version is negotiated
} catch (IllegalArgumentException ex) {
handleException(ex);
connectionAdapter.disconnect();
- throw ex;
+ return;
}
LOG.debug("sending helloReply for common bitmap version : {}", proposedVersion);
sendHelloReply(proposedVersion, ++xid);
}
}
}
-
}).start();
}
* @param proposedVersion
* @param hello
*/
- private void sendHelloReply(Short proposedVersion, Long xid)
+ protected void sendHelloReply(Short proposedVersion, Long xid)
{
- HelloInputBuilder helloBuilder = new HelloInputBuilder();
- helloBuilder.setVersion(proposedVersion).setXid(xid);
- connectionAdapter.hello(helloBuilder.build());
+ HelloInput helloMsg = MessageFactory.createHelloInput(proposedVersion, xid);
+ RpcResult<Void> result;
+ try {
+ result = connectionAdapter.hello(helloMsg).get(getMaxTimeout(), getMaxTimeoutUnit());
+ smokeRpc(result);
+ } catch (Throwable e) {
+ handleException(e);
+ }
}
+ /**
+ * @param futureResult
+ * @throws Throwable
+ */
+ private static void smokeRpc(RpcResult<?> result) throws Throwable {
+ if (!result.isSuccessful()) {
+ Throwable firstCause = null;
+ StringBuffer sb = new StringBuffer();
+ for (RpcError error : result.getErrors()) {
+ if (firstCause != null) {
+ firstCause = error.getCause();
+ }
+
+ sb.append("rpcError:").append(error.getCause().getMessage()).append(";");
+ }
+ throw new Exception(sb.toString(), firstCause);
+ }
+ }
+
/**
* after handshake set features, register to session
* @param proposedVersion
* @param xId
*/
- private void postHandshake(Short proposedVersion, Long xid) {
+ protected void postHandshake(Short proposedVersion, Long xid) {
// set version
version = proposedVersion;
LOG.debug("version set: " + proposedVersion);
Future<RpcResult<GetFeaturesOutput>> featuresFuture = connectionAdapter
.getFeatures(featuresBuilder.build());
LOG.debug("waiting for features");
- RpcResult<GetFeaturesOutput> rpcFeatures;
try {
- rpcFeatures = featuresFuture.get(getMaxTimeout(),
- TimeUnit.MILLISECONDS);
- if (!rpcFeatures.isSuccessful()) {
- LOG.error("obtained features problem: {}"
- , rpcFeatures.getErrors());
- } else {
- GetFeaturesOutput featureOutput = rpcFeatures.getResult();
- LOG.debug("obtained features: datapathId={}",
- featureOutput.getDatapathId());
- LOG.debug("obtained features: auxiliaryId={}",
- featureOutput.getAuxiliaryId());
- conductorState = CONDUCTOR_STATE.WORKING;
-
- OFSessionUtil.registerSession(this,
- featureOutput, version);
- this.setListenerMapping(OFSessionUtil.getListenersMap());
- LOG.info("handshake SETTLED: datapathId={}, auxiliaryId={}", featureOutput.getDatapathId(), featureOutput.getAuxiliaryId());
- }
- } catch (Exception e) {
+ RpcResult<GetFeaturesOutput> rpcFeatures =
+ featuresFuture.get(getMaxTimeout(), getMaxTimeoutUnit());
+ smokeRpc(rpcFeatures);
+
+ GetFeaturesOutput featureOutput = rpcFeatures.getResult();
+ LOG.debug("obtained features: datapathId={}",
+ featureOutput.getDatapathId());
+ LOG.debug("obtained features: auxiliaryId={}",
+ featureOutput.getAuxiliaryId());
+ conductorState = CONDUCTOR_STATE.WORKING;
+
+ OFSessionUtil.registerSession(this,
+ featureOutput, version);
+ this.setListenerMapping(OFSessionUtil.getListenersMap());
+ LOG.info("handshake SETTLED: datapathId={}, auxiliaryId={}", featureOutput.getDatapathId(), featureOutput.getAuxiliaryId());
+ } catch (Throwable e) {
//handshake failed
LOG.error("issuing disconnect during handshake, reason: "+e.getMessage());
handleException(e);
// TODO:: get from configuration
return 2000;
}
+
+ /**
+ * @return milliseconds
+ */
+ private TimeUnit getMaxTimeoutUnit() {
+ // TODO:: get from configuration
+ return TimeUnit.MILLISECONDS;
+ }
+
/**
* @param e
*/
- private void handleException(Exception e) {
- Exception causeAndThread = new Exception("IN THREAD: "+Thread.currentThread().getName(), e);
+ protected void handleException(Throwable e) {
+ String sessionKeyId = null;
+ if (getSessionContext() != null) {
+ sessionKeyId = Arrays.toString(getSessionContext().getSessionKey().getId());
+ }
+
+ Exception causeAndThread = new Exception(
+ "IN THREAD: "+Thread.currentThread().getName() +
+ "; session:"+sessionKeyId, e);
try {
errorQueue.put(causeAndThread);
} catch (InterruptedException e1) {
try {
// TODO: read timeout from config
RpcResult<EchoOutput> echoReplyValue = echoReplyFuture.get(getMaxTimeout(),
- TimeUnit.SECONDS);
+ getMaxTimeoutUnit());
if (echoReplyValue.isSuccessful()) {
conductorState = CONDUCTOR_STATE.WORKING;
} else {
/**
* @param handshaking
*/
- private void checkState(CONDUCTOR_STATE expectedState) {
+ protected void checkState(CONDUCTOR_STATE expectedState) {
if (!conductorState.equals(expectedState)) {
throw new IllegalStateException("Expected state: " + expectedState
+ ", actual state:" + conductorState);
*/
protected short proposeVersion(short remoteVersion) {
Short proposal = null;
- for (short offer : versionOrder) {
+ for (short offer : ConnectionConductor.versionOrder) {
if (offer <= remoteVersion) {
proposal = offer;
break;
* @param list
* @return
*/
- protected short proposeBitmapVersion(List<Elements> list)
+ protected Short proposeBitmapVersion(List<Elements> list)
{
Short supportedHighestVersion = null;
if((null != list) && (0 != list.size()))
{
List<Boolean> bitmap = element.getVersionBitmap();
// check for version bitmap
- for(short bitPos : versionOrder)
+ for(short bitPos : ConnectionConductor.versionOrder)
{
// with all the version it should work.
if(bitmap.get(bitPos % Integer.SIZE))
*/
public void setListenerMapping(
Map<Class<? extends DataObject>, Collection<IMDMessageListener>> listenerMapping) {
- //TODO: adjust the listener interface
this.listenerMapping = listenerMapping;
}
@Override
public void onConnectionReady() {
- // TODO Auto-generated method stub
+ LOG.debug("connection is ready-to-use");
+ //TODO: fire first helloMessage
+ new Thread(new Runnable() {
+ @Override
+ public void run() {
+ sendFirstHelloMessage();
+ }
+ }).start();
}
+
}
--- /dev/null
+/**
+ * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.openflowplugin.openflow.md.core;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.common.types.rev130731.HelloElementType;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.HelloInput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.HelloInputBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.hello.Elements;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.hello.ElementsBuilder;
+
+/**
+ * @author mirehak
+ *
+ */
+public abstract class MessageFactory {
+
+ /**
+ * @param helloVersion
+ * @param helloXid
+ * @return HelloInput without elements
+ */
+ public static HelloInput createHelloInput(short helloVersion, long helloXid) {
+ return createHelloInput(helloVersion, helloXid, null);
+ }
+
+ /**
+ * @param highestVersion
+ * @param xid
+ * @return builder with prepared header
+ */
+ private static HelloInputBuilder prepareHelloInputBuilder(
+ short highestVersion, long xid) {
+ HelloInputBuilder helloInputbuilder = new HelloInputBuilder();
+ helloInputbuilder.setVersion(highestVersion);
+ helloInputbuilder.setXid(xid);
+ return helloInputbuilder;
+ }
+
+ /**
+ * @param helloVersion
+ * @param helloXid
+ * @param versionOrder
+ * @return HelloInput with elements (version bitmap)
+ */
+ public static HelloInput createHelloInput(short helloVersion, long helloXid, List<Short> versionOrder) {
+ HelloInputBuilder helloInputbuilder = prepareHelloInputBuilder(helloVersion, helloXid);
+ if (versionOrder != null) {
+ List<Elements> elementList = new ArrayList<>();
+
+ ElementsBuilder elementsBuilder = new ElementsBuilder();
+ elementsBuilder.setType(HelloElementType.VERSIONBITMAP);
+ List<Boolean> booleanList = new ArrayList<>();
+
+ int versionOrderIndex = versionOrder.size() - 1;
+
+ while (versionOrderIndex >= 0) {
+ short version = versionOrder.get(versionOrderIndex);
+ if (version == booleanList.size()) {
+ booleanList.add(true);
+ versionOrderIndex--;
+ } else {
+ booleanList.add(false);
+ }
+ }
+
+ elementsBuilder.setVersionBitmap(booleanList);
+ elementList.add(elementsBuilder.build());
+ helloInputbuilder.setElements(elementList);
+ }
+ return helloInputbuilder.build();
+ }
+}
*/
public class MessageDispatchServiceImpl implements IMessageDispatchService {
- private static final Logger LOG = LoggerFactory.getLogger(OFSessionUtil.class);
+ private static final Logger LOG = LoggerFactory.getLogger(MessageDispatchServiceImpl.class);
private SessionContext session;
private ConnectionAdapter getConnectionAdapter(SwitchConnectionDistinguisher cookie) {
if (!session.isValid()) {
- LOG.warn("Session for the cookie {} is invalid." + cookie);
+ LOG.warn("Session for the cookie {} is invalid.", cookie);
throw new IllegalArgumentException("Session for the cookie is invalid.");
}
- LOG.debug("finding connecton for cookie value {}. " + cookie);
+ LOG.debug("finding connecton for cookie value {}. ", cookie);
// set main connection as default
ConnectionAdapter connectionAdapter = session.getPrimaryConductor().getConnectionAdapter();
if (null != cookie) {
import java.math.BigInteger;
import java.util.ArrayList;
-import java.util.List;
import java.util.Collection;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import java.util.Stack;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.opendaylight.openflowplugin.openflow.md.core.plan.ConnectionAdapterStackImpl;
import org.opendaylight.openflowplugin.openflow.md.core.plan.EventFactory;
import org.opendaylight.openflowplugin.openflow.md.core.plan.SwitchTestEvent;
-import org.opendaylight.openflowplugin.openflow.md.core.session.OFSessionUtil;
import org.opendaylight.openflowplugin.openflow.md.core.session.SessionContext;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.common.types.rev130731.ErrorType;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.common.types.rev130731.HelloElementType;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.ExperimenterMessageBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.FlowRemovedMessage;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.FlowRemovedMessageBuilder;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GetFeaturesOutput;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GetFeaturesOutputBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.HelloMessageBuilder;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.hello.Elements;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.hello.ElementsBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PacketInMessage;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PacketInMessageBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PortStatusMessage;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PortStatusMessageBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.hello.Elements;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.hello.ElementsBuilder;
import org.opendaylight.yangtools.yang.binding.DataObject;
-import org.opendaylight.yangtools.yang.binding.Notification;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
*/
public class ConnectionConductorImplTest {
- private static final Logger LOG = LoggerFactory
+ protected static final Logger LOG = LoggerFactory
.getLogger(ConnectionConductorImplTest.class);
protected ConnectionAdapterStackImpl adapter;
private Thread libSimulation;
private ScheduledThreadPoolExecutor pool = new ScheduledThreadPoolExecutor(
8);
- private int experimenterMessageCounter;
- private int packetinMessageCounter;
- private int flowremovedMessageCounter;
- private int portstatusAddMessageCounter;
- private int portstatusDeleteMessageCounter;
- private int portstatusModifyMessageCounter;
+ protected int experimenterMessageCounter;
+ protected int packetinMessageCounter;
+ protected int flowremovedMessageCounter;
+ protected int portstatusAddMessageCounter;
+ protected int portstatusDeleteMessageCounter;
+ protected int portstatusModifyMessageCounter;
/**
* @throws java.lang.Exception
builder1.setExperimenter(84L).setExpType(4L);
eventPlan.add(0, EventFactory.createDefaultNotificationEvent(42L,
EventFactory.DEFAULT_VERSION, builder1));
+
+ connectionConductor.setListenerMapping(assembleListenerMapping());
+
executeLater();
Runnable sendExperimenterCmd = new Runnable() {
eventPlan.add(
0,
EventFactory.createDefaultNotificationEvent(42L, (short) 0x05,
- getHelloBitmapMessage(Lists.newArrayList((short) 0x05, (short) 0x04))));
+ getHelloBitmapMessage(Lists.newArrayList((short) 0x05, (short) 0x02))));
executeNow();
Assert.assertNull(connectionConductor.getVersion());
}
int elementsCount = highestVersion / Integer.SIZE;
ElementsBuilder elementsBuilder = new ElementsBuilder();
- List<Elements> elementList = new ArrayList<Elements>();
+ List<Elements> elementList = new ArrayList<>();
int orderIndex = versionOrder.size();
int value = versionOrder.get(--orderIndex);
for (int index = 0; index <= elementsCount; index++) {
- List<Boolean> booleanList = new ArrayList<Boolean>();
+ List<Boolean> booleanList = new ArrayList<>();
for (int i = 0; i < Integer.SIZE; i++) {
if (value == ((index * Integer.SIZE) + i)) {
booleanList.add(true);
*/
@Test
public void testOnExperimenterMessage() throws InterruptedException {
- IMDMessageListener objEms = new ExperimenterMessageService() ;
- Map<Class<? extends DataObject>, Collection<IMDMessageListener>> listenerMapping = new HashMap<Class<? extends DataObject>, Collection<IMDMessageListener>>();
- Collection<IMDMessageListener> existingValues = new ArrayList<IMDMessageListener>();
- existingValues.add(objEms);
- listenerMapping.put(ExperimenterMessage.class, existingValues);
- connectionConductor.setListenerMapping(listenerMapping);
+ connectionConductor.setListenerMapping(assembleListenerMapping());
ExperimenterMessageBuilder builder1 = new ExperimenterMessageBuilder();
builder1.setExperimenter(84L).setExpType(4L);
connectionConductor.onExperimenterMessage(builder1.build());
Assert.assertEquals(2, experimenterMessageCounter);
}
+ /**
+ * @return listener mapping
+ */
+ private Map<Class<? extends DataObject>, Collection<IMDMessageListener>> assembleListenerMapping() {
+ IMDMessageListener objEms = new ExperimenterMessageService() ;
+ Map<Class<? extends DataObject>, Collection<IMDMessageListener>> listenerMapping = new HashMap<>();
+ Collection<IMDMessageListener> existingValues = new ArrayList<>();
+ existingValues.add(objEms);
+ listenerMapping.put(ExperimenterMessage.class, existingValues);
+ return listenerMapping;
+ }
+
}
--- /dev/null
+/**
+ * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.openflowplugin.openflow.md.core;
+
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.common.types.rev130731.HelloElementType;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.HelloInput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.hello.Elements;
+
+/**
+ * @author mirehak
+ *
+ */
+public class MessageFactoryTest {
+
+ /**
+ * Test method for {@link org.opendaylight.openflowplugin.openflow.md.core.MessageFactory#createHelloInputWoElements(java.lang.Short, java.lang.Long)}.
+ */
+ @Test
+ public void testCreateHelloInputWoElements() {
+ short highestVersion = (short) 0x04;
+ long xid = 42L;
+
+ HelloInput helloMsg = MessageFactory.createHelloInput(highestVersion, xid);
+ Assert.assertEquals(highestVersion, helloMsg.getVersion().shortValue());
+ Assert.assertEquals(xid, helloMsg.getXid().longValue());
+ Assert.assertNull(helloMsg.getElements());
+ }
+
+ /**
+ * Test method for {@link org.opendaylight.openflowplugin.openflow.md.core.MessageFactory#createHelloInputWithElements(java.lang.Short, java.lang.Long, java.util.List)}.
+ */
+ @Test
+ public void testCreateHelloInputWithElements() {
+ short highestVersion = (short) 0x04;
+ long xid = 42L;
+ Boolean[] expectedVersionBitmap = new Boolean[]{
+ false, true, false, false, true};
+
+ HelloInput helloMsg = MessageFactory.createHelloInput(highestVersion, xid,
+ ConnectionConductor.versionOrder);
+ Assert.assertEquals(highestVersion, helloMsg.getVersion().shortValue());
+ Assert.assertEquals(xid, helloMsg.getXid().longValue());
+ Assert.assertEquals(1, helloMsg.getElements().size());
+ Elements actualElement = helloMsg.getElements().get(0);
+ Assert.assertEquals(HelloElementType.VERSIONBITMAP, actualElement.getType());
+ Assert.assertArrayEquals(expectedVersionBitmap, actualElement.getVersionBitmap().toArray(new Boolean[0]));
+ }
+
+}
import java.util.List;
import java.util.Map;
import java.util.Stack;
-import java.util.concurrent.Callable;
import java.util.concurrent.Future;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
import org.opendaylight.controller.sal.common.util.RpcErrors;
import org.opendaylight.controller.sal.common.util.Rpcs;
protected SystemNotificationsListener systemListener;
protected Map<Long, SettableFuture<?>> rpcResults = new HashMap<>();
- private ScheduledThreadPoolExecutor pool = new ScheduledThreadPoolExecutor(
- 8);
protected boolean planTouched = false;
private long proceedTimeout;
protected List<Exception> occuredExceptions = new ArrayList<>();
private ConnectionReadyListener connectionReadyListener;
+
+ private int planItemCounter;
/**
* default ctor
String msg = null;
LOG.debug("checking rpc: " + rpcName);
if (!(eventPlan.peek() instanceof SwitchTestWaitForRpcEvent)) {
- msg = "expected [rpc], got [" + rpcInput.getClass().getSimpleName()
- + "]";
+ if (eventPlan.peek() instanceof SwitchTestNotificationEvent) {
+ SwitchTestNotificationEvent notifEvent = (SwitchTestNotificationEvent) (eventPlan.peek());
+ msg = "expected [notification: " +notifEvent.getPlannedNotification()+ "], got [" + rpcInput.getClass().getSimpleName()
+ + "]";
+ } else if (eventPlan.peek() instanceof SwitchTestRcpResponseEvent) {
+ SwitchTestRcpResponseEvent rpcEvent = (SwitchTestRcpResponseEvent) (eventPlan.peek());
+ msg = "expected [rpc: " +rpcEvent.getPlannedRpcResponse()+ "], got [" + rpcInput.getClass().getSimpleName()
+ + "]";
+ }
} else {
SwitchTestWaitForRpcEvent switchTestRpcEvent = (SwitchTestWaitForRpcEvent) eventPlan
.peek();
}
if (msg != null) {
- LOG.debug("check .. FAILED: " + msg);
+ LOG.debug("rpc check .. FAILED: " + msg);
occuredExceptions.add(new IllegalArgumentException(msg));
}
- LOG.debug("check .. OK");
+ LOG.debug("rpc check .. OK");
}
/**
* discard current event, execute next, if possible
*/
private synchronized void next() {
- LOG.debug("STEPPING TO NEXT event in plan");
+ LOG.debug("<---> STEPPING TO NEXT event in plan (leaving [{}] {})", planItemCounter, eventPlan.peek());
eventPlan.pop();
+ planItemCounter ++;
planTouched = true;
notify();
}
*/
private synchronized void proceed() {
boolean processed = false;
- LOG.debug("proceeding plan item: " + eventPlan.peek());
+ LOG.debug("proceeding plan item[{}]: {}", planItemCounter, eventPlan.peek());
if (eventPlan.peek() instanceof SwitchTestNotificationEvent) {
SwitchTestNotificationEvent notification = (SwitchTestNotificationEvent) eventPlan
.peek();
next();
} else {
try {
- LOG.debug("now waiting for HANDLER to act");
+ LOG.debug("now WAITING for OF_LISTENER to act ..");
wait(proceedTimeout);
} catch (InterruptedException e) {
LOG.error(e.getMessage(), e);
@Override
public void run() {
- LOG.debug("evenPlan STARTING ..");
+ LOG.debug("|---> evenPlan STARTING ..");
+ planItemCounter = 0;
while (!eventPlan.isEmpty()) {
planTouched = false;
proceed();
}
try {
- pool.awaitTermination(10 * JOB_DELAY, TimeUnit.MILLISECONDS);
+ Thread.sleep(JOB_DELAY);
} catch (InterruptedException e) {
LOG.error(e.getMessage(), e);
}
- LOG.debug("eventPlan done");
+ LOG.debug("<---| eventPlan DONE");
}
/**
private void processNotification(
final SwitchTestNotificationEvent notificationEvent) {
- Callable<Void> notifyCmd = new Callable<Void>() {
- @Override
- public Void call() throws Exception {
- Notification notification = notificationEvent
- .getPlannedNotification();
- LOG.debug("notificating HANDLER: "
- + notification.getClass().getSimpleName());
-
- // system events
- if (notification instanceof DisconnectEvent) {
- systemListener
- .onDisconnectEvent((DisconnectEvent) notification);
- }
- // of notifications
- else if (notification instanceof EchoRequestMessage) {
- ofListener
- .onEchoRequestMessage((EchoRequestMessage) notification);
- } else if (notification instanceof ErrorMessage) {
- ofListener.onErrorMessage((ErrorMessage) notification);
- } else if (notification instanceof ExperimenterMessage) {
- ofListener
- .onExperimenterMessage((ExperimenterMessage) notification);
- } else if (notification instanceof FlowRemovedMessage) {
- ofListener
- .onFlowRemovedMessage((FlowRemovedMessage) notification);
- } else if (notification instanceof HelloMessage) {
- ofListener.onHelloMessage((HelloMessage) notification);
- } else if (notification instanceof MultipartReplyMessage) {
- ofListener
- .onMultipartReplyMessage((MultipartReplyMessage) notification);
- } else if (notification instanceof MultipartRequestMessage) {
- ofListener
- .onMultipartRequestMessage((MultipartRequestMessage) notification);
- } else if (notification instanceof PacketInMessage) {
- ofListener
- .onPacketInMessage((PacketInMessage) notification);
- } else if (notification instanceof PortStatusMessage) {
- ofListener
- .onPortStatusMessage((PortStatusMessage) notification);
- }
- // default
- else {
- occuredExceptions.add(new IllegalStateException(
- "message listening not supported for type: "
- + notification.getClass()));
- }
-
- LOG.debug("thread finished");
- return null;
- }
+ Notification notification = notificationEvent
+ .getPlannedNotification();
+ LOG.debug("notificating OF_LISTENER: "
+ + notification.getClass().getSimpleName());
- };
+ // system events
+ if (notification instanceof DisconnectEvent) {
+ systemListener
+ .onDisconnectEvent((DisconnectEvent) notification);
+ }
+ // of notifications
+ else if (notification instanceof EchoRequestMessage) {
+ ofListener
+ .onEchoRequestMessage((EchoRequestMessage) notification);
+ } else if (notification instanceof ErrorMessage) {
+ ofListener.onErrorMessage((ErrorMessage) notification);
+ } else if (notification instanceof ExperimenterMessage) {
+ ofListener
+ .onExperimenterMessage((ExperimenterMessage) notification);
+ } else if (notification instanceof FlowRemovedMessage) {
+ ofListener
+ .onFlowRemovedMessage((FlowRemovedMessage) notification);
+ } else if (notification instanceof HelloMessage) {
+ ofListener.onHelloMessage((HelloMessage) notification);
+ } else if (notification instanceof MultipartReplyMessage) {
+ ofListener
+ .onMultipartReplyMessage((MultipartReplyMessage) notification);
+ } else if (notification instanceof MultipartRequestMessage) {
+ ofListener
+ .onMultipartRequestMessage((MultipartRequestMessage) notification);
+ } else if (notification instanceof PacketInMessage) {
+ ofListener
+ .onPacketInMessage((PacketInMessage) notification);
+ } else if (notification instanceof PortStatusMessage) {
+ ofListener
+ .onPortStatusMessage((PortStatusMessage) notification);
+ }
+ // default
+ else {
+ occuredExceptions.add(new IllegalStateException(
+ "message listening not supported for type: "
+ + notification.getClass()));
+ }
- pool.schedule(notifyCmd, JOB_DELAY, TimeUnit.MILLISECONDS);
+ LOG.debug("notification ["+notification.getClass().getSimpleName()+"] .. done");
}
/**
* @param rpcResponse
*/
private void processRpcResponse(final SwitchTestRcpResponseEvent rpcResponse) {
- Callable<Void> notifyCmd = new Callable<Void>() {
- @Override
- public Void call() throws Exception {
-
- OfHeader plannedRpcResponseValue = rpcResponse
- .getPlannedRpcResponse();
- LOG.debug("rpc-responding to HANDLER: " + rpcResponse.getXid());
-
- @SuppressWarnings("unchecked")
- SettableFuture<RpcResult<?>> response = (SettableFuture<RpcResult<?>>) rpcResults
- .get(rpcResponse.getXid());
-
- if (response != null) {
- boolean successful = plannedRpcResponseValue != null;
- Collection<RpcError> errors;
- if (successful) {
- errors = Collections.emptyList();
- } else {
- errors = Lists
- .newArrayList(RpcErrors
- .getRpcError(
- "unit",
- "unit",
- "not requested",
- ErrorSeverity.ERROR,
- "planned response to RPC.id = "
- + rpcResponse.getXid(),
+ OfHeader plannedRpcResponseValue = rpcResponse
+ .getPlannedRpcResponse();
+ LOG.debug("rpc-responding to OF_LISTENER: " + rpcResponse.getXid());
+
+ @SuppressWarnings("unchecked")
+ SettableFuture<RpcResult<?>> response = (SettableFuture<RpcResult<?>>) rpcResults
+ .get(rpcResponse.getXid());
+
+ if (response != null) {
+ boolean successful = plannedRpcResponseValue != null;
+ Collection<RpcError> errors;
+ if (successful) {
+ errors = Collections.emptyList();
+ } else {
+ errors = Lists
+ .newArrayList(RpcErrors
+ .getRpcError(
+ "unit",
+ "unit",
+ "not requested",
+ ErrorSeverity.ERROR,
+ "planned response to RPC.id = "
+ + rpcResponse.getXid(),
ErrorType.RPC,
new Exception(
"rpc response failed (planned behavior)")));
- }
- RpcResult<?> result = Rpcs.getRpcResult(successful,
- plannedRpcResponseValue, errors);
- response.set(result);
- } else {
- String msg = "RpcResponse not expected: xid="
- + rpcResponse.getXid()
- + ", "
- + plannedRpcResponseValue.getClass()
- .getSimpleName();
- LOG.error(msg);
- occuredExceptions.add(new IllegalStateException(msg));
- }
-
- LOG.debug("thread finished");
- return null;
}
- };
+ RpcResult<?> result = Rpcs.getRpcResult(successful,
+ plannedRpcResponseValue, errors);
+ response.set(result);
+ } else {
+ String msg = "RpcResponse not expected: xid="
+ + rpcResponse.getXid()
+ + ", "
+ + plannedRpcResponseValue.getClass()
+ .getSimpleName();
+ LOG.error(msg);
+ occuredExceptions.add(new IllegalStateException(msg));
+ }
- pool.schedule(notifyCmd, JOB_DELAY, TimeUnit.MILLISECONDS);
+ LOG.debug("rpc ["+rpcResponse.getXid()+"] .. done");
}
/**
*/
private static SettableFuture<RpcResult<Void>> createOneWayRpcResult() {
SettableFuture<RpcResult<Void>> result = SettableFuture.create();
- result.set(null);
+ List<RpcError> errors = Collections.emptyList();
+ result.set(Rpcs.getRpcResult(true, (Void) null, errors));
return result;
}
@Override
public void fireConnectionReadyNotification() {
- if (connectionReadyListener != null) {
connectionReadyListener.onConnectionReady();
- }
}
@Override
public MessageType getMessageType() {
return adapter.getMessageType();
}
-
}
enum MessageType {
@Override
public void fireConnectionReadyNotification() {
- if (connectionReadyListener != null) {
connectionReadyListener.onConnectionReady();
- }
}
@Override