import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.CheckedFuture;
-
import java.util.Map;
import java.util.Map.Entry;
-
import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
@Override
public void setUptodate(final InstanceIdentifier<Tables> basePath, final boolean uptodate) {
final InstanceIdentifier<Attributes> aid = basePath.child(Attributes.class);
- trans.merge(LogicalDatastoreType.OPERATIONAL, aid, new AttributesBuilder().setUptodate(uptodate).build());
+ this.trans.merge(LogicalDatastoreType.OPERATIONAL, aid, new AttributesBuilder().setUptodate(uptodate).build());
LOG.debug("Table {} switching uptodate to {}", basePath, uptodate);
}
public CheckedFuture<Void, TransactionCommitFailedException> commit() {
- return trans.submit();
+ return this.trans.submit();
}
@Override
public BGPObjectComparator comparator() {
- return comparator;
+ return this.comparator;
}
@Override
- public <K, V extends Route> void advertise(final RouteEncoder ribOut, final K key, final InstanceIdentifier<V> id, final Peer peer, final V obj) {
- trans.put(LogicalDatastoreType.OPERATIONAL, id, obj, true);
- for (Entry<Peer, AdjRIBsOut> e : ribs.entrySet()) {
- if (e.getKey() != peer) {
+ public <K, V extends Route> void advertise(final RouteEncoder ribOut, final K key, final InstanceIdentifier<V> id, final Peer advertizingPeer, final V obj) {
+ this.trans.put(LogicalDatastoreType.OPERATIONAL, id, obj, true);
+ for (final Entry<Peer, AdjRIBsOut> e : this.ribs.entrySet()) {
+ if (e.getKey() != advertizingPeer) {
e.getValue().put(ribOut, key, obj);
+ LOG.trace("Advertizing to peer {}", e.getKey());
+ } else {
+ LOG.trace("Not advertizing to peer {}", e.getKey());
}
}
}
@Override
public <K, V extends Route> void withdraw(final RouteEncoder ribOut, final K key, final InstanceIdentifier<V> id) {
- trans.delete(LogicalDatastoreType.OPERATIONAL, id);
- for (AdjRIBsOut r : ribs.values()) {
+ this.trans.delete(LogicalDatastoreType.OPERATIONAL, id);
+ for (final AdjRIBsOut r : this.ribs.values()) {
r.put(ribOut, key, null);
}
}
import com.google.common.base.Preconditions;
import com.google.common.collect.Sets;
import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelFutureListener;
import java.io.IOException;
import java.util.Date;
import java.util.Set;
} else if (msg instanceof Notify) {
// Notifications are handled internally
LOG.info("Session closed because Notification message received: {} / {}", ((Notify) msg).getErrorCode(),
- ((Notify) msg).getErrorSubcode());
+ ((Notify) msg).getErrorSubcode());
this.closeWithoutMessage();
this.listener.onSessionTerminated(this, new BGPTerminationReason(BGPError.forValue(((Notify) msg).getErrorCode(),
- ((Notify) msg).getErrorSubcode())));
+ ((Notify) msg).getErrorSubcode())));
} else if (msg instanceof Keepalive) {
// Keepalives are handled internally
LOG.trace("Received KeepAlive messsage.");
synchronized void sendMessage(final Notification msg) {
try {
- this.channel.writeAndFlush(msg);
+ this.channel.writeAndFlush(msg).addListener(
+ new ChannelFutureListener() {
+ @Override
+ public void operationComplete(final ChannelFuture f) {
+ if (!f.isSuccess()) {
+ LOG.info("Failed to send message {} to socket {}", msg, f.cause(), BGPSessionImpl.this.channel);
+ } else {
+ LOG.trace("Message {} sent to socket {}", msg, BGPSessionImpl.this.channel);
+ }
+ }
+ });
this.lastMessageSentAt = System.nanoTime();
- LOG.debug("Sent message: {}", msg);
+ LOG.debug("Sent message: {} to peer {}", msg, this.bgpId);
} catch (final Exception e) {
LOG.warn("Message {} was not sent.", msg, e);
}
}
synchronized boolean isWritable() {
- return channel != null && channel.isWritable();
+ return this.channel != null && this.channel.isWritable();
}
synchronized void schedule(final Runnable task) {
- Preconditions.checkState(channel != null);
- channel.eventLoop().submit(task);
+ Preconditions.checkState(this.channel != null);
+ this.channel.eventLoop().submit(task);
}
import java.util.Collections;
import java.util.List;
+import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
-
+import java.util.concurrent.LinkedBlockingQueue;
import javax.annotation.concurrent.ThreadSafe;
import org.opendaylight.controller.md.sal.binding.api.BindingTransactionChain;
import org.opendaylight.protocol.bgp.rib.impl.spi.AdjRIBsOutRegistration;
import org.opendaylight.protocol.bgp.rib.impl.spi.BGPDispatcher;
import org.opendaylight.protocol.bgp.rib.impl.spi.RIB;
+import org.opendaylight.protocol.bgp.rib.spi.AbstractAdjRIBs;
import org.opendaylight.protocol.bgp.rib.spi.AdjRIBsIn;
import org.opendaylight.protocol.bgp.rib.spi.BGPObjectComparator;
import org.opendaylight.protocol.bgp.rib.spi.Peer;
private final Ipv4Address bgpIdentifier;
private final List<BgpTableType> localTables;
private final RIBTables tables;
+ private final BlockingQueue<Peer> peers;
+ private final Thread scheduler = new Thread(new Runnable() {
+
+ @Override
+ public void run() {
+ try {
+ final Peer peer = RIBImpl.this.peers.take();
+ LOG.debug("Advertizing loc-rib to new peer {}.", peer);
+ for (final BgpTableType key : RIBImpl.this.localTables) {
+ final AdjRIBsTransactionImpl trans = new AdjRIBsTransactionImpl(RIBImpl.this.ribOuts, RIBImpl.this.comparator, RIBImpl.this.chain.newWriteOnlyTransaction());
+ final AbstractAdjRIBs<?, ?, ?> adj = (AbstractAdjRIBs<?, ?, ?>) RIBImpl.this.tables.get(new TablesKey(key.getAfi(), key.getSafi()));
+ adj.addAllEntries(trans);
+ Futures.addCallback(trans.commit(), new FutureCallback<Void>() {
+ @Override
+ public void onSuccess(final Void result) {
+ LOG.trace("Advertizing {} to peer {} committed successfully", key.getAfi(), peer);
+ }
+
+ @Override
+ public void onFailure(final Throwable t) {
+ LOG.error("Failed to update peer {} with RIB {}", peer, t);
+ }
+ });
+ }
+ } catch (final InterruptedException e) {
+
+ }
+ }
+ });
public RIBImpl(final RibId ribId, final AsNumber localAs, final Ipv4Address localBgpId, final RIBExtensionConsumerContext extensions,
final BGPDispatcher dispatcher, final ReconnectStrategyFactory tcpStrategyFactory,
this.tcpStrategyFactory = Preconditions.checkNotNull(tcpStrategyFactory);
this.localTables = ImmutableList.copyOf(localTables);
this.tables = new RIBTables(extensions);
+ this.peers = new LinkedBlockingQueue<>();
LOG.debug("Instantiating RIB table {} at {}", ribId, getInstanceIdentifier());
}
};
- ribOuts.put(peer, aro);
- // FIXME: schedule a walk over all the tables
+ this.ribOuts.put(peer, aro);
+ LOG.debug("Registering this peer {} to RIB-Out {}", peer, this.ribOuts);
+ try {
+ this.peers.put(peer);
+ this.scheduler.run();
+ } catch (final InterruptedException e) {
+ //
+ }
return reg;
}
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
-
+import java.util.Map.Entry;
import javax.annotation.concurrent.GuardedBy;
import javax.annotation.concurrent.ThreadSafe;
this.basePath = Preconditions.checkNotNull(basePath);
this.tableType = new BgpTableTypeImpl(basePath.getKey().getAfi(), basePath.getKey().getSafi());
this.eor = new UpdateBuilder().setPathAttributes(new PathAttributesBuilder().addAugmentation(
- PathAttributes1.class, new PathAttributes1Builder().setMpReachNlri(new MpReachNlriBuilder(this.tableType)
- .build()).build()).build()).build();
-
+ PathAttributes1.class, new PathAttributes1Builder().setMpReachNlri(new MpReachNlriBuilder(this.tableType)
+ .build()).build()).build()).build();
}
@Override
public final synchronized void clear(final AdjRIBsTransaction trans, final Peer peer) {
- final Iterator<Map.Entry<I, RIBEntry>> i = this.entries.entrySet().iterator();
+ final Iterator<Entry<I, RIBEntry>> i = this.entries.entrySet().iterator();
while (i.hasNext()) {
- final Map.Entry<I, RIBEntry> e = i.next();
+ final Entry<I, RIBEntry> e = i.next();
if (e.getValue().removeState(trans, peer)) {
i.remove();
}
this.peers.remove(peer);
- trans.setUptodate(basePath, !this.peers.values().contains(Boolean.FALSE));
+ trans.setUptodate(this.basePath, !this.peers.values().contains(Boolean.FALSE));
+ }
+
+ public final synchronized void addAllEntries(final AdjRIBsTransaction trans) {
+ for (final Entry<I, RIBEntry> e : this.entries.entrySet()) {
+ final RIBEntry entry = e.getValue();
+ final RIBEntryData<I, D, K> state = entry.currentState;
+ trans.advertise(this, e.getKey(), entry.name, state.peer, state.getDataObject(entry.key, entry.name.getKey()));
+ }
}
/**
pab.fieldsFrom(route.getAttributes());
pab.addAugmentation(PathAttributes1.class, new PathAttributes1Builder().setMpReachNlri(reach.build()).build()).build();
} else {
- final MpUnreachNlriBuilder unreach = new MpUnreachNlriBuilder(tableType);
+ final MpUnreachNlriBuilder unreach = new MpUnreachNlriBuilder(this.tableType);
addWithdrawal(unreach, (I)key);
pab.addAugmentation(PathAttributes2.class, new PathAttributes2Builder().setMpUnreachNlri(unreach.build()).build()).build();
}