Use sendMessage() from AbstractSessionNegotiator 61/5461/2
authorRobert Varga <rovarga@cisco.com>
Tue, 25 Feb 2014 16:41:56 +0000 (17:41 +0100)
committerRobert Varga <rovarga@cisco.com>
Thu, 27 Feb 2014 13:53:17 +0000 (14:53 +0100)
AbstractSessionNegotiator now exposes sendMessage(), which aside from
issuing a writeAndFlush() also makes sure the message leaves the socket
and fails negotiation otherwise. Let's use that throughout our
negotiators, as that will harden them in race conditions.

Change-Id: Ice6995bc3a0121bef219268ac18cdb885118d1a1
Signed-off-by: Robert Varga <rovarga@cisco.com>
bgp/rib-impl/src/main/java/org/opendaylight/protocol/bgp/rib/impl/BGPSessionNegotiator.java
pcep/impl/src/main/java/org/opendaylight/protocol/pcep/impl/AbstractPCEPSessionNegotiator.java

index b0d824de13cdfa554fe73573da124d1fd74b6a02..7a89348ff11f462bfb5b2ab5c0d46127b14fbda5 100644 (file)
@@ -8,8 +8,6 @@
 package org.opendaylight.protocol.bgp.rib.impl;
 
 import io.netty.channel.Channel;
-import io.netty.channel.ChannelFuture;
-import io.netty.channel.ChannelFutureListener;
 import io.netty.util.Timeout;
 import io.netty.util.Timer;
 import io.netty.util.TimerTask;
@@ -86,24 +84,10 @@ public final class BGPSessionNegotiator extends AbstractSessionNegotiator<Notifi
                this.timer = Preconditions.checkNotNull(timer);
        }
 
-       private void writeMessage(final Object o) {
-               this.channel.writeAndFlush(o).addListener(new ChannelFutureListener() {
-                       @Override
-                       public void operationComplete(final ChannelFuture f) throws Exception {
-                               if (f.isSuccess()) {
-                                       LOG.trace("Message {} sent to socket", o);
-                               } else {
-                                       LOG.info("Failed to send message {}", o, f.cause());
-                                       negotiationFailed(f.cause());
-                               }
-                       }
-               });
-       }
-
        @Override
        protected void startNegotiation() {
                Preconditions.checkState(this.state == State.Idle);
-               this.writeMessage(new OpenBuilder().setMyAsNumber(this.localPref.getMyAs().getValue().intValue()).setHoldTimer(
+               this.sendMessage(new OpenBuilder().setMyAsNumber(this.localPref.getMyAs().getValue().intValue()).setHoldTimer(
                                this.localPref.getHoldTime()).setBgpIdentifier(this.localPref.getBgpId()).setBgpParameters(this.localPref.getParams()).build());
                this.state = State.OpenSent;
 
@@ -113,7 +97,7 @@ public final class BGPSessionNegotiator extends AbstractSessionNegotiator<Notifi
                        public void run(final Timeout timeout) throws Exception {
                                synchronized (lock) {
                                        if (BGPSessionNegotiator.this.state != State.Finished) {
-                                               BGPSessionNegotiator.this.writeMessage(new NotifyBuilder().setErrorCode(BGPError.HOLD_TIMER_EXPIRED.getCode()).setErrorSubcode(
+                                               BGPSessionNegotiator.this.sendMessage(new NotifyBuilder().setErrorCode(BGPError.HOLD_TIMER_EXPIRED.getCode()).setErrorSubcode(
                                                                BGPError.HOLD_TIMER_EXPIRED.getSubcode()).build());
                                                negotiationFailed(new BGPDocumentedException("HoldTimer expired", BGPError.FSM_ERROR));
                                                BGPSessionNegotiator.this.state = State.Finished;
@@ -132,7 +116,7 @@ public final class BGPSessionNegotiator extends AbstractSessionNegotiator<Notifi
                case Idle:
                        final Notify fsmError = new NotifyBuilder().setErrorCode(BGPError.FSM_ERROR.getCode()).setErrorSubcode(
                                        BGPError.FSM_ERROR.getSubcode()).build();
-                       this.writeMessage(fsmError);
+                       this.sendMessage(fsmError);
                case OpenConfirm:
                        if (msg instanceof Keepalive) {
                                negotiationSuccessful(this.session);
@@ -153,7 +137,7 @@ public final class BGPSessionNegotiator extends AbstractSessionNegotiator<Notifi
 
                // Catch-all for unexpected message
                LOG.warn("Channel {} state {} unexpected message {}", this.channel, this.state, msg);
-               this.writeMessage(new NotifyBuilder().setErrorCode(BGPError.FSM_ERROR.getCode()).setErrorSubcode(BGPError.FSM_ERROR.getSubcode()).build());
+               this.sendMessage(new NotifyBuilder().setErrorCode(BGPError.FSM_ERROR.getCode()).setErrorSubcode(BGPError.FSM_ERROR.getSubcode()).build());
                negotiationFailed(new BGPDocumentedException("Unexpected message", BGPError.FSM_ERROR));
                this.state = State.Finished;
        }
@@ -165,7 +149,7 @@ public final class BGPSessionNegotiator extends AbstractSessionNegotiator<Notifi
                                LOG.info("Open message unacceptable. Check the configuration of BGP speaker.");
                        }
                        this.remotePref = openObj;
-                       this.writeMessage(new KeepaliveBuilder().build());
+                       this.sendMessage(new KeepaliveBuilder().build());
                        this.session = new BGPSessionImpl(this.timer, this.listener, this.channel, this.remotePref);
                        this.state = State.OpenConfirm;
                        LOG.debug("Channel {} moved to OpenConfirm state with remote proposal {}", this.channel, this.remotePref);
@@ -173,7 +157,7 @@ public final class BGPSessionNegotiator extends AbstractSessionNegotiator<Notifi
                }
                final Notify ntf = new NotifyBuilder().setErrorCode(BGPError.UNSPECIFIC_OPEN_ERROR.getCode()).setErrorSubcode(
                                BGPError.UNSPECIFIC_OPEN_ERROR.getSubcode()).build();
-               this.writeMessage(ntf);
+               this.sendMessage(ntf);
                negotiationFailed(new BGPDocumentedException("Open message unacceptable. Check the configuration of BGP speaker.", BGPError.forValue(
                                ntf.getErrorCode(), ntf.getErrorSubcode())));
                this.state = State.Finished;
index e1fabedcfef2538d2185deb5bf0a55a5e8f5c105..bbaa82f8669f941b18a1c826bcaba22f915c86a5 100644 (file)
@@ -149,7 +149,7 @@ public abstract class AbstractPCEPSessionNegotiator extends AbstractSessionNegot
         */
        private void sendErrorMessage(final PCEPErrors value) {
 
-               this.channel.writeAndFlush(Util.createErrorMessage(value, null));
+               this.sendMessage(Util.createErrorMessage(value, null));
        }
 
        private void scheduleFailTimer() {
@@ -189,7 +189,7 @@ public abstract class AbstractPCEPSessionNegotiator extends AbstractSessionNegot
                this.localPrefs = getInitialProposal();
                final OpenMessage m = new org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.message.rev131007.OpenBuilder().setOpenMessage(
                                new OpenMessageBuilder().setOpen(this.localPrefs).build()).build();
-               this.channel.writeAndFlush(m);
+               this.sendMessage(m);
                this.state = State.OpenWait;
                scheduleFailTimer();
 
@@ -236,7 +236,7 @@ public abstract class AbstractPCEPSessionNegotiator extends AbstractSessionNegot
                                        this.state = State.Finished;
                                        return;
                                }
-                               this.channel.writeAndFlush(new OpenBuilder().setOpenMessage(new OpenMessageBuilder().setOpen(this.localPrefs).build()).build());
+                               this.sendMessage(new OpenBuilder().setOpenMessage(new OpenMessageBuilder().setOpen(this.localPrefs).build()).build());
                                if (!this.remoteOK) {
                                        this.state = State.OpenWait;
                                }
@@ -250,7 +250,7 @@ public abstract class AbstractPCEPSessionNegotiator extends AbstractSessionNegot
                                final org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.types.rev131005.open.message.OpenMessage o = ((org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.message.rev131007.Open) msg).getOpenMessage();
                                final Open open = o.getOpen();
                                if (isProposalAcceptable(open)) {
-                                       this.channel.writeAndFlush(KEEPALIVE);
+                                       this.sendMessage(KEEPALIVE);
                                        this.remotePrefs = open;
                                        this.remoteOK = true;
                                        if (this.localOK) {
@@ -279,7 +279,7 @@ public abstract class AbstractPCEPSessionNegotiator extends AbstractSessionNegot
                                        return;
                                }
 
-                               this.channel.writeAndFlush(Util.createErrorMessage(PCEPErrors.NON_ACC_NEG_SESSION_CHAR, newPrefs));
+                               this.sendMessage(Util.createErrorMessage(PCEPErrors.NON_ACC_NEG_SESSION_CHAR, newPrefs));
 
                                this.openRetry = true;
                                this.state = this.localOK ? State.OpenWait : State.KeepWait;