}
}
- synchronized void sendMessage(final Notification msg) {
- try {
- this.channel.writeAndFlush(msg).addListener(
- new ChannelFutureListener() {
- @Override
- public void operationComplete(final ChannelFuture f) {
- if (!f.isSuccess()) {
- LOG.info("Failed to send message {} to socket {}", msg, f.cause(), BGPSessionImpl.this.channel);
- } else {
- LOG.trace("Message {} sent to socket {}", msg, BGPSessionImpl.this.channel);
- }
+ @GuardedBy("this")
+ private final void writeEpilogue(final ChannelFuture future, final Notification msg) {
+ future.addListener(
+ new ChannelFutureListener() {
+ @Override
+ public void operationComplete(final ChannelFuture f) {
+ if (!f.isSuccess()) {
+ LOG.info("Failed to send message {} to socket {}", msg, f.cause(), BGPSessionImpl.this.channel);
+ } else {
+ LOG.trace("Message {} sent to socket {}", msg, BGPSessionImpl.this.channel);
}
- });
- this.lastMessageSentAt = System.nanoTime();
- this.sessionStats.updateSentMsgTotal();
- if (msg instanceof Update) {
- this.sessionStats.updateSentMsgUpd();
- } else if (msg instanceof Notify) {
- this.sessionStats.updateSentMsgErr((Notify) msg);
- }
+ }
+ });
+ this.lastMessageSentAt = System.nanoTime();
+ this.sessionStats.updateSentMsgTotal();
+ if (msg instanceof Update) {
+ this.sessionStats.updateSentMsgUpd();
+ } else if (msg instanceof Notify) {
+ this.sessionStats.updateSentMsgErr((Notify) msg);
+ }
+ }
+
+ void flush() {
+ this.channel.flush();
+ }
+
+ synchronized void write(final Notification msg) {
+ try {
+ writeEpilogue(this.channel.write(msg), msg);
} catch (final Exception e) {
LOG.warn("Message {} was not sent.", msg, e);
}
}
+ synchronized void sendMessage(final Notification msg) {
+ writeEpilogue(this.channel.writeAndFlush(msg), msg);
+ }
+
private synchronized void closeWithoutMessage() {
LOG.debug("Closing session: {}", this);
removePeerSession();
this.session = Preconditions.checkNotNull(session);
}
- void write(final Notification msg) {
+ private void ensureWritable() {
if (blocked) {
LOG.trace("Blocked slow path tripped on session {}", session);
synchronized (this) {
LOG.debug("Resuming write on session {}", session);
}
}
+ }
+
+ void write(final Notification msg) {
+ ensureWritable();
+ session.write(msg);
+ }
+ void writeAndFlush(final Notification msg) {
+ ensureWritable();
session.sendMessage(msg);
}
void flush() {
- // FIXME: no-op, as we do not have hatching APIs in session yet
+ session.flush();
}
@Override
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.mockito.Matchers.any;
-
import com.google.common.base.Optional;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.CheckedFuture;
import io.netty.channel.Channel;
+import io.netty.channel.DefaultChannelPromise;
import io.netty.channel.EventLoop;
import java.math.BigInteger;
import java.net.InetSocketAddress;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.network.concepts.rev131125.IsoSystemIdentifier;
import org.opendaylight.yangtools.yang.binding.DataObject;
import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
+import org.opendaylight.yangtools.yang.binding.Notification;
public class ApplicationPeerTest {
Mockito.doReturn(null).when(this.eventLoop).schedule(any(Runnable.class), any(long.class), any(TimeUnit.class));
Mockito.doReturn(Boolean.TRUE).when(this.channel).isWritable();
Mockito.doReturn(null).when(this.channel).close();
+ Mockito.doReturn(new DefaultChannelPromise(channel)).when(this.channel).writeAndFlush(any(Notification.class));
Mockito.doReturn(new InetSocketAddress("localhost", 12345)).when(this.channel).remoteAddress();
Mockito.doReturn(new InetSocketAddress("localhost", 12345)).when(this.channel).localAddress();