BUG-1116 : if a new peer connects, advertize everything in loc-rib 29/10329/3
authorDana Kutenicsova <dkutenic@cisco.com>
Tue, 26 Aug 2014 14:27:12 +0000 (16:27 +0200)
committerRobert Varga <rovarga@cisco.com>
Sat, 30 Aug 2014 08:33:37 +0000 (10:33 +0200)
Change-Id: I44b334593e2239fb622da2f73b3ee23ebae783ce
Signed-off-by: Dana Kutenicsova <dkutenic@cisco.com>
bgp/rib-impl/src/main/java/org/opendaylight/protocol/bgp/rib/impl/AdjRIBsTransactionImpl.java
bgp/rib-impl/src/main/java/org/opendaylight/protocol/bgp/rib/impl/BGPSessionImpl.java
bgp/rib-impl/src/main/java/org/opendaylight/protocol/bgp/rib/impl/RIBImpl.java
bgp/rib-spi/src/main/java/org/opendaylight/protocol/bgp/rib/spi/AbstractAdjRIBs.java

index 95d4b193b5bfd0dd2692edfebbd2aa0f440cd48b..2d888311b2e12ffd6ef49dd284466b8ce60c1b74 100644 (file)
@@ -9,10 +9,8 @@ package org.opendaylight.protocol.bgp.rib.impl;
 
 import com.google.common.base.Preconditions;
 import com.google.common.util.concurrent.CheckedFuture;
-
 import java.util.Map;
 import java.util.Map.Entry;
-
 import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
 import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
@@ -44,33 +42,36 @@ class AdjRIBsTransactionImpl implements AdjRIBsTransaction {
     @Override
     public void setUptodate(final InstanceIdentifier<Tables> basePath, final boolean uptodate) {
         final InstanceIdentifier<Attributes> aid = basePath.child(Attributes.class);
-        trans.merge(LogicalDatastoreType.OPERATIONAL, aid, new AttributesBuilder().setUptodate(uptodate).build());
+        this.trans.merge(LogicalDatastoreType.OPERATIONAL, aid, new AttributesBuilder().setUptodate(uptodate).build());
         LOG.debug("Table {} switching uptodate to {}", basePath, uptodate);
     }
 
     public CheckedFuture<Void, TransactionCommitFailedException> commit() {
-        return trans.submit();
+        return this.trans.submit();
     }
 
     @Override
     public BGPObjectComparator comparator() {
-        return comparator;
+        return this.comparator;
     }
 
     @Override
-    public <K, V extends Route> void advertise(final RouteEncoder ribOut, final K key, final InstanceIdentifier<V> id, final Peer peer, final V obj) {
-        trans.put(LogicalDatastoreType.OPERATIONAL, id, obj, true);
-        for (Entry<Peer, AdjRIBsOut> e : ribs.entrySet()) {
-            if (e.getKey() != peer) {
+    public <K, V extends Route> void advertise(final RouteEncoder ribOut, final K key, final InstanceIdentifier<V> id, final Peer advertizingPeer, final V obj) {
+        this.trans.put(LogicalDatastoreType.OPERATIONAL, id, obj, true);
+        for (final Entry<Peer, AdjRIBsOut> e : this.ribs.entrySet()) {
+            if (e.getKey() != advertizingPeer) {
                 e.getValue().put(ribOut, key, obj);
+                LOG.trace("Advertizing to peer {}", e.getKey());
+            } else {
+                LOG.trace("Not advertizing to peer {}", e.getKey());
             }
         }
     }
 
     @Override
     public <K, V extends Route> void withdraw(final RouteEncoder ribOut, final K key, final InstanceIdentifier<V> id) {
-        trans.delete(LogicalDatastoreType.OPERATIONAL, id);
-        for (AdjRIBsOut r : ribs.values()) {
+        this.trans.delete(LogicalDatastoreType.OPERATIONAL, id);
+        for (final AdjRIBsOut r : this.ribs.values()) {
             r.put(ribOut, key, null);
         }
     }
index bcd9b18e9a24b9d3d5a775527a02c5ce764833c3..6c7fa56b1843257506db360374caead043b33005 100644 (file)
@@ -13,6 +13,8 @@ import com.google.common.base.Objects.ToStringHelper;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Sets;
 import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelFutureListener;
 import java.io.IOException;
 import java.util.Date;
 import java.util.Set;
@@ -166,10 +168,10 @@ public class BGPSessionImpl extends AbstractProtocolSession<Notification> implem
         } else if (msg instanceof Notify) {
             // Notifications are handled internally
             LOG.info("Session closed because Notification message received: {} / {}", ((Notify) msg).getErrorCode(),
-                    ((Notify) msg).getErrorSubcode());
+                ((Notify) msg).getErrorSubcode());
             this.closeWithoutMessage();
             this.listener.onSessionTerminated(this, new BGPTerminationReason(BGPError.forValue(((Notify) msg).getErrorCode(),
-                    ((Notify) msg).getErrorSubcode())));
+                ((Notify) msg).getErrorSubcode())));
         } else if (msg instanceof Keepalive) {
             // Keepalives are handled internally
             LOG.trace("Received KeepAlive messsage.");
@@ -193,9 +195,19 @@ public class BGPSessionImpl extends AbstractProtocolSession<Notification> implem
 
     synchronized void sendMessage(final Notification msg) {
         try {
-            this.channel.writeAndFlush(msg);
+            this.channel.writeAndFlush(msg).addListener(
+                new ChannelFutureListener() {
+                    @Override
+                    public void operationComplete(final ChannelFuture f) {
+                        if (!f.isSuccess()) {
+                            LOG.info("Failed to send message {} to socket {}", msg, f.cause(), BGPSessionImpl.this.channel);
+                        } else {
+                            LOG.trace("Message {} sent to socket {}", msg, BGPSessionImpl.this.channel);
+                        }
+                    }
+                });
             this.lastMessageSentAt = System.nanoTime();
-            LOG.debug("Sent message: {}", msg);
+            LOG.debug("Sent message: {} to peer {}", msg, this.bgpId);
         } catch (final Exception e) {
             LOG.warn("Message {} was not sent.", msg, e);
         }
@@ -310,12 +322,12 @@ public class BGPSessionImpl extends AbstractProtocolSession<Notification> implem
     }
 
     synchronized boolean isWritable() {
-        return channel != null && channel.isWritable();
+        return this.channel != null && this.channel.isWritable();
     }
 
     synchronized void schedule(final Runnable task) {
-        Preconditions.checkState(channel != null);
-        channel.eventLoop().submit(task);
+        Preconditions.checkState(this.channel != null);
+        this.channel.eventLoop().submit(task);
 
     }
 
index 3d4e522ca808adb13361333380025469adf85f00..39f81cd7d8a3d7854425ad524a7ceba8584acaca 100644 (file)
@@ -17,10 +17,11 @@ import com.google.common.util.concurrent.Futures;
 
 import java.util.Collections;
 import java.util.List;
+import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ExecutionException;
-
+import java.util.concurrent.LinkedBlockingQueue;
 import javax.annotation.concurrent.ThreadSafe;
 
 import org.opendaylight.controller.md.sal.binding.api.BindingTransactionChain;
@@ -36,6 +37,7 @@ import org.opendaylight.protocol.bgp.rib.impl.spi.AdjRIBsOut;
 import org.opendaylight.protocol.bgp.rib.impl.spi.AdjRIBsOutRegistration;
 import org.opendaylight.protocol.bgp.rib.impl.spi.BGPDispatcher;
 import org.opendaylight.protocol.bgp.rib.impl.spi.RIB;
+import org.opendaylight.protocol.bgp.rib.spi.AbstractAdjRIBs;
 import org.opendaylight.protocol.bgp.rib.spi.AdjRIBsIn;
 import org.opendaylight.protocol.bgp.rib.spi.BGPObjectComparator;
 import org.opendaylight.protocol.bgp.rib.spi.Peer;
@@ -89,6 +91,35 @@ public final class RIBImpl extends DefaultRibReference implements AutoCloseable,
     private final Ipv4Address bgpIdentifier;
     private final List<BgpTableType> localTables;
     private final RIBTables tables;
+    private final BlockingQueue<Peer> peers;
+    private final Thread scheduler = new Thread(new Runnable() {
+
+        @Override
+        public void run() {
+            try {
+                final Peer peer = RIBImpl.this.peers.take();
+                LOG.debug("Advertizing loc-rib to new peer {}.", peer);
+                for (final BgpTableType key : RIBImpl.this.localTables) {
+                    final AdjRIBsTransactionImpl trans = new AdjRIBsTransactionImpl(RIBImpl.this.ribOuts, RIBImpl.this.comparator, RIBImpl.this.chain.newWriteOnlyTransaction());
+                    final AbstractAdjRIBs<?, ?, ?> adj = (AbstractAdjRIBs<?, ?, ?>) RIBImpl.this.tables.get(new TablesKey(key.getAfi(), key.getSafi()));
+                    adj.addAllEntries(trans);
+                    Futures.addCallback(trans.commit(), new FutureCallback<Void>() {
+                        @Override
+                        public void onSuccess(final Void result) {
+                            LOG.trace("Advertizing {} to peer {} committed successfully", key.getAfi(), peer);
+                        }
+
+                        @Override
+                        public void onFailure(final Throwable t) {
+                            LOG.error("Failed to update peer {} with RIB {}", peer, t);
+                        }
+                    });
+                }
+            } catch (final InterruptedException e) {
+
+            }
+        }
+    });
 
     public RIBImpl(final RibId ribId, final AsNumber localAs, final Ipv4Address localBgpId, final RIBExtensionConsumerContext extensions,
             final BGPDispatcher dispatcher, final ReconnectStrategyFactory tcpStrategyFactory,
@@ -103,6 +134,7 @@ public final class RIBImpl extends DefaultRibReference implements AutoCloseable,
         this.tcpStrategyFactory = Preconditions.checkNotNull(tcpStrategyFactory);
         this.localTables = ImmutableList.copyOf(localTables);
         this.tables = new RIBTables(extensions);
+        this.peers = new LinkedBlockingQueue<>();
 
         LOG.debug("Instantiating RIB table {} at {}", ribId, getInstanceIdentifier());
 
@@ -319,8 +351,14 @@ public final class RIBImpl extends DefaultRibReference implements AutoCloseable,
             }
         };
 
-        ribOuts.put(peer, aro);
-        // FIXME: schedule a walk over all the tables
+        this.ribOuts.put(peer, aro);
+        LOG.debug("Registering this peer {} to RIB-Out {}", peer, this.ribOuts);
+        try {
+            this.peers.put(peer);
+            this.scheduler.run();
+        } catch (final InterruptedException e) {
+            //
+        }
         return reg;
     }
 
index 5a96fe8701d6e360be3608e64b1412ca2e9f5c6d..f9ce6f1373fd28c62033b590b39f23d7f6f7a6f2 100644 (file)
@@ -14,7 +14,7 @@ import com.google.common.base.Preconditions;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.Map;
-
+import java.util.Map.Entry;
 import javax.annotation.concurrent.GuardedBy;
 import javax.annotation.concurrent.ThreadSafe;
 
@@ -166,16 +166,15 @@ public abstract class AbstractAdjRIBs<I, D extends Identifiable<K> & Route, K ex
         this.basePath = Preconditions.checkNotNull(basePath);
         this.tableType = new BgpTableTypeImpl(basePath.getKey().getAfi(), basePath.getKey().getSafi());
         this.eor = new UpdateBuilder().setPathAttributes(new PathAttributesBuilder().addAugmentation(
-                PathAttributes1.class, new PathAttributes1Builder().setMpReachNlri(new MpReachNlriBuilder(this.tableType)
-                    .build()).build()).build()).build();
-
+            PathAttributes1.class, new PathAttributes1Builder().setMpReachNlri(new MpReachNlriBuilder(this.tableType)
+                .build()).build()).build()).build();
     }
 
     @Override
     public final synchronized void clear(final AdjRIBsTransaction trans, final Peer peer) {
-        final Iterator<Map.Entry<I, RIBEntry>> i = this.entries.entrySet().iterator();
+        final Iterator<Entry<I, RIBEntry>> i = this.entries.entrySet().iterator();
         while (i.hasNext()) {
-            final Map.Entry<I, RIBEntry> e = i.next();
+            final Entry<I, RIBEntry> e = i.next();
 
             if (e.getValue().removeState(trans, peer)) {
                 i.remove();
@@ -183,7 +182,15 @@ public abstract class AbstractAdjRIBs<I, D extends Identifiable<K> & Route, K ex
         }
 
         this.peers.remove(peer);
-        trans.setUptodate(basePath, !this.peers.values().contains(Boolean.FALSE));
+        trans.setUptodate(this.basePath, !this.peers.values().contains(Boolean.FALSE));
+    }
+
+    public final synchronized void addAllEntries(final AdjRIBsTransaction trans) {
+        for (final Entry<I, RIBEntry> e : this.entries.entrySet()) {
+            final RIBEntry entry = e.getValue();
+            final RIBEntryData<I, D, K> state = entry.currentState;
+            trans.advertise(this, e.getKey(), entry.name, state.peer, state.getDataObject(entry.key, entry.name.getKey()));
+        }
     }
 
     /**
@@ -272,7 +279,7 @@ public abstract class AbstractAdjRIBs<I, D extends Identifiable<K> & Route, K ex
             pab.fieldsFrom(route.getAttributes());
             pab.addAugmentation(PathAttributes1.class, new PathAttributes1Builder().setMpReachNlri(reach.build()).build()).build();
         } else {
-            final MpUnreachNlriBuilder unreach = new MpUnreachNlriBuilder(tableType);
+            final MpUnreachNlriBuilder unreach = new MpUnreachNlriBuilder(this.tableType);
             addWithdrawal(unreach, (I)key);
             pab.addAugmentation(PathAttributes2.class, new PathAttributes2Builder().setMpUnreachNlri(unreach.build()).build()).build();
         }