<version>0.0.1-SNAPSHOT</version>
<packaging>pom</packaging>
+ <properties>
+ <project.build.sourceEncoding>utf-8</project.build.sourceEncoding>
+ </properties>
+
+ <build>
+ <pluginManagement>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <configuration>
+ <source>1.7</source>
+ <target>1.7</target>
+ </configuration>
+ </plugin>
+ </plugins>
+ </pluginManagement>
+ </build>
+
<repositories>
<!-- OpenDayLight Released artifact -->
<repository>
<version>0.0.1-SNAPSHOT</version>
<packaging>bundle</packaging>
+ <properties>
+ <yang.prototype.version>1.0-SNAPSHOT</yang.prototype.version>
+ </properties>
+
+
<build>
<plugins>
<plugin>
org.opendaylight.openflowplugin.openflow.internal.Activator
</Bundle-Activator>
</instructions>
- <manifestLocation>${project.basedir}/META-INF</manifestLocation>
+ <manifestLocation>${project.build.directory}/META-INF</manifestLocation>
</configuration>
</plugin>
</plugins>
<artifactId>openflow-protocol-api</artifactId>
<version>0.1-SNAPSHOT</version>
</dependency>
+ <dependency>
+ <groupId>org.opendaylight.openflowjava</groupId>
+ <artifactId>openflow-protocol-spi</artifactId>
+ <version>0.1-SNAPSHOT</version>
+ </dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.8.1</version>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.opendaylight.controller</groupId>
+ <artifactId>sal-common-util</artifactId>
+ <version>${yang.prototype.version}</version>
+ <scope>test</scope>
+ </dependency>
</dependencies>
</project>
import org.opendaylight.controller.sal.topology.IPluginInTopologyService;
import org.opendaylight.controller.sal.topology.IPluginOutTopologyService;
import org.opendaylight.controller.sal.utils.GlobalConstants;
+import org.opendaylight.openflowjava.protocol.spi.connection.SwitchConnectionProvider;
import org.opendaylight.openflowplugin.openflow.IDataPacketListen;
import org.opendaylight.openflowplugin.openflow.IDataPacketMux;
import org.opendaylight.openflowplugin.openflow.IDiscoveryListener;
import org.opendaylight.openflowplugin.openflow.core.IController;
import org.opendaylight.openflowplugin.openflow.core.IMessageListener;
import org.opendaylight.openflowplugin.openflow.core.internal.Controller;
+import org.opendaylight.openflowplugin.openflow.md.core.MDController;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Object[] res = { Controller.class, OFStatisticsManager.class,
FlowProgrammerService.class, ReadServiceFilter.class,
DiscoveryService.class, DataPacketMuxDemux.class, InventoryService.class,
- InventoryServiceShim.class, TopologyServiceShim.class };
+ InventoryServiceShim.class, TopologyServiceShim.class,
+ MDController.class};
return res;
}
.setCallbacks("setStatisticsManager",
"unsetStatisticsManager").setRequired(false));
}
+
+ if (imp.equals(MDController.class)) {
+// c.setInterface(new String[] { IDiscoveryListener.class.getName(),
+// IContainerListener.class.getName(),
+// IRefreshInternalProvider.class.getName(),
+// IInventoryShimExternalListener.class.getName() }, null);
+ c.add(createServiceDependency()
+ .setService(SwitchConnectionProvider.class)
+ .setCallbacks("setSwitchConnectionProvider",
+ "unsetSwitchConnectionProvider")
+ .setRequired(true));
+ logger.debug("configuring MDController ..");
+ }
}
}
--- /dev/null
+/**
+ * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.openflowplugin.openflow.md.core;
+
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GetFeaturesOutput;
+
+/**
+ * @author mirehak
+ */
+/**
+ * @author mirehak
+ *
+ */
+public interface ConnectionConductor {
+
+ /** distinguished connection states */
+ public enum CONDUCTOR_STATE {
+ /** initial phase of talking to switch */
+ HANDSHAKING,
+ /** standard phase - interacting with switch */
+ WORKING,
+ /** talking to switch is over - resting in pieces */
+ RIP
+ }
+
+ /**
+ * initialize wiring around {@link #connectionAdapter}
+ */
+ public void init();
+
+ /**
+ * @return the negotiated version
+ */
+ public Short getVersion();
+
+ /**
+ * @return the state of conductor
+ */
+ public CONDUCTOR_STATE getConductorState();
+
+ /**
+ * @param conductorState
+ */
+ public void setConductorState(CONDUCTOR_STATE conductorState);
+
+ /**
+ * @return the switch features
+ */
+ public GetFeaturesOutput getFeatures();
+}
--- /dev/null
+/**
+ * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.openflowplugin.openflow.md.core;
+
+import org.opendaylight.openflowjava.protocol.api.connection.ConnectionAdapter;
+
+/**
+ * @author mirehak
+ *
+ */
+public abstract class ConnectionConductorFactory {
+
+ /**
+ * @param connectionAdapter
+ * @return conductor for given connection
+ */
+ public static ConnectionConductor createConductor(ConnectionAdapter connectionAdapter) {
+ ConnectionConductor connectionConductor = new ConnectionConductorImpl(connectionAdapter);
+ connectionConductor.init();
+ return connectionConductor;
+ }
+
+}
--- /dev/null
+/**
+ * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.openflowplugin.openflow.md.core;
+
+import java.util.List;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+import org.opendaylight.openflowjava.protocol.api.connection.ConnectionAdapter;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.EchoReplyInputBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.EchoRequestMessage;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.ErrorMessage;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.ExperimenterMessage;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.FlowRemovedMessage;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GetFeaturesInputBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GetFeaturesOutput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.HelloInputBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.HelloMessage;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.MultipartReplyMessage;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.MultipartRequestMessage;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OpenflowProtocolListener;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PacketInMessage;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PortStatusMessage;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.system.rev130927.DisconnectEvent;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.system.rev130927.SystemNotificationsListener;
+import org.opendaylight.yangtools.yang.common.RpcResult;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Lists;
+
+/**
+ * @author mirehak
+ */
+public class ConnectionConductorImpl implements OpenflowProtocolListener,
+ SystemNotificationsListener, ConnectionConductor {
+
+ private static final Logger LOG = LoggerFactory
+ .getLogger(ConnectionConductorImpl.class);
+
+ private LinkedBlockingQueue<Exception> errorQueue = new LinkedBlockingQueue<>();
+
+ private final ConnectionAdapter connectionAdapter;
+ private final List<Short> versionOrder;
+ private ConnectionConductor.CONDUCTOR_STATE conductorState;
+ private Short version;
+
+ private GetFeaturesOutput features;
+
+ /**
+ * @param connectionAdapter
+ */
+ public ConnectionConductorImpl(ConnectionAdapter connectionAdapter) {
+ this.connectionAdapter = connectionAdapter;
+ conductorState = CONDUCTOR_STATE.HANDSHAKING;
+ versionOrder = Lists.newArrayList((short) 0x04, (short) 0x01);
+ new Thread(new ErrorQueueHandler(errorQueue)).start();
+ }
+
+ @Override
+ public void init() {
+ connectionAdapter.setMessageListener(this);
+ connectionAdapter.setSystemListener(this);
+ }
+
+ @Override
+ public void onEchoRequestMessage(EchoRequestMessage arg0) {
+ LOG.debug("echo request received: " + arg0.getXid());
+ EchoReplyInputBuilder builder = new EchoReplyInputBuilder();
+ builder.setVersion(arg0.getVersion());
+ builder.setXid(arg0.getXid());
+ builder.setData(arg0.getData());
+
+ connectionAdapter.echoReply(builder.build());
+ }
+
+ @Override
+ public void onErrorMessage(ErrorMessage errorMessage) {
+ // TODO Auto-generated method stub
+ LOG.debug("error received, type: " + errorMessage.getType()
+ + "; code: " + errorMessage.getCode());
+ }
+
+ @Override
+ public void onExperimenterMessage(ExperimenterMessage experimenterMessage) {
+ // TODO Auto-generated method stub
+ LOG.debug("experimenter received, type: "
+ + experimenterMessage.getExpType());
+ }
+
+ @Override
+ public void onFlowRemovedMessage(FlowRemovedMessage arg0) {
+ // TODO Auto-generated method stub
+ }
+
+ @Override
+ public void onHelloMessage(HelloMessage hello) {
+ // do handshake
+ LOG.info("handshake STARTED");
+ checkState(CONDUCTOR_STATE.HANDSHAKING);
+
+ Short remoteVersion = hello.getVersion();
+ short proposedVersion;
+ try {
+ proposedVersion = proposeVersion(remoteVersion);
+ } catch (Exception e) {
+ handleException(e);
+ throw e;
+ }
+ HelloInputBuilder helloBuilder = new HelloInputBuilder();
+ helloBuilder.setVersion(proposedVersion).setXid(hello.getXid());
+ LOG.debug("sending helloReply");
+ connectionAdapter.hello(helloBuilder.build());
+
+ if (proposedVersion != remoteVersion) {
+ // need to wait for another hello
+ } else {
+ // sent version is equal to remote --> version is negotiated
+ version = proposedVersion;
+ LOG.debug("version set: " + proposedVersion);
+
+ // request features
+ GetFeaturesInputBuilder featuresBuilder = new GetFeaturesInputBuilder();
+ featuresBuilder.setVersion(version).setXid(hello.getXid());
+ Future<RpcResult<GetFeaturesOutput>> featuresFuture = connectionAdapter
+ .getFeatures(featuresBuilder.build());
+ LOG.debug("waiting for features");
+ RpcResult<GetFeaturesOutput> rpcFeatures;
+ try {
+ rpcFeatures = featuresFuture.get(getMaxTimeout(),
+ TimeUnit.MILLISECONDS);
+ LOG.debug("obtained features: datapathId="
+ + rpcFeatures.getResult().getDatapathId());
+ conductorState = CONDUCTOR_STATE.WORKING;
+ this.features = rpcFeatures.getResult();
+ LOG.info("handshake SETTLED");
+ } catch (Exception e) {
+ handleException(e);
+ }
+ }
+ }
+
+ /**
+ * @return rpc-response timeout in [ms]
+ */
+ private long getMaxTimeout() {
+ // TODO:: get from configuration
+ return 2000;
+ }
+
+ /**
+ * @param e
+ */
+ private void handleException(Exception e) {
+ try {
+ errorQueue.put(e);
+ } catch (InterruptedException e1) {
+ LOG.error(e1.getMessage(), e1);
+ }
+ }
+
+ @Override
+ public void onMultipartReplyMessage(MultipartReplyMessage arg0) {
+ // TODO Auto-generated method stub
+ }
+
+ @Override
+ public void onMultipartRequestMessage(MultipartRequestMessage arg0) {
+ // TODO Auto-generated method stub
+ }
+
+ @Override
+ public void onPacketInMessage(PacketInMessage arg0) {
+ // TODO Auto-generated method stub
+ }
+
+ @Override
+ public void onPortStatusMessage(PortStatusMessage arg0) {
+ // TODO Auto-generated method stub
+ }
+
+ /**
+ * @param conductorState
+ * the connectionState to set
+ */
+ @Override
+ public void setConductorState(CONDUCTOR_STATE conductorState) {
+ this.conductorState = conductorState;
+ }
+
+ @Override
+ public CONDUCTOR_STATE getConductorState() {
+ return conductorState;
+ }
+
+ /**
+ * @param handshaking
+ */
+ private void checkState(CONDUCTOR_STATE expectedState) {
+ if (!conductorState.equals(expectedState)) {
+ throw new IllegalStateException("Expected state: " + expectedState
+ + ", actual state:" + conductorState);
+ }
+ }
+
+ @Override
+ public void onDisconnectEvent(DisconnectEvent arg0) {
+ // TODO Auto-generated method stub
+ }
+
+ protected short proposeVersion(short remoteVersion) {
+ Short proposal = null;
+ for (short offer : versionOrder) {
+ if (offer <= remoteVersion) {
+ proposal = offer;
+ break;
+ }
+ }
+ if (proposal == null) {
+ throw new IllegalArgumentException("unsupported version: "
+ + remoteVersion);
+ }
+ return proposal;
+ }
+
+ @Override
+ public Short getVersion() {
+ return version;
+ }
+
+ @Override
+ public GetFeaturesOutput getFeatures() {
+ return features;
+ }
+}
--- /dev/null
+/**
+ * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.openflowplugin.openflow.md.core;
+
+import java.net.InetAddress;
+
+import org.opendaylight.openflowjava.protocol.api.connection.ConnectionConfiguration;
+
+/**
+ * @author mirehak
+ */
+public abstract class ConnectionConfigurationFactory {
+
+ /** OF default listening port */
+ private static final int DEFAULT_OF_PORT = 6653;
+ /** OF legacy default listening port */
+ private static final int LEGACY_OF_PORT = 6633;
+
+ /**
+ * @return default connection configuration
+ */
+ public static ConnectionConfiguration getDefault() {
+ return new ConnectionConfiguration() {
+
+ @Override
+ public InetAddress getAddress() {
+ // all interfaces
+ return null;
+ }
+
+ @Override
+ public int getPort() {
+ return DEFAULT_OF_PORT;
+ }
+
+ @Override
+ public FEATURE_SUPPORT getTlsSupport() {
+ return FEATURE_SUPPORT.NOT_SUPPORTED;
+ }
+
+ @Override
+ public Object getTransferProtocol() {
+ // TODO:: TCP/UDP ...
+ return null;
+ }
+ };
+ }
+
+}
--- /dev/null
+/**
+ * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.openflowplugin.openflow.md.core;
+
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * dumping all exceptions to log
+ * @author mirehak
+ */
+public class ErrorQueueHandler implements Runnable {
+
+ private static final Logger LOG = LoggerFactory
+ .getLogger(ErrorQueueHandler.class);
+
+ private LinkedBlockingQueue<Exception> errorQueue;
+
+ /**
+ * @param errorQueue
+ */
+ public ErrorQueueHandler(LinkedBlockingQueue<Exception> errorQueue) {
+ this.errorQueue = errorQueue;
+ }
+
+ @Override
+ public void run() {
+ while (true) {
+ Exception error;
+ try {
+ error = errorQueue.take();
+ LOG.error(error.getMessage(), error);
+ } catch (InterruptedException e) {
+ LOG.warn(e.getMessage());
+ }
+ }
+ }
+}
--- /dev/null
+/**
+ * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.openflowplugin.openflow.md.core;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.Future;
+
+import org.opendaylight.openflowjava.protocol.api.connection.ConnectionConfiguration;
+import org.opendaylight.openflowjava.protocol.api.connection.SwitchConnectionHandler;
+import org.opendaylight.openflowjava.protocol.spi.connection.SwitchConnectionProvider;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Lists;
+
+/**
+ * @author mirehak
+ *
+ */
+public class MDController {
+
+ private static final Logger LOG = LoggerFactory
+ .getLogger(MDController.class);
+
+ private SwitchConnectionProvider switchConnectionProvider;
+
+ /**
+ * @param switchConnectionProvider the switchConnectionProvider to set
+ */
+ public void setSwitchConnectionProvider(
+ SwitchConnectionProvider switchConnectionProvider) {
+ this.switchConnectionProvider = switchConnectionProvider;
+ }
+
+ /**
+ * @param switchConnectionProviderToUnset the switchConnectionProvider to unset
+ */
+ public void unsetSwitchConnectionProvider(
+ SwitchConnectionProvider switchConnectionProviderToUnset) {
+ if (this.switchConnectionProvider == switchConnectionProviderToUnset) {
+ this.switchConnectionProvider = null;
+ }
+ }
+
+ /**
+ * Function called by dependency manager after "init ()" is called and after
+ * the services provided by the class are registered in the service registry
+ *
+ */
+ public void start() {
+ LOG.debug("starting ..");
+ LOG.debug("switchConnectionProvider: "+switchConnectionProvider);
+ // setup handler
+ SwitchConnectionHandler switchConnectionHandler = new SwitchConnectionHandlerImpl();
+ switchConnectionProvider.setSwitchConnectionHandler(switchConnectionHandler);
+ // configure and startup library servers
+ switchConnectionProvider.configure(getConnectionConfiguration());
+ Future<List<Boolean>> srvStarted = switchConnectionProvider.startup();
+ }
+
+ /**
+ * @return wished connections configurations
+ */
+ private Collection<ConnectionConfiguration> getConnectionConfiguration() {
+ //TODO:: get config from state manager
+ ConnectionConfiguration configuration = ConnectionConfigurationFactory.getDefault();
+ return Lists.newArrayList(configuration);
+ }
+
+ /**
+ * Function called by the dependency manager before the services exported by
+ * the component are unregistered, this will be followed by a "destroy ()"
+ * calls
+ *
+ */
+ public void stop() {
+ LOG.debug("stopping");
+ }
+
+ /**
+ * Function called by the dependency manager when at least one dependency
+ * become unsatisfied or when the component is shutting down because for
+ * example bundle is being stopped.
+ *
+ */
+ public void destroy() {
+ // do nothing
+ }
+
+}
--- /dev/null
+/**
+ * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.openflowplugin.openflow.md.core;
+
+import java.net.InetAddress;
+
+import org.opendaylight.openflowjava.protocol.api.connection.ConnectionAdapter;
+import org.opendaylight.openflowjava.protocol.api.connection.SwitchConnectionHandler;
+
+/**
+ * @author mirehak
+ *
+ */
+public class SwitchConnectionHandlerImpl implements SwitchConnectionHandler {
+
+ @Override
+ public boolean accept(InetAddress address) {
+ // TODO:: add policy derived rules
+ return true;
+ }
+
+ @Override
+ public void onSwitchConnected(ConnectionAdapter connectionAdapter) {
+ ConnectionConductor conductor = ConnectionConductorFactory.createConductor(connectionAdapter);
+ //TODO:: store conductor
+ }
+
+}
--- /dev/null
+/**
+ * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.openflowplugin.openflow.md.core;
+
+import java.math.BigInteger;
+import java.util.Stack;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.opendaylight.openflowplugin.openflow.md.core.plan.ConnectionAdapterStackImpl;
+import org.opendaylight.openflowplugin.openflow.md.core.plan.EventFactory;
+import org.opendaylight.openflowplugin.openflow.md.core.plan.SwitchTestEvent;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.common.types.rev130731.ErrorType;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.EchoRequestMessageBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.ErrorMessageBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.ExperimenterInputBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.ExperimenterMessageBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GetFeaturesOutputBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.HelloMessageBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * @author mirehak
+ */
+public class ConnectionConductorImplTest {
+
+ private static final Logger LOG = LoggerFactory
+ .getLogger(ConnectionConductorImplTest.class);
+
+ protected ConnectionAdapterStackImpl adapter;
+ private ConnectionConductorImpl connectionConductor;
+ private Stack<SwitchTestEvent> eventPlan;
+
+ private Thread libSimulation;
+ private ScheduledThreadPoolExecutor pool = new ScheduledThreadPoolExecutor(
+ 8);
+
+ /**
+ * @throws java.lang.Exception
+ */
+ @Before
+ public void setUp() throws Exception {
+ adapter = new ConnectionAdapterStackImpl();
+ connectionConductor = new ConnectionConductorImpl(adapter);
+ connectionConductor.init();
+ eventPlan = new Stack<>();
+ adapter.setEventPlan(eventPlan);
+ adapter.setProceedTimeout(5000L);
+ adapter.checkListeners();
+ }
+
+ /**
+ * @throws java.lang.Exception
+ */
+ @After
+ public void tearDown() throws Exception {
+ if (libSimulation != null) {
+ libSimulation.join();
+ }
+ for (Exception problem : adapter.getOccuredExceptions()) {
+ LOG.error("during simulation on adapter side: "
+ + problem.getMessage());
+ }
+ Assert.assertEquals(0, adapter.getOccuredExceptions().size());
+ adapter = null;
+ if (LOG.isDebugEnabled()) {
+ if (eventPlan.size() > 0) {
+ LOG.debug("eventPlan size: " + eventPlan.size());
+ for (SwitchTestEvent event : eventPlan) {
+ LOG.debug(" # EVENT:: " + event.toString());
+ }
+ }
+ }
+ Assert.assertTrue("plan is not finished", eventPlan.isEmpty());
+ eventPlan = null;
+ }
+
+ /**
+ * Test method for
+ * {@link org.opendaylight.openflowplugin.openflow.md.core.ConnectionConductorImpl#onEchoRequestMessage(org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.EchoRequestMessage)}
+ * .
+ * @throws Exception
+ */
+ @Test
+ public void testOnEchoRequestMessage() throws Exception {
+ eventPlan.add(0, EventFactory.createDefaultNotificationEvent(42L,
+ EventFactory.DEFAULT_VERSION, new EchoRequestMessageBuilder()));
+ eventPlan.add(0,
+ EventFactory.createDefaultWaitForRpcEvent(42, "echoReply"));
+ executeNow();
+ }
+
+ /**
+ * Test of handshake, covering version negotiation and features .
+ * @throws Exception
+ */
+ @Test
+ public void testHandshake1() throws Exception {
+ eventPlan.add(0, EventFactory.createDefaultNotificationEvent(42L,
+ EventFactory.DEFAULT_VERSION, new HelloMessageBuilder()));
+ eventPlan.add(0,
+ EventFactory.createDefaultWaitForRpcEvent(42, "helloReply"));
+ eventPlan.add(0,
+ EventFactory.createDefaultWaitForRpcEvent(42, "getFeatures"));
+ GetFeaturesOutputBuilder getFeaturesOutputBuilder = new GetFeaturesOutputBuilder();
+ getFeaturesOutputBuilder.setDatapathId(new BigInteger("102030405060"));
+ getFeaturesOutputBuilder.setAuxiliaryId((short) 0);
+ getFeaturesOutputBuilder.setBuffers(4L);
+ getFeaturesOutputBuilder.setReserved(0L);
+ getFeaturesOutputBuilder.setTables((short) 2);
+ getFeaturesOutputBuilder.setCapabilities(84L);
+
+ eventPlan.add(0, EventFactory.createDefaultRpcResponseEvent(42,
+ EventFactory.DEFAULT_VERSION, getFeaturesOutputBuilder));
+
+ execute(true);
+ Assert.assertEquals(ConnectionConductor.CONDUCTOR_STATE.WORKING,
+ connectionConductor.getConductorState());
+ Assert.assertEquals((short) 0x04, connectionConductor.getVersion()
+ .shortValue());
+ }
+
+ /**
+ * Test of handshake, covering version negotiation and features .
+ * @throws Exception
+ */
+ @Test
+ public void testHandshake2() throws Exception {
+ eventPlan.add(0, EventFactory.createDefaultNotificationEvent(42L,
+ (short) 0x05, new HelloMessageBuilder()));
+ eventPlan.add(0,
+ EventFactory.createDefaultWaitForRpcEvent(42, "helloReply"));
+ eventPlan.add(0, EventFactory.createDefaultNotificationEvent(42L,
+ (short) 0x03, new HelloMessageBuilder()));
+ eventPlan.add(0,
+ EventFactory.createDefaultWaitForRpcEvent(42, "helloReply"));
+ eventPlan.add(0, EventFactory.createDefaultNotificationEvent(42L,
+ (short) 0x01, new HelloMessageBuilder()));
+ eventPlan.add(0,
+ EventFactory.createDefaultWaitForRpcEvent(42, "helloReply"));
+ eventPlan.add(0,
+ EventFactory.createDefaultWaitForRpcEvent(42, "getFeatures"));
+ GetFeaturesOutputBuilder getFeaturesOutputBuilder = new GetFeaturesOutputBuilder();
+ getFeaturesOutputBuilder.setDatapathId(new BigInteger("102030405060"));
+ getFeaturesOutputBuilder.setAuxiliaryId((short) 0);
+ getFeaturesOutputBuilder.setBuffers(4L);
+ getFeaturesOutputBuilder.setReserved(0L);
+ getFeaturesOutputBuilder.setTables((short) 2);
+ getFeaturesOutputBuilder.setCapabilities(84L);
+
+ eventPlan.add(0, EventFactory.createDefaultRpcResponseEvent(42,
+ EventFactory.DEFAULT_VERSION, getFeaturesOutputBuilder));
+
+ executeNow();
+ Assert.assertEquals(ConnectionConductor.CONDUCTOR_STATE.WORKING,
+ connectionConductor.getConductorState());
+ Assert.assertEquals((short) 0x01, connectionConductor.getVersion()
+ .shortValue());
+ }
+
+ /**
+ * Test of handshake, covering version negotiation and features .
+ * @throws Exception
+ */
+ @Test
+ public void testHandshake3() throws Exception {
+ eventPlan.add(0, EventFactory.createDefaultNotificationEvent(42L,
+ (short) 0x00, new HelloMessageBuilder()));
+
+ executeNow();
+ Assert.assertEquals(ConnectionConductor.CONDUCTOR_STATE.HANDSHAKING,
+ connectionConductor.getConductorState());
+ Assert.assertNull(connectionConductor.getVersion());
+ }
+
+ /**
+ * Test method for
+ * {@link org.opendaylight.openflowplugin.openflow.md.core.ConnectionConductorImpl#onExperimenterMessage(org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.ExperimenterMessage)}
+ * .
+ * @throws InterruptedException
+ */
+ @Test
+ public void testOnExperimenterMessage1() throws InterruptedException {
+ eventPlan.add(0,
+ EventFactory.createDefaultWaitForRpcEvent(42, "experimenter"));
+ ExperimenterMessageBuilder builder1 = new ExperimenterMessageBuilder();
+ builder1.setExperimenter(84L).setExpType(4L);
+ eventPlan.add(0, EventFactory.createDefaultNotificationEvent(42L,
+ EventFactory.DEFAULT_VERSION, builder1));
+ executeLater();
+
+ Runnable sendExperimenterCmd = new Runnable() {
+
+ @Override
+ public void run() {
+ ExperimenterInputBuilder builder2 = new ExperimenterInputBuilder();
+ builder2.setExperimenter(84L).setExpType(4L);
+ EventFactory.setupHeader(42L, builder2);
+ adapter.experimenter(builder2.build());
+ }
+ };
+ pool.schedule(sendExperimenterCmd,
+ ConnectionAdapterStackImpl.JOB_DELAY, TimeUnit.MILLISECONDS);
+ }
+
+ /**
+ * Test method for
+ * {@link org.opendaylight.openflowplugin.openflow.md.core.ConnectionConductorImpl#onExperimenterMessage(org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.ExperimenterMessage)}
+ * .
+ * @throws InterruptedException
+ */
+ @Test
+ public void testOnExperimenterMessage2() throws InterruptedException {
+ eventPlan.add(0,
+ EventFactory.createDefaultWaitForRpcEvent(42, "experimenter"));
+ ErrorMessageBuilder builder1 = new ErrorMessageBuilder();
+ builder1.setType(ErrorType.BADREQUEST).setCode(3)
+ .setData(new byte[] { 1, 2, 3 });
+
+ eventPlan.add(0, EventFactory.createDefaultNotificationEvent(42L,
+ EventFactory.DEFAULT_VERSION, builder1));
+
+ executeLater();
+
+ Runnable sendExperimenterCmd = new Runnable() {
+
+ @Override
+ public void run() {
+ ExperimenterInputBuilder builder2 = new ExperimenterInputBuilder();
+ builder2.setExperimenter(84L).setExpType(4L);
+ EventFactory.setupHeader(42L, builder2);
+ adapter.experimenter(builder2.build());
+ }
+ };
+ pool.schedule(sendExperimenterCmd,
+ ConnectionAdapterStackImpl.JOB_DELAY, TimeUnit.MILLISECONDS);
+ }
+
+ /**
+ * Test method for
+ * {@link org.opendaylight.openflowplugin.openflow.md.core.ConnectionConductorImpl#onFlowRemovedMessage(org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.FlowRemovedMessage)}
+ * .
+ */
+ @Test
+ public void testOnFlowRemovedMessage() {
+ // fail("Not yet implemented");
+ // TODO:: add test
+ }
+
+ /**
+ * Test method for
+ * {@link org.opendaylight.openflowplugin.openflow.md.core.ConnectionConductorImpl#onMultipartReplyMessage(org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.MultipartReplyMessage)}
+ * .
+ */
+ @Test
+ public void testOnMultipartReplyMessage() {
+ // fail("Not yet implemented");
+ // TODO:: add test
+ }
+
+ /**
+ * Test method for
+ * {@link org.opendaylight.openflowplugin.openflow.md.core.ConnectionConductorImpl#onMultipartRequestMessage(org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.MultipartRequestMessage)}
+ * .
+ */
+ @Test
+ public void testOnMultipartRequestMessage() {
+ // fail("Not yet implemented");
+ // TODO:: add test
+ }
+
+ /**
+ * Test method for
+ * {@link org.opendaylight.openflowplugin.openflow.md.core.ConnectionConductorImpl#onPacketInMessage(org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PacketInMessage)}
+ * .
+ */
+ @Test
+ public void testOnPacketInMessage() {
+ // fail("Not yet implemented");
+ // TODO:: add test
+ }
+
+ /**
+ * Test method for
+ * {@link org.opendaylight.openflowplugin.openflow.md.core.ConnectionConductorImpl#onPortStatusMessage(org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PortStatusMessage)}
+ * .
+ */
+ @Test
+ public void testOnPortStatusMessage() {
+ // fail("Not yet implemented");
+ // TODO:: add test
+ }
+
+ /**
+ * Test method for
+ * {@link org.opendaylight.openflowplugin.openflow.md.core.ConnectionConductorImpl#proposeVersion(short)}
+ * .
+ */
+ @Test
+ public void testProposeVersion() {
+ short[] remoteVer = new short[] { 0x05, 0x04, 0x03, 0x02, 0x01, 0x8f,
+ 0xff };
+ short[] expectedProposal = new short[] { 0x04, 0x04, 0x01, 0x01, 0x01,
+ 0x04, 0x04 };
+
+ for (int i = 0; i < remoteVer.length; i++) {
+ short actualProposal = connectionConductor
+ .proposeVersion(remoteVer[i]);
+ Assert.assertEquals(
+ String.format("proposing for version: %04x", remoteVer[i]),
+ expectedProposal[i], actualProposal);
+ }
+
+ try {
+ connectionConductor.proposeVersion((short) 0);
+ Assert.fail("there should be no proposition for this version");
+ } catch (Exception e) {
+ // expected
+ }
+ }
+
+ /**
+ * @throws InterruptedException
+ */
+ private void executeLater() throws InterruptedException {
+ execute(false);
+ }
+
+ /**
+ * @throws InterruptedException
+ */
+ private void executeNow() throws InterruptedException {
+ execute(true);
+ }
+
+ /**
+ * @throws InterruptedException
+ */
+ private void execute(boolean join) throws InterruptedException {
+ libSimulation = new Thread(adapter, "junit-adapter");
+ libSimulation.start();
+ if (join) {
+ libSimulation.join();
+ }
+ }
+
+}
--- /dev/null
+/**
+ * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.openflowplugin.openflow.md.core.plan;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Stack;
+import java.util.concurrent.Callable;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import org.opendaylight.controller.sal.common.util.RpcErrors;
+import org.opendaylight.controller.sal.common.util.Rpcs;
+import org.opendaylight.openflowjava.protocol.api.connection.ConnectionAdapter;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.BarrierInput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.BarrierOutput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.EchoInput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.EchoOutput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.EchoReplyInput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.EchoRequestMessage;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.ErrorMessage;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.ExperimenterInput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.ExperimenterMessage;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.FlowModInput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.FlowRemovedMessage;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GetAsyncInput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GetAsyncOutput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GetConfigInput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GetConfigOutput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GetFeaturesInput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GetFeaturesOutput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GetQueueConfigInput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GetQueueConfigOutput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GroupModInput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.HelloInput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.HelloMessage;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.MeterModInput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.MultipartReplyMessage;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.MultipartRequestMessage;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OpenflowProtocolListener;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PacketInMessage;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PacketOutInput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PortModInput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PortStatusMessage;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.RoleRequestInput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.RoleRequestOutput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.SetAsyncInput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.SetConfigInput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.TableModInput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.system.rev130927.DisconnectEvent;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.system.rev130927.SystemNotificationsListener;
+import org.opendaylight.yangtools.yang.binding.Notification;
+import org.opendaylight.yangtools.yang.common.RpcError;
+import org.opendaylight.yangtools.yang.common.RpcError.ErrorSeverity;
+import org.opendaylight.yangtools.yang.common.RpcError.ErrorType;
+import org.opendaylight.yangtools.yang.common.RpcResult;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.SettableFuture;
+
+/**
+ * @author mirehak
+ */
+public class ConnectionAdapterStackImpl implements ConnectionAdapter, Runnable {
+
+ /** notify and rpc-response thread start default delay in [ms] */
+ public static final int JOB_DELAY = 50;
+
+ protected static final Logger LOG = LoggerFactory
+ .getLogger(ConnectionAdapterStackImpl.class);
+
+ protected Stack<? extends SwitchTestEvent> eventPlan;
+ protected OpenflowProtocolListener ofListener;
+ protected SystemNotificationsListener systemListener;
+
+ protected Map<Long, SettableFuture<?>> rpcResults = new HashMap<>();
+ private ScheduledThreadPoolExecutor pool = new ScheduledThreadPoolExecutor(
+ 8);
+ protected boolean planTouched = false;
+
+ private long proceedTimeout;
+
+ protected List<Exception> occuredExceptions = new ArrayList<>();
+
+ /**
+ * default ctor
+ */
+ public ConnectionAdapterStackImpl() {
+ // do nothing
+ }
+
+ @Override
+ public Future<RpcResult<BarrierOutput>> barrier(BarrierInput arg0) {
+ checkRpc(arg0, "barrier");
+ SettableFuture<RpcResult<BarrierOutput>> result = createAndRegisterRpcResult(arg0);
+ next();
+ return result;
+ }
+
+ @Override
+ public Future<RpcResult<EchoOutput>> echo(EchoInput arg0) {
+ checkRpc(arg0, "echo");
+ Future<RpcResult<EchoOutput>> result = createAndRegisterRpcResult(arg0);
+ next();
+ return result;
+ }
+
+ @Override
+ public Future<RpcResult<Void>> echoReply(EchoReplyInput arg0) {
+ checkRpc(arg0, "echoReply");
+ SettableFuture<RpcResult<Void>> result = createOneWayRpcResult();
+ next();
+ return result;
+ }
+
+ @Override
+ public Future<RpcResult<Void>> experimenter(ExperimenterInput arg0) {
+ checkRpc(arg0, "experimenter");
+ SettableFuture<RpcResult<Void>> result = createOneWayRpcResult();
+ next();
+ return result;
+ }
+
+ @Override
+ public Future<RpcResult<Void>> flowMod(FlowModInput arg0) {
+ checkRpc(arg0, "flowMod");
+ SettableFuture<RpcResult<Void>> result = createOneWayRpcResult();
+ next();
+ return result;
+ }
+
+ @Override
+ public Future<RpcResult<GetAsyncOutput>> getAsync(GetAsyncInput arg0) {
+ checkRpc(arg0, "echo");
+ Future<RpcResult<GetAsyncOutput>> result = createAndRegisterRpcResult(arg0);
+ next();
+ return result;
+ }
+
+ @Override
+ public Future<RpcResult<GetConfigOutput>> getConfig(GetConfigInput arg0) {
+ checkRpc(arg0, "echo");
+ Future<RpcResult<GetConfigOutput>> result = createAndRegisterRpcResult(arg0);
+ next();
+ return result;
+ }
+
+ @Override
+ public Future<RpcResult<GetFeaturesOutput>> getFeatures(
+ GetFeaturesInput arg0) {
+ checkRpc(arg0, "getFeatures");
+ Future<RpcResult<GetFeaturesOutput>> result = createAndRegisterRpcResult(arg0);
+ next();
+ return result;
+ }
+
+ @Override
+ public Future<RpcResult<GetQueueConfigOutput>> getQueueConfig(
+ GetQueueConfigInput arg0) {
+ checkRpc(arg0, "echo");
+ Future<RpcResult<GetQueueConfigOutput>> result = createAndRegisterRpcResult(arg0);
+ next();
+ return result;
+ }
+
+ @Override
+ public Future<RpcResult<Void>> groupMod(GroupModInput arg0) {
+ checkRpc(arg0, "groupMod");
+ SettableFuture<RpcResult<Void>> result = createOneWayRpcResult();
+ next();
+ return result;
+ }
+
+ @Override
+ public Future<RpcResult<Void>> hello(HelloInput arg0) {
+ checkRpc(arg0, "helloReply");
+ SettableFuture<RpcResult<Void>> result = createOneWayRpcResult();
+ next();
+ return result;
+ }
+
+ @Override
+ public Future<RpcResult<Void>> meterMod(MeterModInput arg0) {
+ checkRpc(arg0, "meterMod");
+ SettableFuture<RpcResult<Void>> result = createOneWayRpcResult();
+ next();
+ return result;
+ }
+
+ @Override
+ public Future<RpcResult<Void>> packetOut(PacketOutInput arg0) {
+ checkRpc(arg0, "packetOut");
+ SettableFuture<RpcResult<Void>> result = createOneWayRpcResult();
+ next();
+ return result;
+ }
+
+ @Override
+ public Future<RpcResult<Void>> portMod(PortModInput arg0) {
+ checkRpc(arg0, "portMod");
+ SettableFuture<RpcResult<Void>> result = createOneWayRpcResult();
+ next();
+ return result;
+ }
+
+ @Override
+ public Future<RpcResult<RoleRequestOutput>> roleRequest(
+ RoleRequestInput arg0) {
+ checkRpc(arg0, "echo");
+ Future<RpcResult<RoleRequestOutput>> result = createAndRegisterRpcResult(arg0);
+ next();
+ return result;
+ }
+
+ @Override
+ public Future<RpcResult<Void>> setAsync(SetAsyncInput arg0) {
+ checkRpc(arg0, "setAsync");
+ SettableFuture<RpcResult<Void>> result = createOneWayRpcResult();
+ next();
+ return result;
+ }
+
+ @Override
+ public Future<RpcResult<Void>> setConfig(SetConfigInput arg0) {
+ checkRpc(arg0, "setConfig");
+ SettableFuture<RpcResult<Void>> result = createOneWayRpcResult();
+ next();
+ return result;
+ }
+
+ @Override
+ public Future<RpcResult<Void>> tableMod(TableModInput arg0) {
+ checkRpc(arg0, "tableMod");
+ SettableFuture<RpcResult<Void>> result = createOneWayRpcResult();
+ next();
+ return result;
+ }
+
+ @Override
+ public Future<Boolean> disconnect() {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ @Override
+ public boolean isAlive() {
+ // TODO make dynamic
+ return true;
+ }
+
+ @Override
+ public void setMessageListener(OpenflowProtocolListener ofListener) {
+ this.ofListener = ofListener;
+ }
+
+ @Override
+ public void checkListeners() {
+ if (ofListener == null || systemListener == null) {
+ occuredExceptions
+ .add(new IllegalStateException("missing listeners"));
+ }
+ }
+
+ @Override
+ public void setSystemListener(SystemNotificationsListener systemListener) {
+ this.systemListener = systemListener;
+ }
+
+ /**
+ * @param rpcInput
+ * rpc call parameter
+ * @param rpcName
+ * rpc yang name
+ */
+ private synchronized void checkRpc(OfHeader rpcInput, String rpcName) {
+ String msg = null;
+ LOG.debug("checking rpc: " + rpcName);
+ if (!(eventPlan.peek() instanceof SwitchTestWaitForRpcEvent)) {
+ msg = "expected [rpc], got [" + rpcInput.getClass().getSimpleName()
+ + "]";
+ } else {
+ SwitchTestWaitForRpcEvent switchTestRpcEvent = (SwitchTestWaitForRpcEvent) eventPlan
+ .peek();
+ if (!rpcName.equals(switchTestRpcEvent.getRpcName())) {
+ msg = "expected rpc name [" + switchTestRpcEvent.getRpcName()
+ + "], got [" + rpcName + "]";
+ } else if (!rpcInput.getXid().equals(switchTestRpcEvent.getXid())) {
+ msg = "expected xid [" + switchTestRpcEvent.getXid()
+ + "], got [" + rpcInput.getXid() + "]";
+ }
+ }
+
+ if (msg != null) {
+ LOG.debug("check .. FAILED: " + msg);
+ occuredExceptions.add(new IllegalArgumentException(msg));
+ }
+ LOG.debug("check .. OK");
+ }
+
+ /**
+ * discard current event, execute next, if possible
+ */
+ private synchronized void next() {
+ LOG.debug("STEPPING TO NEXT event in plan");
+ eventPlan.pop();
+ planTouched = true;
+ notify();
+ }
+
+ /**
+ * start or continue processing plan
+ */
+ private synchronized void proceed() {
+ boolean processed = false;
+ LOG.debug("proceeding plan item: " + eventPlan.peek());
+ if (eventPlan.peek() instanceof SwitchTestNotificationEvent) {
+ SwitchTestNotificationEvent notification = (SwitchTestNotificationEvent) eventPlan
+ .peek();
+ processNotification(notification);
+ processed = true;
+ } else if (eventPlan.peek() instanceof SwitchTestRcpResponseEvent) {
+ SwitchTestRcpResponseEvent rpcResponse = (SwitchTestRcpResponseEvent) eventPlan
+ .peek();
+ processRpcResponse(rpcResponse);
+ processed = true;
+ }
+
+ if (processed) {
+ next();
+ } else {
+ try {
+ LOG.debug("now waiting for HANDLER to act");
+ wait(proceedTimeout);
+ } catch (InterruptedException e) {
+ LOG.error(e.getMessage(), e);
+ }
+ }
+ }
+
+ @Override
+ public void run() {
+ LOG.debug("evenPlan STARTING ..");
+ while (!eventPlan.isEmpty()) {
+ planTouched = false;
+ proceed();
+ if (!planTouched) {
+ occuredExceptions.add(new IllegalStateException(
+ "eventPlan STALLED"));
+ }
+ }
+
+ try {
+ pool.awaitTermination(10 * JOB_DELAY, TimeUnit.MILLISECONDS);
+ } catch (InterruptedException e) {
+ LOG.error(e.getMessage(), e);
+ }
+ LOG.debug("eventPlan done");
+ }
+
+ /**
+ * @param notificationEvent
+ */
+ private void processNotification(
+ final SwitchTestNotificationEvent notificationEvent) {
+
+ Callable<Void> notifyCmd = new Callable<Void>() {
+ @Override
+ public Void call() throws Exception {
+ Notification notification = notificationEvent
+ .getPlannedNotification();
+ LOG.debug("notificating HANDLER: "
+ + notification.getClass().getSimpleName());
+
+ // system events
+ if (notification instanceof DisconnectEvent) {
+ systemListener
+ .onDisconnectEvent((DisconnectEvent) notification);
+ }
+ // of notifications
+ else if (notification instanceof EchoRequestMessage) {
+ ofListener
+ .onEchoRequestMessage((EchoRequestMessage) notification);
+ } else if (notification instanceof ErrorMessage) {
+ ofListener.onErrorMessage((ErrorMessage) notification);
+ } else if (notification instanceof ExperimenterMessage) {
+ ofListener
+ .onExperimenterMessage((ExperimenterMessage) notification);
+ } else if (notification instanceof FlowRemovedMessage) {
+ ofListener
+ .onFlowRemovedMessage((FlowRemovedMessage) notification);
+ } else if (notification instanceof HelloMessage) {
+ ofListener.onHelloMessage((HelloMessage) notification);
+ } else if (notification instanceof MultipartReplyMessage) {
+ ofListener
+ .onMultipartReplyMessage((MultipartReplyMessage) notification);
+ } else if (notification instanceof MultipartRequestMessage) {
+ ofListener
+ .onMultipartRequestMessage((MultipartRequestMessage) notification);
+ } else if (notification instanceof PacketInMessage) {
+ ofListener
+ .onPacketInMessage((PacketInMessage) notification);
+ } else if (notification instanceof PortStatusMessage) {
+ ofListener
+ .onPortStatusMessage((PortStatusMessage) notification);
+ }
+ // default
+ else {
+ occuredExceptions.add(new IllegalStateException(
+ "message listening not supported for type: "
+ + notification.getClass()));
+ }
+
+ LOG.debug("thread finished");
+ return null;
+ }
+
+ };
+
+ pool.schedule(notifyCmd, JOB_DELAY, TimeUnit.MILLISECONDS);
+ }
+
+ /**
+ * @param rpcResponse
+ */
+ private void processRpcResponse(final SwitchTestRcpResponseEvent rpcResponse) {
+ Callable<Void> notifyCmd = new Callable<Void>() {
+ @Override
+ public Void call() throws Exception {
+
+ OfHeader plannedRpcResponseValue = rpcResponse
+ .getPlannedRpcResponse();
+ LOG.debug("rpc-responding to HANDLER: " + rpcResponse.getXid());
+
+ @SuppressWarnings("unchecked")
+ SettableFuture<RpcResult<?>> response = (SettableFuture<RpcResult<?>>) rpcResults
+ .get(rpcResponse.getXid());
+
+ if (response != null) {
+ boolean successful = plannedRpcResponseValue != null;
+ Collection<RpcError> errors;
+ if (successful) {
+ errors = Collections.emptyList();
+ } else {
+ errors = Lists
+ .newArrayList(RpcErrors
+ .getRpcError(
+ "unit",
+ "unit",
+ "not requested",
+ ErrorSeverity.ERROR,
+ "planned response to RPC.id = "
+ + rpcResponse.getXid(),
+ ErrorType.RPC,
+ new Exception(
+ "rpc response failed (planned behavior)")));
+ }
+ RpcResult<?> result = Rpcs.getRpcResult(successful,
+ plannedRpcResponseValue, errors);
+ response.set(result);
+ } else {
+ String msg = "RpcResponse not expected: xid="
+ + rpcResponse.getXid()
+ + ", "
+ + plannedRpcResponseValue.getClass()
+ .getSimpleName();
+ LOG.error(msg);
+ occuredExceptions.add(new IllegalStateException(msg));
+ }
+
+ LOG.debug("thread finished");
+ return null;
+ }
+ };
+
+ pool.schedule(notifyCmd, JOB_DELAY, TimeUnit.MILLISECONDS);
+ }
+
+ /**
+ * @param arg0
+ * rpc call content
+ * @return rpc future result
+ */
+ private <IN extends OfHeader, OUT extends OfHeader> SettableFuture<RpcResult<OUT>> createAndRegisterRpcResult(
+ IN arg0) {
+ SettableFuture<RpcResult<OUT>> result = SettableFuture.create();
+ rpcResults.put(arg0.getXid(), result);
+ return result;
+ }
+
+ /**
+ * @return rpc future result
+ */
+ private static SettableFuture<RpcResult<Void>> createOneWayRpcResult() {
+ SettableFuture<RpcResult<Void>> result = SettableFuture.create();
+ result.set(null);
+ return result;
+ }
+
+ /**
+ * @param eventPlan
+ * the eventPlan to set
+ */
+ public void setEventPlan(Stack<? extends SwitchTestEvent> eventPlan) {
+ this.eventPlan = eventPlan;
+ }
+
+ /**
+ * @param proceedTimeout
+ * max timeout for processing one planned event (in [ms])
+ */
+ public void setProceedTimeout(long proceedTimeout) {
+ this.proceedTimeout = proceedTimeout;
+ }
+
+ /**
+ * @return the occuredExceptions
+ */
+ public List<Exception> getOccuredExceptions() {
+ return occuredExceptions;
+ }
+}
--- /dev/null
+/**
+ * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.openflowplugin.openflow.md.core.plan;
+
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
+import org.opendaylight.yangtools.yang.binding.Notification;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * @author mirehak
+ */
+public abstract class EventFactory {
+
+ private static final Logger LOG = LoggerFactory
+ .getLogger(EventFactory.class);
+
+ /** default protocol version */
+ public static final Short DEFAULT_VERSION = 4;
+
+ /**
+ * @param xid
+ * transaction id
+ * @param version
+ * version id
+ * @param builder
+ * message builder instance
+ * @return default notification event
+ */
+ public static SwitchTestNotificationEvent createDefaultNotificationEvent(
+ long xid, short version, Object builder) {
+ SwitchTestNotificationEventImpl event = new SwitchTestNotificationEventImpl();
+ Notification notification = build(setupHeader(xid, version, builder));
+ event.setNotification(notification);
+ return event;
+ }
+
+ /**
+ * @param xid
+ * transaction id
+ * @param version
+ * version id
+ * @param builder
+ * rpc response builder instance
+ * @return default notification event
+ */
+ public static SwitchTestRcpResponseEvent createDefaultRpcResponseEvent(
+ long xid, short version, Object builder) {
+ SwitchTestRcpResponseEventImpl event = new SwitchTestRcpResponseEventImpl();
+ OfHeader rpcResponse = build(setupHeader(xid, version, builder));
+ event.setResponse(rpcResponse);
+ event.setXid(xid);
+ return event;
+ }
+
+ /**
+ * @param setupHeader
+ * @return
+ */
+ @SuppressWarnings("unchecked")
+ private static <E> E build(Object builder) {
+ E notification = null;
+ try {
+ Class<?> builderClazz = builder.getClass();
+ notification = (E) builderClazz.getMethod("build", new Class[0])
+ .invoke(builder, new Object[0]);
+ } catch (Exception e) {
+ LOG.error(e.getMessage(), e);
+ }
+ return notification;
+ }
+
+ /**
+ * @param xid
+ * transaction id
+ * @param rpcName
+ * name of rpc method
+ * @return default notification event
+ */
+ public static SwitchTestWaitForRpcEvent createDefaultWaitForRpcEvent(
+ long xid, String rpcName) {
+ SwitchTestWaitForRpcEventImpl event = new SwitchTestWaitForRpcEventImpl();
+ event.setRpcName(rpcName);
+ event.setXid(xid);
+ return event;
+ }
+
+ /**
+ * @param xid
+ * @param version
+ * @param builder
+ * @return original builder
+ */
+ public static <E> E setupHeader(long xid, short version, E builder) {
+ try {
+ Class<?> builderClazz = builder.getClass();
+ builderClazz.getMethod("setXid", Long.class).invoke(builder, xid);
+ builderClazz.getMethod("setVersion", Short.class).invoke(builder,
+ version);
+ } catch (Exception e) {
+ LOG.error(e.getMessage(), e);
+ }
+
+ return builder;
+ }
+
+ /**
+ * use {@link #DEFAULT_VERSION}
+ * @param xid
+ * @param builder
+ * @return original builder
+ */
+ public static <E> E setupHeader(long xid, E builder) {
+ return setupHeader(xid, DEFAULT_VERSION, builder);
+ }
+
+}
--- /dev/null
+/**
+ * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.openflowplugin.openflow.md.core.plan;
+
+/**
+ * @author mirehak
+ */
+public interface SwitchTestEvent {
+ // nothing, unifying iface
+}
--- /dev/null
+/**
+ * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.openflowplugin.openflow.md.core.plan;
+
+import org.opendaylight.yangtools.yang.binding.Notification;
+
+/**
+ * @author mirehak
+ */
+public interface SwitchTestNotificationEvent extends SwitchTestEvent {
+
+ /**
+ * @return next switch notification/rpc response
+ */
+ Notification getPlannedNotification();
+}
--- /dev/null
+/**
+ * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.openflowplugin.openflow.md.core.plan;
+
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
+import org.opendaylight.yangtools.yang.binding.Notification;
+
+/**
+ * @author mirehak
+ */
+public class SwitchTestNotificationEventImpl implements
+ SwitchTestNotificationEvent {
+
+ private Notification notification;
+
+ /**
+ * @param notification
+ * the notification to set
+ */
+ public void setNotification(Notification notification) {
+ this.notification = notification;
+ }
+
+ @Override
+ public Notification getPlannedNotification() {
+ return notification;
+ }
+
+ @Override
+ public String toString() {
+ StringBuffer sb = new StringBuffer("SwitchTestNotificationEventImpl [notification=");
+ if (notification instanceof OfHeader) {
+ OfHeader header = (OfHeader) notification;
+ sb.append("version:").append(header.getVersion()).append(";")
+ .append("xid:").append(header.getXid()).append(";")
+ .append("type:").append(header.getClass().getSimpleName());
+ } else {
+ sb.append(notification.toString());
+ }
+ sb.append("]");
+ return sb.toString();
+ }
+}
--- /dev/null
+/**
+ * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.openflowplugin.openflow.md.core.plan;
+
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
+
+/**
+ * @author mirehak
+ */
+public interface SwitchTestRcpResponseEvent extends SwitchTestEvent {
+
+ /**
+ * @return switch notification/rpc response
+ */
+ public OfHeader getPlannedRpcResponse();
+
+ /**
+ * @return response transaction id
+ */
+ public long getXid();
+}
--- /dev/null
+/**
+ * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.openflowplugin.openflow.md.core.plan;
+
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
+
+/**
+ * @author mirehak
+ */
+public class SwitchTestRcpResponseEventImpl implements
+ SwitchTestRcpResponseEvent {
+
+ private OfHeader response;
+ private long xid;
+
+ /**
+ * @param response
+ * the response to set
+ */
+ public void setResponse(OfHeader response) {
+ this.response = response;
+ }
+
+ @Override
+ public OfHeader getPlannedRpcResponse() {
+ return response;
+ }
+
+ @Override
+ public long getXid() {
+ return xid;
+ }
+
+ /**
+ * @param xid
+ * the xid to set
+ */
+ public void setXid(long xid) {
+ this.xid = xid;
+ }
+
+ @Override
+ public String toString() {
+ return "SwitchTestRcpResponseEventImpl [response=" + response.getClass().getSimpleName()
+ + ", xid=" + xid + "]";
+ }
+}
--- /dev/null
+/**
+ * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.openflowplugin.openflow.md.core.plan;
+
+/**
+ * @author mirehak
+ */
+public interface SwitchTestWaitForRpcEvent extends SwitchTestEvent {
+
+ /**
+ * @return expected name of RPC
+ */
+ public String getRpcName();
+
+ /**
+ * @return expected transaction ID
+ */
+ public long getXid();
+
+}
--- /dev/null
+/**
+ * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.openflowplugin.openflow.md.core.plan;
+
+/**
+ * @author mirehak
+ */
+public class SwitchTestWaitForRpcEventImpl implements SwitchTestWaitForRpcEvent {
+
+ private String rpcName;
+ private long xid;
+
+ @Override
+ public String getRpcName() {
+ return rpcName;
+ }
+
+ @Override
+ public long getXid() {
+ return xid;
+ }
+
+ /**
+ * @param rpcName
+ * the rpcName to set
+ */
+ public void setRpcName(String rpcName) {
+ this.rpcName = rpcName;
+ }
+
+ /**
+ * @param xid
+ * the xid to set
+ */
+ public void setXid(long xid) {
+ this.xid = xid;
+ }
+
+ @Override
+ public String toString() {
+ return "SwitchTestWaitForRpcEventImpl [rpcName=" + rpcName + ", xid=" + xid
+ + "]";
+ }
+}