*/
package org.opendaylight.protocol.bgp.rib.impl;
+import io.netty.util.concurrent.Future;
+
import java.io.Closeable;
import java.io.IOException;
}
@Override
- public BGPSession createClient(final BGPConnection connection, final ProtocolMessageFactory parser) throws IOException {
- return (BGPSession) this.dispatcher.createClient(connection, new BGPSessionFactory(parser));
+ public Future<? extends BGPSession> createClient(final BGPConnection connection, final ProtocolMessageFactory parser) throws IOException {
+ return this.dispatcher.createClient(connection, new BGPSessionFactory(parser));
}
public Dispatcher getDispatcher() {
import java.io.Closeable;
import java.io.IOException;
import java.net.InetSocketAddress;
+import java.util.concurrent.ExecutionException;
import org.opendaylight.protocol.bgp.parser.BGPSession;
import org.opendaylight.protocol.bgp.parser.BGPSessionListener;
import org.opendaylight.protocol.concepts.ListenerRegistration;
import org.opendaylight.protocol.framework.ProtocolMessageFactory;
+import com.google.common.base.Preconditions;
+
/**
* Implementation of {@link BGP}.
*/
public BGPImpl(final BGPDispatcher dispatcher, final ProtocolMessageFactory parser, final InetSocketAddress address,
final BGPSessionProposal proposal, final BGPSessionProposalChecker checker) throws IOException {
- this.dispatcher = dispatcher;
- this.parser = parser;
- this.address = address;
- this.proposal = proposal;
+ this.dispatcher = Preconditions.checkNotNull(dispatcher);
+ this.parser = Preconditions.checkNotNull(parser);
+ this.address = Preconditions.checkNotNull(address);
+ this.proposal = Preconditions.checkNotNull(proposal);
this.checker = checker;
}
* {@inheritDoc}
*/
@Override
- public ListenerRegistration<BGPSessionListener> registerUpdateListener(final BGPSessionListener listener) throws IOException {
- final BGPSession session = this.dispatcher.createClient(
- new BGPConnectionImpl(this.address, listener, this.proposal.getProposal(), this.checker), this.parser);
+ public BGPListenerRegistration registerUpdateListener(final BGPSessionListener listener) throws IOException {
+ final BGPSession session;
+ try {
+ session = this.dispatcher.createClient(
+ new BGPConnectionImpl(this.address, listener, this.proposal.getProposal(), this.checker), this.parser).get();
+ } catch (InterruptedException | ExecutionException e) {
+ throw new IOException("Failed to connect to peer", e);
+ }
return new BGPListenerRegistration(listener, session);
}
import org.opendaylight.protocol.bgp.rib.impl.spi.BGPConnection;
import org.opendaylight.protocol.framework.ProtocolConnection;
import org.opendaylight.protocol.framework.ProtocolMessageFactory;
-import org.opendaylight.protocol.framework.ProtocolSession;
import org.opendaylight.protocol.framework.ProtocolSessionFactory;
import org.opendaylight.protocol.framework.SessionParent;
/**
*
*/
-public final class BGPSessionFactory implements ProtocolSessionFactory {
+public final class BGPSessionFactory implements ProtocolSessionFactory<BGPSessionImpl> {
private final ProtocolMessageFactory parser;
}
@Override
- public ProtocolSession getProtocolSession(final SessionParent parent, final Timer timer, final ProtocolConnection connection,
+ public BGPSessionImpl getProtocolSession(final SessionParent parent, final Timer timer, final ProtocolConnection connection,
final int sessionId, final ChannelHandlerContext ctx) {
return new BGPSessionImpl(parent, timer, (BGPConnection) connection, sessionId, this.parser, ctx);
}
*/
package org.opendaylight.protocol.bgp.rib.impl.spi;
+import io.netty.util.concurrent.Future;
+
import java.io.IOException;
import org.opendaylight.protocol.bgp.parser.BGPSession;
* @return client session
* @throws IOException
*/
- BGPSession createClient(BGPConnection connection, ProtocolMessageFactory parser) throws IOException;
+ Future<? extends BGPSession> createClient(BGPConnection connection, ProtocolMessageFactory parser) throws IOException;
}
import static org.junit.Assert.assertEquals;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.doReturn;
+import io.netty.util.concurrent.Future;
-import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
import java.util.Collections;
import org.junit.After;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
import org.opendaylight.protocol.bgp.parser.BGPParameter;
+import org.opendaylight.protocol.bgp.parser.BGPSession;
import org.opendaylight.protocol.bgp.rib.impl.BGPImpl.BGPListenerRegistration;
import org.opendaylight.protocol.bgp.rib.impl.spi.BGPConnection;
import org.opendaylight.protocol.bgp.rib.impl.spi.BGPDispatcher;
@Mock
private ProtocolMessageFactory parser;
+ @Mock
+ private Future<BGPSession> future;
+
private BGPImpl bgp;
@Before
- public void setUp() throws IOException {
+ public void setUp() throws Exception {
MockitoAnnotations.initMocks(this);
doReturn("").when(this.parser).toString();
- doReturn(null).when(this.disp).createClient(any(BGPConnection.class), any(ProtocolMessageFactory.class));
+
+ doReturn(null).when(this.future).get();
+ doReturn(future).when(this.disp).createClient(any(BGPConnection.class), any(ProtocolMessageFactory.class));
}
@Test
- public void testBgpImpl() throws IOException {
+ public void testBgpImpl() throws Exception {
doReturn(new BGPSessionPreferences(null, 0, null, Collections.<BGPParameter> emptyList())).when(this.prop).getProposal();
- this.bgp = new BGPImpl(this.disp, this.parser, null, this.prop, null);
- final BGPListenerRegistration reg = (BGPListenerRegistration) this.bgp.registerUpdateListener(new SimpleSessionListener());
+ this.bgp = new BGPImpl(this.disp, this.parser, new InetSocketAddress(InetAddress.getLoopbackAddress(), 2000), this.prop, null);
+ final BGPListenerRegistration reg = this.bgp.registerUpdateListener(new SimpleSessionListener());
assertEquals(SimpleSessionListener.class, reg.getListener().getClass());
}
*/
package org.opendaylight.protocol.framework;
+import io.netty.util.concurrent.Future;
+
import java.io.IOException;
import java.net.InetSocketAddress;
*
* @return instance of ProtocolServer
*/
- public ProtocolServer createServer(final InetSocketAddress address, final ProtocolConnectionFactory connectionFactory,
- final ProtocolSessionFactory sfactory) throws IOException;
+ public Future<ProtocolServer> createServer(final InetSocketAddress address, final ProtocolConnectionFactory connectionFactory,
+ final ProtocolSessionFactory<?> sfactory) throws IOException;
/**
* Creates a client.
*
* @return session associated with this client
*/
- public ProtocolSession createClient(final ProtocolConnection connection, final ProtocolSessionFactory sfactory) throws IOException;
+ public <T extends ProtocolSession> Future<T> createClient(final ProtocolConnection connection, final ProtocolSessionFactory<T> sfactory) throws IOException;
}
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.util.concurrent.DefaultPromise;
+import io.netty.util.concurrent.Future;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.Map;
import java.util.Timer;
-import java.util.concurrent.ExecutionException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
}
- final class ClientChannelInitializer extends ChannelInitializer<SocketChannel> {
+ final class ClientChannelInitializer<T extends ProtocolSession> extends ChannelInitializer<SocketChannel> {
- private final ProtocolSessionFactory sfactory;
+ private final ProtocolSessionFactory<T> sfactory;
private final ProtocolConnection connection;
- private ProtocolSession session;
+ private T session;
- public ClientChannelInitializer(final ProtocolConnection connection, final ProtocolSessionFactory sfactory) {
+ public ClientChannelInitializer(final ProtocolConnection connection, final ProtocolSessionFactory<T> sfactory) {
this.connection = connection;
this.sfactory = sfactory;
}
ch.pipeline().addAfter("inbound", "encoder", factory.getEncoder());
}
- public ProtocolSession getSession() {
+ public T getSession() {
return this.session;
}
+ }
+
+ static final class ProtocolServerPromise extends DefaultPromise<ProtocolServer> {
+ private final ChannelFuture cf;
+
+ ProtocolServerPromise(final ChannelFuture cf) {
+ super();
+ this.cf = cf;
+ }
+ @Override
+ public synchronized boolean cancel(final boolean mayInterruptIfRunning) {
+ this.cf.cancel(mayInterruptIfRunning);
+ return super.cancel(mayInterruptIfRunning);
+ }
}
- static final class ProtocolSessionPromise extends DefaultPromise<ProtocolSession> {
+ static final class ProtocolSessionPromise<T extends ProtocolSession> extends DefaultPromise<T> {
private final ChannelFuture cf;
ProtocolSessionPromise(final ChannelFuture cf) {
}
@Override
- public ProtocolServer createServer(final InetSocketAddress address, final ProtocolConnectionFactory connectionFactory,
- final ProtocolSessionFactory sessionFactory) {
+ public Future<ProtocolServer> createServer(final InetSocketAddress address, final ProtocolConnectionFactory connectionFactory,
+ final ProtocolSessionFactory<?> sessionFactory) {
final ProtocolServer server = new ProtocolServer(address, connectionFactory, sessionFactory, this);
final ServerBootstrap b = new ServerBootstrap();
b.group(this.bossGroup, this.workerGroup);
// Bind and start to accept incoming connections.
final ChannelFuture f = b.bind(address);
- this.serverSessions.put(server, f.channel());
+ final ProtocolServerPromise p = new ProtocolServerPromise(f);
+
+ f.addListener(new ChannelFutureListener() {
+ @Override
+ public void operationComplete(final ChannelFuture cf) {
+ if (cf.isSuccess()) {
+ p.setSuccess(server);
+ synchronized (serverSessions) {
+ serverSessions.put(server, cf.channel());
+ }
+ return;
+ } else if (cf.isCancelled()) {
+ p.cancel(false);
+ } else
+ p.setFailure(cf.cause());
+ }
+ });
+
logger.debug("Created server {}.", server);
- return server;
+ return p;
}
@Override
- public ProtocolSession createClient(final ProtocolConnection connection, final ProtocolSessionFactory sfactory) {
+ public <T extends ProtocolSession> Future<T> createClient(final ProtocolConnection connection, final ProtocolSessionFactory<T> sfactory) {
final Bootstrap b = new Bootstrap();
b.group(this.workerGroup);
b.channel(NioSocketChannel.class);
b.option(ChannelOption.SO_KEEPALIVE, true);
- final ClientChannelInitializer init = new ClientChannelInitializer(connection, sfactory);
+ final ClientChannelInitializer<T> init = new ClientChannelInitializer<T>(connection, sfactory);
b.handler(init);
final ChannelFuture f = b.connect(connection.getPeerAddress());
- final ProtocolSessionPromise p = new ProtocolSessionPromise(f);
+ final ProtocolSessionPromise<T> p = new ProtocolSessionPromise<T>(f);
f.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(final ChannelFuture cf) {
if (cf.isSuccess()) {
- p.setSuccess(init.getSession());
+ final T s = init.getSession();
+ p.setSuccess(s);
+ synchronized (clientSessions) {
+ clientSessions.put(s, cf.channel());
+ }
return;
} else if (cf.isCancelled()) {
p.cancel(false);
p.setFailure(cf.cause());
}
});
- ProtocolSession s = null;
- try {
- s = p.get();
- this.clientSessions.put(p.get(), f.channel());
- } catch (InterruptedException | ExecutionException e) {
- logger.warn("Client not created. Exception {}.", e.getMessage(), e);
- }
+
logger.debug("Client created.");
- return s;
+ return p;
}
@Override
@Override
public void onSessionClosed(final ProtocolSession session) {
- logger.trace("Removing client session: {}", session);
- final Channel ch = this.clientSessions.get(session);
- ch.close();
- this.clientSessions.remove(session);
- logger.debug("Removed client session: {}", session.toString());
+ synchronized (clientSessions) {
+ logger.trace("Removing client session: {}", session);
+ final Channel ch = this.clientSessions.get(session);
+ ch.close();
+ this.clientSessions.remove(session);
+ logger.debug("Removed client session: {}", session.toString());
+ }
}
void onServerClosed(final ProtocolServer server) {
- logger.trace("Removing server session: {}", server);
- final Channel ch = this.serverSessions.get(server);
- ch.close();
- this.clientSessions.remove(server);
- logger.debug("Removed server session: {}", server.toString());
+ synchronized (serverSessions) {
+ logger.trace("Removing server session: {}", server);
+ final Channel ch = this.serverSessions.get(server);
+ ch.close();
+ this.clientSessions.remove(server);
+ logger.debug("Removed server session: {}", server.toString());
+ }
}
}
private final InetSocketAddress serverAddress;
private final ProtocolConnectionFactory connectionFactory;
- private final ProtocolSessionFactory sessionFactory;
+ private final ProtocolSessionFactory<?> sessionFactory;
/**
* Maps clients of this server to their address. The client is represented as PCEP session. Used BiMap for
* @param sessionFactory factory for sessions
*/
public ProtocolServer(final InetSocketAddress address, final ProtocolConnectionFactory connectionFactory,
- final ProtocolSessionFactory sessionFactory, final Dispatcher parent) {
+ final ProtocolSessionFactory<?> sessionFactory, final Dispatcher parent) {
this.serverAddress = address;
this.sessions = HashBiMap.create();
this.connectionFactory = connectionFactory;
/**
* Factory for generating Protocol Sessions. This class should be extended to return protocol specific session.
*/
-public interface ProtocolSessionFactory {
+public interface ProtocolSessionFactory<T extends ProtocolSession> {
/**
* Creates and returns protocol specific session.
* @param sessionId session identifier
* @return new session
*/
- public ProtocolSession getProtocolSession(SessionParent dispatcher, Timer timer, ProtocolConnection connection, int sessionId,
+ public T getProtocolSession(SessionParent dispatcher, Timer timer, ProtocolConnection connection, int sessionId,
ChannelHandlerContext ctx);
}
package org.opendaylight.protocol.framework;
import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
import java.io.IOException;
+import java.net.ConnectException;
import java.net.InetSocketAddress;
import java.util.concurrent.ExecutionException;
}
};
}
- }, new SimpleSessionFactory(MAX_MSGSIZE));
+ }, new SimpleSessionFactory(MAX_MSGSIZE)).get();
this.clientDispatcher = new DispatcherImpl(new MessageFactory());
public SessionListener getListener() {
return ServerTest.this.pce;
}
- }, new SimpleSessionFactory(MAX_MSGSIZE));
+ }, new SimpleSessionFactory(MAX_MSGSIZE)).get();
final int maxAttempts = 1000;
int attempts = 0;
assertTrue(this.pce.up);
}
- @Test
- public void testConnectionFailed() throws IOException, InterruptedException, ExecutionException {
+ public void testConnectionFailed() throws IOException, InterruptedException {
this.dispatcher = new DispatcherImpl(new MessageFactory());
this.clientDispatcher = new DispatcherImpl(new MessageFactory());
final SimpleSessionListener listener = new SimpleSessionListener();
- final ProtocolSession session = this.clientDispatcher.createClient(new ProtocolConnection() {
- @Override
- public SessionPreferencesChecker getProposalChecker() {
- return new SimpleSessionProposalChecker();
- }
-
- @Override
- public SessionPreferences getProposal() {
- return new SimpleSessionPreferences();
- }
-
- @Override
- public InetSocketAddress getPeerAddress() {
- return ServerTest.this.serverAddress;
- }
-
- @Override
- public SessionListener getListener() {
- return listener;
- }
- }, new SimpleSessionFactory(MAX_MSGSIZE));
- if (session == null)
- listener.failed = true;
- final int maxAttempts = 100;
- int attempts = 0;
- synchronized (listener) {
- while (!listener.failed && ++attempts < maxAttempts) {
- listener.wait(100);
- }
+ try {
+ final ProtocolSession session = this.clientDispatcher.createClient(new ProtocolConnection() {
+ @Override
+ public SessionPreferencesChecker getProposalChecker() {
+ return new SimpleSessionProposalChecker();
+ }
+
+ @Override
+ public SessionPreferences getProposal() {
+ return new SimpleSessionPreferences();
+ }
+
+ @Override
+ public InetSocketAddress getPeerAddress() {
+ return ServerTest.this.serverAddress;
+ }
+
+ @Override
+ public SessionListener getListener() {
+ return listener;
+ }
+ }, new SimpleSessionFactory(MAX_MSGSIZE)).get();
+
+ fail("Connection succeeded unexpectedly");
+ } catch (ExecutionException e) {
+ assertTrue(listener.failed);
+ assertTrue(e.getCause() instanceof ConnectException);
}
- assertTrue(listener.failed);
}
@After
import java.util.Timer;
-public final class SimpleSessionFactory implements ProtocolSessionFactory {
+public final class SimpleSessionFactory implements ProtocolSessionFactory<SimpleSession> {
private final int maximumMessageSize;
public SimpleSessionFactory(final int maximumMessageSize) {
}
@Override
- public ProtocolSession getProtocolSession(final SessionParent parent, final Timer timer, final ProtocolConnection connection,
+ public SimpleSession getProtocolSession(final SessionParent parent, final Timer timer, final ProtocolConnection connection,
final int sessionId, final ChannelHandlerContext ctx) {
return new SimpleSession(connection, parent, this.maximumMessageSize);
}
*/
package org.opendaylight.protocol.pcep;
+import io.netty.util.concurrent.Future;
+
import java.io.IOException;
import java.net.InetSocketAddress;
* @return instance of PCEPServer
* @throws IOException if some IO error occurred
*/
- public ProtocolServer createServer(final InetSocketAddress address, final PCEPConnectionFactory connectionFactory) throws IOException;
+ public Future<ProtocolServer> createServer(final InetSocketAddress address, final PCEPConnectionFactory connectionFactory) throws IOException;
/**
* Creates a client. Needs to be started via the start method.
* @return session associated with this client.
* @throws IOException if some IO error occurred
*/
- public PCEPSession createClient(PCEPConnection connection) throws IOException;
+ public Future<? extends PCEPSession> createClient(PCEPConnection connection) throws IOException;
/**
* Sets the limit of maximum unknown messages per minute. If not set by the user, default is 5 messages/minute.
*/
package org.opendaylight.protocol.pcep.impl;
+import io.netty.util.concurrent.Future;
+
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.concurrent.ExecutionException;
}
@Override
- public ProtocolServer createServer(final InetSocketAddress address, final PCEPConnectionFactory connectionFactory) throws IOException {
+ public Future<ProtocolServer> createServer(final InetSocketAddress address, final PCEPConnectionFactory connectionFactory) throws IOException {
connectionFactory.setProposal(this.proposalFactory, address, 0);
return this.dispatcher.createServer(address, connectionFactory, new PCEPSessionFactoryImpl(this.maxUnknownMessages));
}
* @throws InterruptedException
*/
@Override
- public PCEPSession createClient(final PCEPConnection connection) throws IOException {
- return (PCEPSession) this.dispatcher.createClient(connection, new PCEPSessionFactoryImpl(this.maxUnknownMessages));
+ public Future<? extends PCEPSession> createClient(final PCEPConnection connection) throws IOException {
+ return this.dispatcher.createClient(connection, new PCEPSessionFactoryImpl(this.maxUnknownMessages));
}
@Override
import java.util.Timer;
import org.opendaylight.protocol.framework.ProtocolConnection;
-import org.opendaylight.protocol.framework.ProtocolSession;
import org.opendaylight.protocol.framework.ProtocolSessionFactory;
import org.opendaylight.protocol.framework.SessionParent;
import org.opendaylight.protocol.pcep.PCEPConnection;
-public class PCEPSessionFactoryImpl implements ProtocolSessionFactory {
+public class PCEPSessionFactoryImpl implements ProtocolSessionFactory<PCEPSessionImpl> {
private final int maxUnknownMessages;
}
@Override
- public ProtocolSession getProtocolSession(final SessionParent parent, final Timer timer, final ProtocolConnection connection,
+ public PCEPSessionImpl getProtocolSession(final SessionParent parent, final Timer timer, final ProtocolConnection connection,
final int sessionId, final ChannelHandlerContext ctx) {
return new PCEPSessionImpl(parent, timer, (PCEPConnection) connection, new PCEPMessageFactory(), this.maxUnknownMessages, sessionId, ctx);
}
*/
package org.opendaylight.protocol.pcep.testtool;
-import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
"With no parameters, this help is printed.";
- public static void main(final String[] args) throws IOException {
+ public static void main(final String[] args) throws Exception {
if (args.length == 0 || (args.length == 1 && args[0].equalsIgnoreCase("--help"))) {
System.out.println(Main.usage);
return;
@Override
public void setProposal(final PCEPSessionProposalFactory proposals, final InetSocketAddress address, final int sessionId) {
}
- });
+ }).get();
// try {
// Thread.sleep(10000);