*/
public void checkListeners();
+ /**
+ * notify listener about connection ready-to-use event
+ */
+ public void fireConnectionReadyNotification();
+
+ /**
+ * set listener for connection became ready-to-use event
+ * @param connectionReadyListener
+ */
+ public void setConnectionReadyListener(ConnectionReadyListener connectionReadyListener);
+
}
--- /dev/null
+/**
+ * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.openflowjava.protocol.api.connection;
+
+/**
+ * @author mirehak
+ *
+ */
+public interface ConnectionReadyListener {
+
+ /**
+ * fired when connection becomes ready-to-use
+ */
+ public void onConnectionReady();
+}
* @param ch
* @return connection adapter tcp-implementation
*/
- public static ConnectionFacade createConnectionAdapter(SocketChannel ch) {
+ public static ConnectionFacade createConnectionFacade(SocketChannel ch) {
ConnectionAdapterImpl connectionAdapter = new ConnectionAdapterImpl();
connectionAdapter.setChannel(ch);
return connectionAdapter;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.opendaylight.controller.sal.common.util.RpcErrors;
import org.opendaylight.controller.sal.common.util.Rpcs;
+import org.opendaylight.openflowjava.protocol.api.connection.ConnectionReadyListener;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.BarrierInput;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.BarrierOutput;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.EchoInput;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.system.rev130927.DisconnectEvent;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.system.rev130927.SwitchIdleEvent;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.system.rev130927.SystemNotificationsListener;
+import org.opendaylight.yangtools.yang.binding.DataContainer;
import org.opendaylight.yangtools.yang.binding.DataObject;
import org.opendaylight.yangtools.yang.binding.Notification;
import org.opendaylight.yangtools.yang.common.RpcError;
protected Cache<RpcResponseKey, SettableFuture<?>> responseCache;
private SystemNotificationsListener systemListener;
private boolean disconnectOccured = false;
- private ExecutorService threadPool;
-
+
+ protected ConnectionReadyListener connectionReadyListener;
+
/**
* default ctor
*/
.concurrencyLevel(1)
.expireAfterWrite(RPC_RESPONSE_EXPIRATION, TimeUnit.MINUTES)
.removalListener(new ResponseRemovalListener()).build();
- threadPool = Executors.newCachedThreadPool();
LOG.info("ConnectionAdapter created");
}
}
@Override
- public void consume(final DataObject message) {
- threadPool.execute(new Runnable() {
- @Override
- public void run() {
- consumeIntern(message);
- }
- });
- }
-
- protected void consumeIntern(final DataObject message) {
- LOG.debug("Consume msg");
+ public void consume(DataObject message) {
+ LOG.debug("ConsumeIntern msg");
if (disconnectOccured ) {
return;
}
* @param errorMessage
* @param input
* @param responseClazz
- * @param key TODO
- * @param future TODO
+ * @param key of rpcResponse
* @return
*/
private <IN extends OfHeader, OUT extends OfHeader> SettableFuture<RpcResult<OUT>> handleRpcChannelFutureWithResponse(
if (messageListener == null) {
buffer.append("MessageListener ");
}
+ if (connectionReadyListener == null) {
+ buffer.append("ConnectionReadyListener ");
+ }
if (buffer.length() > 0) {
throw new IllegalStateException("Missing listeners: " + buffer.toString());
}
}
-
- static class ResponseRemovalListener implements RemovalListener<RpcResponseKey, SettableFuture<?>> {
+ static class ResponseRemovalListener implements RemovalListener<RpcResponseKey, SettableFuture<?>> {
@Override
public void onRemoval(
RemovalNotification<RpcResponseKey, SettableFuture<?>> notification) {
}
}
+ /**
+ * Class is used ONLY for exiting msgQueue processing thread
+ * @author michal.polkorab
+ */
+ static class ExitingDataObject implements DataObject {
+ @Override
+ public Class<? extends DataContainer> getImplementedInterface() {
+ return null;
+ }
+ }
+
+ @Override
+ public void fireConnectionReadyNotification() {
+ new Thread(new Runnable() {
+ @Override
+ public void run() {
+ connectionReadyListener.onConnectionReady();
+ }
+ }).start();
+ }
+
+
+ @Override
+ public void setConnectionReadyListener(
+ ConnectionReadyListener connectionReadyListener) {
+ this.connectionReadyListener = connectionReadyListener;
+ }
+
}
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+/**
+ *
+ * @author michal.polkorab
+ *
+ */
public class IdleHandler extends IdleStateHandler{
-
+
private static final Logger LOGGER = LoggerFactory.getLogger(IdleHandler.class);
+ /**
+ *
+ * @param readerIdleTime
+ * @param writerIdleTime
+ * @param allIdleTime
+ * @param unit
+ */
public IdleHandler(long readerIdleTime, long writerIdleTime,
long allIdleTime, TimeUnit unit) {
super(readerIdleTime, writerIdleTime, allIdleTime, unit);
}
LOGGER.info("Incoming connection accepted - building pipeline");
allChannels.add(ch);
- ConnectionFacade connectionAdapter = null;
- connectionAdapter = ConnectionAdapterFactory.createConnectionAdapter(ch);
+ ConnectionFacade connectionFacade = null;
+ connectionFacade = ConnectionAdapterFactory.createConnectionFacade(ch);
try {
LOGGER.debug("calling plugin: "+switchConnectionHandler);
- switchConnectionHandler.onSwitchConnected(connectionAdapter);
- connectionAdapter.checkListeners();
+ switchConnectionHandler.onSwitchConnected(connectionFacade);
+ connectionFacade.checkListeners();
+
+ TlsDetector tlsDetector = new TlsDetector();
+ tlsDetector.setConnectionFacade(connectionFacade);
+
ch.pipeline().addLast(COMPONENT_NAMES.IDLE_HANDLER.name(), new IdleHandler(switchIdleTimeout, 0, 0, TimeUnit.MILLISECONDS));
- ch.pipeline().addLast(COMPONENT_NAMES.TLS_DETECTOR.name(), new TlsDetector());
+ ch.pipeline().addLast(COMPONENT_NAMES.TLS_DETECTOR.name(), tlsDetector);
ch.pipeline().addLast(COMPONENT_NAMES.OF_FRAME_DECODER.name(), new OFFrameDecoder());
ch.pipeline().addLast(COMPONENT_NAMES.OF_VERSION_DETECTOR.name(), new OFVersionDetector());
ch.pipeline().addLast(COMPONENT_NAMES.OF_DECODER.name(), new OF13Decoder());
ch.pipeline().addLast(COMPONENT_NAMES.OF_ENCODER.name(), new OF13Encoder());
- ch.pipeline().addLast(COMPONENT_NAMES.DELEGATING_INBOUND_HANDLER.name(), new DelegatingInboundHandler(connectionAdapter));
+ ch.pipeline().addLast(COMPONENT_NAMES.DELEGATING_INBOUND_HANDLER.name(), new DelegatingInboundHandler(connectionFacade));
} catch (Exception e) {
LOGGER.error(e.getMessage(), e);
ch.close();
public void operationComplete(
io.netty.util.concurrent.Future<Object> downResult) throws Exception {
result.set(downResult.isSuccess());
- result.setException(downResult.cause());
+ if (downResult.cause() != null) {
+ result.setException(downResult.cause());
+ }
}
});
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.opendaylight.openflowjava.protocol.impl.connection.ConnectionFacade;
import org.opendaylight.openflowjava.protocol.impl.core.TcpHandler.COMPONENT_NAMES;
import org.opendaylight.openflowjava.protocol.impl.util.ByteBufUtils;
private boolean detectSsl;
private static final Logger LOGGER = LoggerFactory
.getLogger(TlsDetector.class);
+
+ private ConnectionFacade connectionFacade;
/**
* Constructor of class
} else {
LOGGER.info("Connection is not encrypted");
}
+
+ if (connectionFacade != null) {
+ LOGGER.debug("Firing onConnectionReady notification");
+ connectionFacade.fireConnectionReadyNotification();
+ }
+
ctx.pipeline().remove(COMPONENT_NAMES.TLS_DETECTOR.name());
}
+
+ /**
+ * @param connectionFacade the connectionFacade to set
+ */
+ public void setConnectionFacade(ConnectionFacade connectionFacade) {
+ this.connectionFacade = connectionFacade;
+ }
}
public void testHandshakeAndEcho() throws Exception {\r
int amountOfCLients = 1;\r
Stack<ClientEvent> scenario = ScenarioFactory.createHandshakeScenario();\r
- scenario.add(0, new SleepEvent(1500));\r
+ scenario.add(0, new SleepEvent(100));\r
scenario.add(0, new SendEvent(ByteBufUtils.hexStringToBytes("04 02 00 08 00 00 00 04")));\r
- scenario.add(0, new SleepEvent(1500));\r
+ scenario.add(0, new SleepEvent(100));\r
scenario.add(0, new WaitForMessageEvent(ByteBufUtils.hexStringToBytes("04 03 00 08 00 00 00 04")));\r
ScenarioHandler handler = new ScenarioHandler(scenario);\r
List<SimpleClient> clients = createAndStartClient(amountOfCLients, handler);\r
import java.util.concurrent.TimeoutException;\r
\r
import org.opendaylight.openflowjava.protocol.api.connection.ConnectionAdapter;\r
+import org.opendaylight.openflowjava.protocol.api.connection.ConnectionReadyListener;\r
import org.opendaylight.openflowjava.protocol.api.connection.SwitchConnectionHandler;\r
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.EchoReplyInput;\r
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.EchoReplyInputBuilder;\r
* @author michal.polkorab\r
*\r
*/\r
-public class MockPlugin implements OpenflowProtocolListener, SwitchConnectionHandler, SystemNotificationsListener {\r
+public class MockPlugin implements OpenflowProtocolListener, SwitchConnectionHandler, \r
+ SystemNotificationsListener, ConnectionReadyListener {\r
\r
- private static final Logger LOGGER = LoggerFactory.getLogger(MockPlugin.class);\r
- private ConnectionAdapter adapter;\r
+ protected static final Logger LOGGER = LoggerFactory.getLogger(MockPlugin.class);\r
+ protected ConnectionAdapter adapter;\r
private SettableFuture<Void> finishedFuture;\r
private int idleCounter = 0;\r
\r
this.adapter = connection;\r
connection.setMessageListener(this);\r
connection.setSystemListener(this);\r
+ connection.setConnectionReadyListener(this);\r
}\r
\r
@Override\r
}\r
\r
@Override\r
- public void onEchoRequestMessage(EchoRequestMessage notification) {\r
- LOGGER.debug("EchoRequest message received");\r
- LOGGER.debug("Building EchoReplyInput");\r
- EchoReplyInputBuilder replyBuilder = new EchoReplyInputBuilder();\r
- replyBuilder.setVersion((short) 4);\r
- replyBuilder.setXid(notification.getXid());\r
- EchoReplyInput echoReplyInput = replyBuilder.build();\r
- LOGGER.debug("EchoReplyInput built");\r
- LOGGER.debug("Going to send EchoReplyInput");\r
- adapter.echoReply(echoReplyInput);\r
- LOGGER.debug("EchoReplyInput sent");\r
- LOGGER.debug("adapter: "+adapter);\r
+ public void onEchoRequestMessage(final EchoRequestMessage notification) {\r
+ new Thread(new Runnable() {\r
+ @Override\r
+ public void run() {\r
+ LOGGER.debug("EchoRequest message received");\r
+ EchoReplyInputBuilder replyBuilder = new EchoReplyInputBuilder();\r
+ replyBuilder.setVersion((short) 4);\r
+ replyBuilder.setXid(notification.getXid());\r
+ EchoReplyInput echoReplyInput = replyBuilder.build();\r
+ adapter.echoReply(echoReplyInput);\r
+ LOGGER.debug("EchoReplyInput sent");\r
+ LOGGER.debug("adapter: "+adapter);\r
+ }\r
+ }).start();\r
}\r
\r
@Override\r
\r
@Override\r
public void onHelloMessage(HelloMessage notification) {\r
- LOGGER.debug("adapter: "+adapter);\r
- LOGGER.debug("Hello message received");\r
- HelloInputBuilder hib = new HelloInputBuilder();\r
+ new Thread(new Runnable() {\r
+ @Override\r
+ public void run() {\r
+ LOGGER.debug("Hello message received");\r
+ HelloInputBuilder hib = new HelloInputBuilder();\r
+ hib.setVersion((short) 4);\r
+ hib.setXid(2L);\r
+ HelloInput hi = hib.build();\r
+ adapter.hello(hi);\r
+ LOGGER.debug("hello msg sent");\r
+ new Thread(new Runnable() {\r
+ @Override\r
+ public void run() {\r
+ sendFeaturesReply();\r
+ }\r
+ }).start();\r
+ LOGGER.debug("adapter: "+adapter);\r
+ }\r
+ }).start();\r
+ }\r
+ \r
+ protected void sendFeaturesReply() {\r
GetFeaturesInputBuilder featuresBuilder = new GetFeaturesInputBuilder();\r
- hib.setVersion((short) 4);\r
- hib.setXid(2L);\r
featuresBuilder.setVersion((short) 4);\r
featuresBuilder.setXid(3L);\r
- HelloInput hi = hib.build();\r
- adapter.hello(hi);\r
- LOGGER.debug("hello msg sent");\r
GetFeaturesInput featuresInput = featuresBuilder.build();\r
try {\r
LOGGER.debug("Going to send featuresRequest");\r
}\r
} catch (InterruptedException | ExecutionException | TimeoutException e) {\r
LOGGER.error(e.getMessage(), e);\r
- // TODO - Collect exceptions and check for existence in tests\r
}\r
LOGGER.info("After FeaturesReply message");\r
- LOGGER.debug("adapter: "+adapter);\r
}\r
\r
protected void shutdown() {\r
Future<Boolean> disconnect = adapter.disconnect();\r
disconnect.get();\r
LOGGER.info("Disconnected");\r
- }\r
+ } \r
} catch (Exception e) {\r
LOGGER.error(e.getMessage(), e);\r
}\r
public int getIdleCounter() {\r
return idleCounter;\r
}\r
+ \r
+ @Override\r
+ public void onConnectionReady() {\r
+ LOGGER.debug("connection ready notification arrived");\r
+ }\r
\r
\r
}\r
@Override
public boolean eventExecuted() {
LOGGER.debug("sending message");
- Thread thread = new Thread(new Runnable() {
- @Override
- public void run() {
- LOGGER.debug("start of run");
- ByteBuf buffer = ctx.alloc().buffer();
- buffer.writeBytes(msgToSend);
- ctx.writeAndFlush(buffer);
- LOGGER.debug(">> " + ByteBufUtils.bytesToHexString(msgToSend));
- }
- });
- thread.start();
- try {
- thread.join();
- } catch (InterruptedException e) {
- LOGGER.error(e.getMessage(), e);
- }
+ LOGGER.debug("start of run");
+ ByteBuf buffer = ctx.alloc().buffer();
+ buffer.writeBytes(msgToSend);
+ ctx.writeAndFlush(buffer);
+ LOGGER.debug(">> " + ByteBufUtils.bytesToHexString(msgToSend));
LOGGER.debug("message sent");
return true;
}
public class SimpleClientHandler extends ChannelInboundHandlerAdapter {\r
\r
protected static final Logger LOGGER = LoggerFactory.getLogger(SimpleClientHandler.class);\r
+ private static final int OFP_HEADER_LENGTH = 8;\r
private SettableFuture<Boolean> isOnlineFuture;\r
protected ScenarioHandler scenarioHandler;\r
\r
if (LOGGER.isDebugEnabled()) {\r
LOGGER.debug("<< " + ByteBufUtils.byteBufToHexString(bb));\r
}\r
- byte[] message = new byte[8];\r
+ \r
+ if (bb.readableBytes() < OFP_HEADER_LENGTH) {\r
+ LOGGER.debug("too few bytes received: "+bb.readableBytes()+" - wait for next data portion");\r
+ return;\r
+ }\r
+ int msgSize = bb.getUnsignedShort(2);\r
+ byte[] message = new byte[msgSize];\r
bb.readBytes(message);\r
scenarioHandler.addOfMsg(message);\r
- skipMsg(bb);\r
LOGGER.info("end of read");\r
}\r
\r
\r
}\r
\r
- private static void skipMsg(ByteBuf bb) {\r
- if (bb.readableBytes() > 0) {\r
- bb.skipBytes(bb.getShort(2));\r
- }\r
- }\r
-\r
/**\r
* @param scenarioHandler handler of scenario events\r
*/\r