description "Initial model";\r
}\r
\r
- //augment "/ofproto:hello-message/ofproto:hello" {\r
- // leaf uid {\r
- // type uint16;\r
- // }\r
- //} \r
+ augment "/ofproto:hello-message/ofproto:elements" {\r
+ leaf uid {\r
+ type uint16;\r
+ }\r
+ }\r
}
\ No newline at end of file
import io.netty.util.concurrent.GenericFutureListener;
import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
public void onRemoval(
RemovalNotification<RpcResponseKey, SettableFuture<?>> notification) {
LOG.warn("rpc response discarded: "+notification.getKey());
- notification.getValue().cancel(true);
+ SettableFuture<?> future = notification.getValue();
+ if (!future.isDone()) {
+ future.cancel(true);
+ }
}
}).build();
LOG.info("ConnectionAdapter created");
@Override
public void consume(DataObject message) {
+ LOG.debug("Consume msg");
if (disconnectOccured ) {
return;
}
}
} else {
if (message instanceof OfHeader) {
+ LOG.debug("OFheader msg received");
RpcResponseKey key = createRpcResponseKey((OfHeader) message);
SettableFuture<RpcResult<?>> rpcFuture = findRpcResponse(key);
if (rpcFuture != null) {
- rpcFuture.set(Rpcs.getRpcResult(true, message, null));
+ LOG.debug("corresponding rpcFuture found");
+ List<RpcError> errors = Collections.emptyList();
+ rpcFuture.set(Rpcs.getRpcResult(true, message, errors));
responseCache.invalidate(key);
} else {
LOG.warn("received unexpected rpc response: "+key);
*/
private SettableFuture<RpcResult<Void>> sendToSwitchFuture(
DataObject input, final String failureInfo) {
+ LOG.debug("going to flush");
ChannelFuture resultFuture = channel.writeAndFlush(input);
+ LOG.debug("flushed");
ErrorSeverity errorSeverity = ErrorSeverity.ERROR;
String errorMessage = "check switch connection";
*/
private <IN extends OfHeader, OUT extends OfHeader> SettableFuture<RpcResult<OUT>> sendToSwitchExpectRpcResultFuture(
IN input, Class<OUT> responseClazz, final String failureInfo) {
+ LOG.debug("going to flush");
+ SettableFuture<RpcResult<OUT>> rpcResult = SettableFuture.create();
+ RpcResponseKey key = new RpcResponseKey(input.getXid(), responseClazz);
+ responseCache.put(key, rpcResult);
ChannelFuture resultFuture = channel.writeAndFlush(input);
+ LOG.debug("flushed");
ErrorSeverity errorSeverity = ErrorSeverity.ERROR;
String errorMessage = "check switch connection";
+
return handleRpcChannelFutureWithResponse(resultFuture, failureInfo, errorSeverity,
- errorMessage, input, responseClazz);
+ errorMessage, input, responseClazz, rpcResult, key);
}
/**
final ErrorSeverity errorSeverity, final String errorMessage) {
final SettableFuture<RpcResult<Void>> rpcResult = SettableFuture.create();
-
+ LOG.debug("handlerpcchannelfuture");
resultFuture.addListener(new GenericFutureListener<io.netty.util.concurrent.Future<? super Void>>() {
@Override
public void operationComplete(
io.netty.util.concurrent.Future<? super Void> future)
throws Exception {
- Collection<RpcError> errors = null;
+ LOG.debug("operation complete");
+ Collection<RpcError> errors = Collections.emptyList();
if (future.cause() != null) {
+ LOG.debug("future.cause != null");
RpcError rpcError = buildRpcError(failureInfo,
errorSeverity, errorMessage, future.cause());
errors = Lists.newArrayList(rpcError);
}
/**
- * @param input
- * @param responseClazz
* @param resultFuture
* @param failureInfo
* @param errorSeverity
* @param errorMessage
+ * @param input
+ * @param responseClazz
+ * @param key TODO
+ * @param future TODO
* @return
*/
private <IN extends OfHeader, OUT extends OfHeader> SettableFuture<RpcResult<OUT>> handleRpcChannelFutureWithResponse(
ChannelFuture resultFuture, final String failureInfo,
final ErrorSeverity errorSeverity, final String errorMessage,
- final IN input, Class<OUT> responseClazz) {
- final SettableFuture<RpcResult<OUT>> rpcResult = SettableFuture.create();
+ final IN input, Class<OUT> responseClazz, final SettableFuture<RpcResult<OUT>> rpcResult, final RpcResponseKey key) {
+ LOG.debug("handleRpcchanfuture with response");
resultFuture.addListener(new GenericFutureListener<io.netty.util.concurrent.Future<? super Void>>() {
io.netty.util.concurrent.Future<? super Void> future)
throws Exception {
+ LOG.debug("operation complete");
+ Collection<RpcError> errors = Collections.emptyList();
if (future.cause() != null) {
- Collection<RpcError> errors = null;
+ LOG.debug("ChannelFuture.cause != null");
RpcError rpcError = buildRpcError(failureInfo,
errorSeverity, errorMessage, future.cause());
errors = Lists.newArrayList(rpcError);
(OUT) null,
errors)
);
+ responseCache.invalidate(key);
} else {
- RpcResponseKey key = new RpcResponseKey(input.getXid(), input.getClass().toString());
- if (responseCache.getIfPresent(key) != null) {
- responseCache.invalidate(key);
+ LOG.debug("ChannelFuture.cause == null");
+ if (responseCache.getIfPresent(key) == null) {
+ LOG.debug("responcache: key wasn't present");
}
- responseCache.put(key, rpcResult);
}
}
});
* @return
*/
private static RpcResponseKey createRpcResponseKey(OfHeader message) {
- return new RpcResponseKey(message.getXid(), message.getClass().toString());
+ return new RpcResponseKey(message.getXid(), message.getClass());
}
/**
package org.opendaylight.openflowjava.protocol.impl.connection;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
+
/**
* @author mirehak
public class RpcResponseKey {
private final long xid;
- private final String outputClazz;
+ private final Class<? extends OfHeader> outputClazz;
/**
* @param xid
* @param outputClazz
*/
- public RpcResponseKey(long xid, String outputClazz) {
+ public RpcResponseKey(long xid, Class<? extends OfHeader> outputClazz) {
super();
this.xid = xid;
this.outputClazz = outputClazz;
/**
* @return the outputClazz
*/
- public String getOutputClazz() {
+ public Class<? extends OfHeader> getOutputClazz() {
return outputClazz;
}
-
+
@Override
public int hashCode() {
final int prime = 31;
int result = 1;
- result = prime * result
- + ((outputClazz == null) ? 0 : outputClazz.hashCode());
result = prime * result + (int) (xid ^ (xid >>> 32));
return result;
}
if (outputClazz == null) {
if (other.outputClazz != null)
return false;
- } else if (!outputClazz.equals(other.outputClazz))
- return false;
+ } else if (!other.outputClazz.isAssignableFrom(outputClazz))
+ return false;
if (xid != other.xid)
return false;
return true;
\r
private static final Logger LOGGER = LoggerFactory.getLogger(DelegatingInboundHandler.class);\r
\r
- private MessageConsumer consumer;\r
+ protected MessageConsumer consumer;\r
private boolean inactiveMessageSent = false;\r
\r
/** \r
}\r
\r
@Override\r
- public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {\r
- consumer.consume((DataObject) msg);\r
+ public void channelRead(ChannelHandlerContext ctx, final Object msg)\r
+ throws Exception {\r
+ LOGGER.debug("Reading");\r
+ new Thread(new Runnable() {\r
+ @Override\r
+ public void run() {\r
+ consumer.consume((DataObject) msg);\r
+ }\r
+ }).start();\r
}\r
\r
@Override\r
@Override\r
protected void encode(ChannelHandlerContext ctx, OfHeader msg, ByteBuf out)\r
throws Exception {\r
+ LOGGER.debug("Encoding");\r
SerializationFactory.messageToBuffer(msg.getVersion(), out, msg);\r
if (out.readableBytes() > 0) {\r
+ out.retain();\r
ctx.writeAndFlush(out);\r
} else {\r
LOGGER.warn("Translated buffer is empty");\r
import io.netty.channel.group.DefaultChannelGroup;\r
import io.netty.channel.socket.SocketChannel;\r
\r
+import java.net.InetAddress;\r
import java.util.Iterator;\r
\r
import org.opendaylight.openflowjava.protocol.api.connection.SwitchConnectionHandler;\r
\r
@Override\r
protected void initChannel(SocketChannel ch) {\r
- LOGGER.debug("building pipeline");\r
- // TODO - call switchConnectionHandler accept first\r
+ InetAddress switchAddress = ch.remoteAddress().getAddress();\r
+ LOGGER.info("Incoming connection from (remote address): " + switchAddress.toString());\r
+ if (!switchConnectionHandler.accept(switchAddress)) {\r
+ ch.disconnect();\r
+ LOGGER.info("Incoming connection rejected");\r
+ return;\r
+ }\r
+ LOGGER.info("Incoming connection accepted - building pipeline");\r
allChannels.add(ch);\r
ConnectionFacade connectionAdapter = null;\r
connectionAdapter = ConnectionAdapterFactory.createConnectionAdapter(ch);\r
return channelInitializer;
}
- /**
- * Sets and starts TCPHandler.
- *
- * @param args
- * @throws Exception
- */
- public static void main(String[] args) throws Exception {
- int port;
- if (args.length > 0) {
- port = Integer.parseInt(args[0]);
- } else {
- port = 6633;
- }
- new Thread(new TcpHandler(port)).start();
- }
-
@Override
public ListenableFuture<Boolean> getIsOnlineFuture() {
return isOnlineFuture;
return msgType;\r
}\r
\r
+ @Override\r
+ public String toString() {\r
+ return "msgVersion: " + msgVersion + " msgType: " + msgType.getName();\r
+ }\r
+\r
@Override\r
public int hashCode() {\r
final int prime = 31;\r
private EventLoopGroup group;
private SettableFuture<Boolean> isOnlineFuture;
private SettableFuture<Boolean> automatedPartDone;
- private SettableFuture<Void> dataReceived;
- private int dataLimit;
/**
* Constructor of class
private void init() {
isOnlineFuture = SettableFuture.create();
automatedPartDone = SettableFuture.create();
- dataReceived = SettableFuture.create();
}
/**
group = new NioEventLoopGroup();
try {
Bootstrap b = new Bootstrap();
- if (securedClient) {
- b.group(group)
- .channel(NioSocketChannel.class)
- .handler(new SimpleClientInitializer(isOnlineFuture));
- } else {
- SimpleClientHandler plainHandler = new SimpleClientHandler(isOnlineFuture);
- plainHandler.setDataReceivedFuture(dataReceived , dataLimit);
- b.group(group)
- .channel(NioSocketChannel.class)
- .handler(plainHandler);
- }
+ b.group(group)
+ .channel(NioSocketChannel.class)
+ .handler(new SimpleClientInitializer(isOnlineFuture, securedClient));
Channel ch = b.connect(host, port).sync().channel();
return isOnlineFuture;
}
- /**
- * @return the dataReceived
- */
- public SettableFuture<Void> getDataReceived() {
- return dataReceived;
- }
-
/**
* @return the automatedPartDone
*/
public SettableFuture<Boolean> getAutomatedPartDone() {
return automatedPartDone;
}
-
- /**
- * @param dataLimit the dataLimit to set
- */
- public void setDataLimit(int dataLimit) {
- this.dataLimit = dataLimit;
- }
}
\ No newline at end of file
package org.opendaylight.openflowjava.protocol.impl.clients;\r
\r
import io.netty.buffer.ByteBuf;\r
+import io.netty.buffer.UnpooledByteBufAllocator;\r
import io.netty.channel.ChannelHandlerContext;\r
import io.netty.channel.ChannelInboundHandlerAdapter;\r
\r
+import org.opendaylight.openflowjava.protocol.impl.util.BufferHelper;\r
import org.opendaylight.openflowjava.protocol.impl.util.ByteBufUtils;\r
import org.slf4j.Logger;\r
import org.slf4j.LoggerFactory;\r
\r
private static final Logger LOGGER = LoggerFactory.getLogger(SimpleClientHandler.class);\r
private SettableFuture<Boolean> isOnlineFuture;\r
- private SettableFuture<Void> dataReceived;\r
- private int dataLimit;\r
- private int dataCounter = 0;\r
+ private int messagesReceived;\r
\r
/**\r
* @param isOnlineFuture future notifier of connected channel\r
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {\r
LOGGER.info("SimpleClientHandler - start of read");\r
ByteBuf bb = (ByteBuf) msg;\r
- dataCounter += bb.readableBytes();\r
if (LOGGER.isDebugEnabled()) {\r
LOGGER.debug(ByteBufUtils.byteBufToHexString(bb));\r
}\r
- LOGGER.info(msg.toString());\r
- LOGGER.info("SimpleClientHandler - end of read");\r
- if (dataCounter >= dataLimit) {\r
- LOGGER.debug("data obtained");\r
- dataReceived.set(null);\r
+ messagesReceived += readHeaders(bb);\r
+ LOGGER.debug("Messages received: " + messagesReceived);\r
+ switch (messagesReceived) {\r
+ case 2:\r
+ LOGGER.debug("FeaturesReply case");\r
+ ByteBuf featuresReply = createFeaturesReplyBytebuf();\r
+ ctx.write(featuresReply);\r
+ LOGGER.debug("FeaturesReply sent");\r
+ break;\r
+ default:\r
+ LOGGER.debug("Default case");\r
+ break;\r
}\r
+\r
+ ctx.flush();\r
+ LOGGER.info("end of read");\r
+ }\r
+\r
+ private static ByteBuf createFeaturesReplyBytebuf() {\r
+ ByteBuf featuresReply = UnpooledByteBufAllocator.DEFAULT.buffer();\r
+ featuresReply.writeByte(4);\r
+ featuresReply.writeByte(6);\r
+ featuresReply.writeShort(32);\r
+ ByteBuf featuresReplyBody = BufferHelper\r
+ .buildBuffer("00 01 02 03 04 05 06 07 00 01 02 03 01 01 00 00 00"\r
+ + " 01 02 03 00 01 02 03");\r
+ featuresReply.writeBytes(featuresReplyBody);\r
+ return featuresReply;\r
}\r
\r
-/* (non-Javadoc)\r
- * @see io.netty.channel.ChannelInboundHandlerAdapter#channelActive(io.netty.channel.ChannelHandlerContext)\r
- */\r
@Override\r
public void channelActive(ChannelHandlerContext ctx) throws Exception {\r
- System.out.println("CLIENT IS ACTIVE");\r
+ System.out.println("Client is active");\r
if (isOnlineFuture != null) {\r
isOnlineFuture.set(true);\r
isOnlineFuture = null;\r
}\r
}\r
\r
- /**\r
- * @param dataReceived\r
- * @param dataLimit\r
- */\r
- public void setDataReceivedFuture(SettableFuture<Void> dataReceived, int dataLimit) {\r
- this.dataReceived = dataReceived;\r
- this.dataLimit = dataLimit;\r
+ private static int readHeaders(ByteBuf bb) {\r
+ int messages = 0;\r
+ int length = 0;\r
+ while (bb.readableBytes() > 0) {\r
+ length = bb.getShort(2);\r
+ bb.skipBytes(length);\r
+ messages++;\r
+ }\r
+ return messages;\r
}\r
- \r
- \r
+\r
}\r
public class SimpleClientInitializer extends ChannelInitializer<SocketChannel> {
private SettableFuture<Boolean> sf;
+ private boolean secured;
/**
* @param sf future notifier of connected channel
*/
- public SimpleClientInitializer(SettableFuture<Boolean> sf) {
+ public SimpleClientInitializer(SettableFuture<Boolean> sf, boolean secured) {
this.sf = sf;
+ this.secured = secured;
}
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
- SSLEngine engine =
- SslContextFactory.getClientContext().createSSLEngine();
- engine.setUseClientMode(true);
- pipeline.addLast("ssl", new SslHandler(engine));
+ if (secured) {
+ SSLEngine engine = SslContextFactory.getClientContext()
+ .createSSLEngine();
+ engine.setUseClientMode(true);
+ pipeline.addLast("ssl", new SslHandler(engine));
+ }
pipeline.addLast("handler", new SimpleClientHandler(sf));
sf = null;
}
configs.add(new TestingConnConfigImpl(startupAddress, DEFAULT_PORT, DEFAULT_TLS_SUPPORT));\r
scpimpl.configure(configs);\r
scpimpl.startup().get(CONNECTION_TIMEOUT, TimeUnit.MILLISECONDS);\r
- \r
+\r
int amountOfCLients = 1;\r
- List<SimpleClient> clients = createAndStartClient(amountOfCLients, 24);\r
+ List<SimpleClient> clients = createAndStartClient(amountOfCLients);\r
SimpleClient firstClient = clients.get(0);\r
firstClient.getAutomatedPartDone().get();\r
- firstClient.getDataReceived().get();\r
- disconnectClients(clients);\r
+ mockPlugin.getFinishedFuture().get();\r
+ }\r
+\r
+ /**\r
+ * Library integration and communication test (with virtual machine)\r
+ * @throws Exception\r
+ */\r
+ //@Test\r
+ public void testCommunicationWithVM() throws Exception {\r
+ mockPlugin = new MockPlugin();\r
+ SwitchConnectionProviderImpl scpimpl = new SwitchConnectionProviderImpl();\r
+ scpimpl.setSwitchConnectionHandler(mockPlugin);\r
+ List<ConnectionConfiguration> configs = new ArrayList<>();\r
+ configs.add(new TestingConnConfigImpl(startupAddress, DEFAULT_PORT, DEFAULT_TLS_SUPPORT));\r
+ scpimpl.configure(configs);\r
+ scpimpl.startup().get(CONNECTION_TIMEOUT, TimeUnit.MILLISECONDS);\r
+ mockPlugin.getFinishedFuture().get();\r
}\r
\r
/**\r
* @throws InterruptedException\r
* @throws ExecutionException\r
*/\r
- private List<SimpleClient> createAndStartClient(int amountOfCLients, int dataLimit)\r
+ private List<SimpleClient> createAndStartClient(int amountOfCLients)\r
throws InterruptedException, ExecutionException {\r
List<SimpleClient> clientsHorde = new ArrayList<>();\r
for (int i = 0; i < amountOfCLients; i++) {\r
getClass().getResourceAsStream(OF_BINARY_MESSAGE_INPUT_TXT));\r
sc.setSecuredClient(false);\r
clientsHorde.add(sc);\r
- sc.setDataLimit(dataLimit);\r
sc.start();\r
}\r
for (SimpleClient sc : clientsHorde) {\r
/* Copyright (C)2013 Pantheon Technologies, s.r.o. All rights reserved. */\r
package org.opendaylight.openflowjava.protocol.impl.integration;\r
\r
+import java.lang.reflect.InvocationTargetException;\r
import java.net.InetAddress;\r
+import java.util.Arrays;\r
+import java.util.concurrent.ExecutionException;\r
+import java.util.concurrent.TimeUnit;\r
+import java.util.concurrent.TimeoutException;\r
\r
import org.opendaylight.openflowjava.protocol.api.connection.ConnectionAdapter;\r
import org.opendaylight.openflowjava.protocol.api.connection.SwitchConnectionHandler;\r
import org.opendaylight.openflowjava.protocol.impl.util.BufferHelper;\r
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.EchoReplyInput;\r
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.EchoReplyInputBuilder;\r
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.EchoRequestMessage;\r
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.ErrorMessage;\r
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.ExperimenterMessage;\r
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.FlowRemovedMessage;\r
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GetFeaturesInput;\r
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GetFeaturesInputBuilder;\r
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GetFeaturesOutput;\r
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.HelloInput;\r
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.HelloInputBuilder;\r
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.HelloMessage;\r
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PortStatusMessage;\r
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.system.rev130927.DisconnectEvent;\r
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.system.rev130927.SystemNotificationsListener;\r
+import org.opendaylight.yangtools.yang.common.RpcError;\r
+import org.opendaylight.yangtools.yang.common.RpcResult;\r
import org.slf4j.Logger;\r
import org.slf4j.LoggerFactory;\r
\r
+import com.google.common.util.concurrent.SettableFuture;\r
+\r
/**\r
* @author michal.polkorab\r
*\r
\r
private static final Logger LOGGER = LoggerFactory.getLogger(MockPlugin.class);\r
private ConnectionAdapter adapter;\r
+ private SettableFuture<Void> finishedFuture;\r
\r
+ public MockPlugin() {\r
+ finishedFuture = SettableFuture.create();\r
+ }\r
\r
@Override\r
public void onSwitchConnected(ConnectionAdapter connection) {\r
@Override\r
public void onEchoRequestMessage(EchoRequestMessage notification) {\r
LOGGER.debug("EchoRequest message received");\r
- \r
+ LOGGER.debug("Building EchoReplyInput");\r
+ EchoReplyInputBuilder replyBuilder = new EchoReplyInputBuilder();\r
+ try {\r
+ BufferHelper.setupHeader(replyBuilder);\r
+ } catch (NoSuchMethodException | SecurityException\r
+ | IllegalAccessException | IllegalArgumentException\r
+ | InvocationTargetException e) {\r
+ LOGGER.error(e.getMessage(), e);\r
+ }\r
+ replyBuilder.setXid(notification.getXid());\r
+ EchoReplyInput echoReplyInput = replyBuilder.build();\r
+ LOGGER.debug("EchoReplyInput built");\r
+ LOGGER.debug("Going to send EchoReplyInput");\r
+ adapter.echoReply(echoReplyInput);\r
+ LOGGER.debug("EchoReplyInput sent");\r
}\r
\r
@Override\r
public void onHelloMessage(HelloMessage notification) {\r
LOGGER.debug("Hello message received");\r
HelloInputBuilder hib = new HelloInputBuilder();\r
+ GetFeaturesInputBuilder featuresBuilder = new GetFeaturesInputBuilder();\r
try {\r
BufferHelper.setupHeader(hib);\r
+ BufferHelper.setupHeader(featuresBuilder);\r
} catch (Exception e) {\r
LOGGER.error(e.getMessage(), e);\r
}\r
HelloInput hi = hib.build();\r
adapter.hello(hi);\r
+ LOGGER.debug("hello msg sent");\r
+ GetFeaturesInput featuresInput = featuresBuilder.build();\r
+ try {\r
+ LOGGER.debug("Going to send featuresRequest");\r
+ RpcResult<GetFeaturesOutput> rpcResult = adapter.getFeatures(\r
+ featuresInput).get(2500, TimeUnit.MILLISECONDS);\r
+ if (rpcResult.isSuccessful()) {\r
+ byte[] byteArray = rpcResult.getResult().getDatapathId()\r
+ .toByteArray();\r
+ LOGGER.debug("DatapathId: " + Arrays.toString(byteArray));\r
+ } else {\r
+ RpcError rpcError = rpcResult.getErrors().iterator().next();\r
+ LOGGER.warn("rpcResult failed: "\r
+ + rpcError.getCause().getMessage(), rpcError.getCause());\r
+ }\r
+ } catch (InterruptedException | ExecutionException | TimeoutException e) {\r
+ LOGGER.error(e.getMessage(), e);\r
+ }\r
+ LOGGER.info("After FeaturesReply message - disconnecting");\r
+ adapter.disconnect();\r
+ finishedFuture.set(null);\r
}\r
\r
@Override\r
LOGGER.debug("disconnection ocured: "+notification.getInfo());\r
}\r
\r
+ public SettableFuture<Void> getFinishedFuture() {\r
+ return finishedFuture;\r
+ }\r
+\r
+\r
}\r