private final Channel channel;
@GuardedBy("this")
private State state = State.IDLE;
-
@GuardedBy("this")
private BGPSessionImpl session;
this.channel.eventLoop().schedule(new Runnable() {
@Override
public void run() {
- if (AbstractBGPSessionNegotiator.this.state != State.FINISHED) {
- AbstractBGPSessionNegotiator.this.sendMessage(buildErrorNotify(BGPError.HOLD_TIMER_EXPIRED));
- negotiationFailed(new BGPDocumentedException("HoldTimer expired", BGPError.FSM_ERROR));
- AbstractBGPSessionNegotiator.this.state = State.FINISHED;
+ synchronized (AbstractBGPSessionNegotiator.this) {
+ if (AbstractBGPSessionNegotiator.this.state != State.FINISHED) {
+ AbstractBGPSessionNegotiator.this
+ .sendMessage(buildErrorNotify(BGPError.HOLD_TIMER_EXPIRED));
+ negotiationFailed(new BGPDocumentedException("HoldTimer expired", BGPError.FSM_ERROR));
+ AbstractBGPSessionNegotiator.this.state = State.FINISHED;
+ }
}
}
}, INITIAL_HOLDTIMER, TimeUnit.MINUTES);
}
protected synchronized void handleMessage(final Notification msg) {
- LOG.debug("Channel {} handling message in state {}", this.channel, this.state);
-
+ LOG.debug("Channel {} handling message in state {}, msg: {}", this.channel, this.state, msg);
switch (this.state) {
case FINISHED:
sendMessage(buildErrorNotify(BGPError.FSM_ERROR));
return;
case IDLE:
// to avoid race condition when Open message was sent by the peer before startNegotiation could be executed
- if (msg instanceof Open) {
- startNegotiation();
- handleOpen((Open) msg);
- return;
- }
+ if (msg instanceof Open) {
+ startNegotiation();
+ handleOpen((Open) msg);
+ return;
+ }
sendMessage(buildErrorNotify(BGPError.FSM_ERROR));
- return;
+ break;
case OPEN_CONFIRM:
if (msg instanceof Keepalive) {
negotiationSuccessful(this.session);
return builder.build();
}
- private void handleOpen(final Open openObj) {
+ private synchronized void handleOpen(final Open openObj) {
final IpAddress remoteIp = getRemoteIp();
final BGPSessionPreferences preferences = this.registry.getPeerPreferences(remoteIp);
try {
final BGPSessionListener peer = this.registry.getPeer(remoteIp, getSourceId(openObj, preferences), getDestinationId(openObj, preferences), openObj);
sendMessage(new KeepaliveBuilder().build());
- this.session = new BGPSessionImpl(peer, this.channel, openObj, preferences, this.registry);
this.state = State.OPEN_CONFIRM;
- LOG.debug("Channel {} moved to OpenConfirm state with remote proposal {}", this.channel, openObj);
+ this.session = new BGPSessionImpl(peer, this.channel, openObj, preferences, this.registry);
+ this.session.setChannelExtMsgCoder(openObj);
+ LOG.debug("Channel {} moved to OPEN_CONFIRM state with remote proposal {}", this.channel, openObj);
} catch (final BGPDocumentedException e) {
LOG.warn("Channel {} negotiation failed", this.channel, e);
negotiationFailed(e);
}
}
- private void negotiationFailed(final Throwable e) {
+ private synchronized void negotiationFailed(final Throwable e) {
LOG.warn("Channel {} negotiation failed: {}", this.channel, e.getMessage());
if (e instanceof BGPDocumentedException) {
// although sendMessage() can also result in calling this method, it won't create a cycle. In case sendMessage() fails to
@Override
public void operationComplete(final ChannelFuture f) {
if (!f.isSuccess()) {
- LOG.warn("Failed to send message {}", msg, f.cause());
+ LOG.warn("Failed to send message {} to channel {}", msg, AbstractBGPSessionNegotiator.this.channel, f.cause());
negotiationFailedCloseChannel(f.cause());
} else {
- LOG.trace("Message {} sent to socket", msg);
+ LOG.trace("Message {} sent to channel {}", msg, AbstractBGPSessionNegotiator.this.channel);
}
}
});
this.keepAlive = this.holdTimerValue / KA_TO_DEADTIMER_RATIO;
this.asNumber = AsNumberUtil.advertizedAsNumber(remoteOpen);
this.peerRegistry = peerRegistry;
- final boolean enableExMess = BgpExtendedMessageUtil.advertizedBgpExtendedMessageCapability(remoteOpen);
- if (enableExMess) {
- this.channel.pipeline().replace(BGPMessageHeaderDecoder.class, EXTENDED_MSG_DECODER, BGPMessageHeaderDecoder.getExtendedBGPMessageHeaderDecoder());
- }
final Set<TablesKey> tts = Sets.newHashSet();
final Set<BgpTableType> tats = Sets.newHashSet();
this.tableTypes, this.addPathTypes);
}
+ /**
+ * Set the extend message coder for current channel
+ * The reason for separating this part from constructor is, in #channel.pipeline().replace(..), the
+ * invokeChannelRead() will be invoked after the original message coder handler got removed. And there
+ * is chance that before the session instance is fully initiated (constructor returns), a KeepAlive
+ * message arrived already in the channel buffer. Thus #AbstractBGPSessionNegotiator.handleMessage(..)
+ * gets invoked again and a deadlock is caused. A BGP final state machine error will happen as BGP
+ * negotiator is still in OPEN_SENT state as the session constructor hasn't returned yet.
+ *
+ * @param remoteOpen
+ */
+ public synchronized void setChannelExtMsgCoder(final Open remoteOpen) {
+ final boolean enableExMess = BgpExtendedMessageUtil.advertizedBgpExtendedMessageCapability(remoteOpen);
+ if (enableExMess) {
+ this.channel.pipeline().replace(BGPMessageHeaderDecoder.class, EXTENDED_MSG_DECODER, BGPMessageHeaderDecoder.getExtendedBGPMessageHeaderDecoder());
+ }
+ }
+
@Override
public synchronized void close() {
if (this.state != State.IDLE) {
import org.slf4j.LoggerFactory;
public class BGPDispatcherImplTest {
-
+ private static final short HOLD_TIMER = 30;
private static final AsNumber AS_NUMBER = new AsNumber(30L);
private static final int RETRY_TIMER = 1;
private static final BgpTableType IPV_4_TT = new BgpTableTypeImpl(Ipv4AddressFamily.class, UnicastSubsequentAddressFamily.class);
Assert.assertEquals(Sets.newHashSet(IPV_4_TT), session.getAdvertisedTableTypes());
Assert.assertTrue(serverChannel.isWritable());
session.close();
-
+ this.serverListener.releaseConnection();
checkIdleState(this.clientListener);
checkIdleState(this.serverListener);
}
capas.add(new OptionalCapabilitiesBuilder().setCParameters(BgpExtendedMessageUtil.EXTENDED_MESSAGE_CAPABILITY).build());
tlvs.add(new BgpParametersBuilder().setOptionalCapabilities(capas).build());
final BgpId bgpId = new BgpId(new Ipv4Address(socketAddress.getAddress().getHostAddress()));
- return new BGPSessionPreferences(AS_NUMBER, (short) 4, bgpId, AS_NUMBER, tlvs, Optional.absent());
+ return new BGPSessionPreferences(AS_NUMBER, HOLD_TIMER, bgpId, AS_NUMBER, tlvs, Optional.absent());
}
}
doReturn(futureChannel).when(this.speakerListener).close();
this.listener = new SimpleSessionListener();
this.bgpSession = new BGPSessionImpl(this.listener, this.speakerListener, this.classicOpen, this.classicOpen.getHoldTimer(), null);
+ this.bgpSession.setChannelExtMsgCoder(this.classicOpen);
}
@Test
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
-import org.opendaylight.controller.config.api.IdentityAttributeRef;
import org.opendaylight.controller.config.yang.bgp.rib.impl.RouteTable;
import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
import org.opendaylight.protocol.bgp.parser.BGPDocumentedException;
import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev130715.Ipv4Prefix;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.inet.rev150305.ipv4.routes.ipv4.routes.Ipv4Route;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.message.rev130919.KeepaliveBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.message.rev130919.Open;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.message.rev130919.OpenBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.message.rev130919.UpdateBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.message.rev130919.open.message.BgpParameters;
Lists.newArrayList(new OptionalCapabilitiesBuilder().setCParameters(new CParametersBuilder().addAugmentation(
CParameters1.class, new CParameters1Builder().setMultiprotocolCapability(new MultiprotocolCapabilityBuilder()
.setAfi(Ipv4AddressFamily.class).setSafi(UnicastSubsequentAddressFamily.class).build()).build()).build()).build())).build());
- this.session = new BGPSessionImpl(this.classic, channel, new OpenBuilder().setBgpIdentifier(new Ipv4Address("1.1.1.1")).setHoldTimer(50).setMyAsNumber(72).setBgpParameters(params).build(), 30, null);
+ final Open openObj = new OpenBuilder().setBgpIdentifier(new Ipv4Address("1.1.1.1")).setHoldTimer(50).setMyAsNumber(72).setBgpParameters(params).build();
+ this.session = new BGPSessionImpl(this.classic, channel, openObj, 30, null);
+ this.session.setChannelExtMsgCoder(openObj);
}
}
/**
* Generate a random high range port number
*
- * @return A port number range from 10000 to 50000
+ * @return A port number range from 20000 to 60000
*/
public static int getRandomPort() {
- final int randPort = 10000 + (int) Math.round(40000 * Math.random());
+ final int randPort = 20000 + (int) Math.round(40000 * Math.random());
return randPort;
}