import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import io.netty.channel.Channel;
-import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.concurrent.Promise;
+import io.netty.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import javax.annotation.concurrent.GuardedBy;
import org.opendaylight.protocol.bgp.parser.BGPDocumentedException;
import org.opendaylight.protocol.bgp.rib.impl.spi.BGPSessionPreferences;
import org.opendaylight.protocol.bgp.rib.spi.BGPSessionListener;
import org.opendaylight.protocol.bgp.rib.spi.SessionNegotiator;
+import org.opendaylight.protocol.util.Ipv6Util;
import org.opendaylight.protocol.util.Values;
import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev130715.IpAddress;
import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev130715.Ipv4Address;
private final Channel channel;
@GuardedBy("this")
private State state = State.IDLE;
-
@GuardedBy("this")
private BGPSessionImpl session;
+ @GuardedBy("this")
+ private ScheduledFuture<?> pending;
AbstractBGPSessionNegotiator(final Promise<BGPSessionImpl> promise, final Channel channel,
final BGPPeerRegistry registry) {
preferences.getBgpId()).setBgpParameters(preferences.getParams()).build());
if (this.state != State.FINISHED) {
this.state = State.OPEN_SENT;
- this.channel.eventLoop().schedule(new Runnable() {
- @Override
- public void run() {
+ this.pending = this.channel.eventLoop().schedule(() -> {
+ synchronized (AbstractBGPSessionNegotiator.this) {
+ AbstractBGPSessionNegotiator.this.pending = null;
if (AbstractBGPSessionNegotiator.this.state != State.FINISHED) {
- AbstractBGPSessionNegotiator.this.sendMessage(buildErrorNotify(BGPError.HOLD_TIMER_EXPIRED));
+ AbstractBGPSessionNegotiator.this
+ .sendMessage(buildErrorNotify(BGPError.HOLD_TIMER_EXPIRED));
negotiationFailed(new BGPDocumentedException("HoldTimer expired", BGPError.FSM_ERROR));
AbstractBGPSessionNegotiator.this.state = State.FINISHED;
}
}
private IpAddress getRemoteIp() {
- return StrictBGPPeerRegistry.getIpAddress(this.channel.remoteAddress());
+ final IpAddress remoteIp = StrictBGPPeerRegistry.getIpAddress(this.channel.remoteAddress());
+ if (remoteIp.getIpv6Address() != null) {
+ return new IpAddress(Ipv6Util.getFullForm(remoteIp.getIpv6Address()));
+ }
+ return remoteIp;
}
protected synchronized void handleMessage(final Notification msg) {
- LOG.debug("Channel {} handling message in state {}", this.channel, this.state);
-
+ LOG.debug("Channel {} handling message in state {}, msg: {}", this.channel, this.state, msg);
switch (this.state) {
case FINISHED:
sendMessage(buildErrorNotify(BGPError.FSM_ERROR));
return;
case IDLE:
// to avoid race condition when Open message was sent by the peer before startNegotiation could be executed
- if (msg instanceof Open) {
- startNegotiation();
- handleOpen((Open) msg);
- return;
- }
+ if (msg instanceof Open) {
+ startNegotiation();
+ handleOpen((Open) msg);
+ return;
+ }
sendMessage(buildErrorNotify(BGPError.FSM_ERROR));
- return;
+ break;
case OPEN_CONFIRM:
if (msg instanceof Keepalive) {
negotiationSuccessful(this.session);
return builder.build();
}
- private void handleOpen(final Open openObj) {
+ private synchronized void handleOpen(final Open openObj) {
final IpAddress remoteIp = getRemoteIp();
final BGPSessionPreferences preferences = this.registry.getPeerPreferences(remoteIp);
try {
final BGPSessionListener peer = this.registry.getPeer(remoteIp, getSourceId(openObj, preferences), getDestinationId(openObj, preferences), openObj);
sendMessage(new KeepaliveBuilder().build());
- this.session = new BGPSessionImpl(peer, this.channel, openObj, preferences, this.registry);
this.state = State.OPEN_CONFIRM;
- LOG.debug("Channel {} moved to OpenConfirm state with remote proposal {}", this.channel, openObj);
+ this.session = new BGPSessionImpl(peer, this.channel, openObj, preferences, this.registry);
+ this.session.setChannelExtMsgCoder(openObj);
+ LOG.debug("Channel {} moved to OPEN_CONFIRM state with remote proposal {}", this.channel, openObj);
} catch (final BGPDocumentedException e) {
LOG.warn("Channel {} negotiation failed", this.channel, e);
negotiationFailed(e);
}
}
- private void negotiationFailed(final Throwable e) {
+ private synchronized void negotiationFailed(final Throwable e) {
LOG.warn("Channel {} negotiation failed: {}", this.channel, e.getMessage());
if (e instanceof BGPDocumentedException) {
// although sendMessage() can also result in calling this method, it won't create a cycle. In case sendMessage() fails to
private void negotiationFailedCloseChannel(final Throwable cause) {
LOG.debug("Negotiation on channel {} failed", this.channel, cause);
this.channel.close();
- this.promise.setFailure(cause);
+ synchronized (AbstractBGPSessionNegotiator.this) {
+ if (this.pending != null && this.pending.isCancellable()) {
+ this.pending.cancel(true);
+ this.pending = null;
+ }
+ }
}
private void sendMessage(final Notification msg) {
- this.channel.writeAndFlush(msg).addListener(new ChannelFutureListener() {
- @Override
- public void operationComplete(final ChannelFuture f) {
- if (!f.isSuccess()) {
- LOG.warn("Failed to send message {}", msg, f.cause());
- negotiationFailedCloseChannel(f.cause());
- } else {
- LOG.trace("Message {} sent to socket", msg);
- }
+ this.channel.writeAndFlush(msg).addListener((ChannelFutureListener) f -> {
+ if (!f.isSuccess()) {
+ LOG.warn("Failed to send message {} to channel {}", msg, AbstractBGPSessionNegotiator.this.channel, f.cause());
+ negotiationFailedCloseChannel(f.cause());
+ } else {
+ LOG.trace("Message {} sent to channel {}", msg, AbstractBGPSessionNegotiator.this.channel);
}
});
}