import io.netty.buffer.ByteBufUtil;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.channel.Channel;
-import io.netty.channel.ChannelFuture;
-import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.epoll.Epoll;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
import java.nio.ByteBuffer;
+import java.util.List;
import java.util.concurrent.ThreadFactory;
-import org.opendaylight.controller.md.sal.binding.api.DataBroker;
-import org.opendaylight.controller.md.sal.binding.api.NotificationPublishService;
import org.opendaylight.lispflowmapping.dsbackend.DataStoreBackEnd;
import org.opendaylight.lispflowmapping.inmemorydb.HashMapDb;
import org.opendaylight.lispflowmapping.lisp.type.LispMessage;
-import org.opendaylight.lispflowmapping.mapcache.SimpleMapCache;
+import org.opendaylight.lispflowmapping.lisp.util.LispAddressStringifier;
+import org.opendaylight.lispflowmapping.mapcache.AuthKeyDb;
import org.opendaylight.lispflowmapping.southbound.lisp.AuthenticationKeyDataListener;
import org.opendaylight.lispflowmapping.southbound.lisp.LispSouthboundHandler;
import org.opendaylight.lispflowmapping.southbound.lisp.LispXtrSouthboundHandler;
import org.opendaylight.lispflowmapping.southbound.lisp.cache.MapRegisterCache;
import org.opendaylight.lispflowmapping.type.sbplugin.IConfigLispSouthboundPlugin;
+import org.opendaylight.mdsal.binding.api.DataBroker;
+import org.opendaylight.mdsal.binding.api.NotificationPublishService;
import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonService;
import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceProvider;
import org.opendaylight.mdsal.singleton.common.api.ServiceGroupIdentifier;
import org.opendaylight.yang.gen.v1.urn.opendaylight.lfm.inet.binary.types.rev160303.IpAddressBinary;
import org.opendaylight.yang.gen.v1.urn.opendaylight.lfm.lisp.proto.rev151105.MessageType;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.lfm.lisp.proto.rev151105.eid.container.Eid;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.lfm.lisp.proto.rev151105.mapping.authkey.container.MappingAuthkey;
import org.opendaylight.yang.gen.v1.urn.opendaylight.lfm.lisp.proto.rev151105.transport.address.TransportAddress;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.lfm.mappingservice.rev150906.db.instance.AuthenticationKey;
+import org.opendaylight.yangtools.yang.binding.Notification;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public static final ServiceGroupIdentifier SERVICE_GROUP_IDENTIFIER = ServiceGroupIdentifier.create(
LISPFLOWMAPPING_ENTITY_NAME);
+ private volatile boolean isMaster = false;
private volatile String bindingAddress;
- private SimpleMapCache smc;
- private MapRegisterCache mapRegisterCache = new MapRegisterCache();
+ private AuthKeyDb akdb;
+ private final MapRegisterCache mapRegisterCache = new MapRegisterCache();
private boolean mapRegisterCacheEnabled;
private long mapRegisterCacheTimeout;
private final ClusterSingletonServiceProvider clusterSingletonService;
private LispSouthboundHandler lispSouthboundHandler;
private LispXtrSouthboundHandler lispXtrSouthboundHandler;
- private NotificationPublishService notificationPublishService;
+ private final NotificationPublishService notificationPublishService;
private int numChannels = 1;
- private Channel[] channel;
+ private final Channel[] channel;
private Channel xtrChannel;
private Class channelType;
private volatile int xtrPort = LispMessage.XTR_PORT_NUM;
private volatile boolean listenOnXtrPort = false;
- private ConcurrentLispSouthboundStats statistics = new ConcurrentLispSouthboundStats();
- private Bootstrap bootstrap = new Bootstrap();
- private Bootstrap xtrBootstrap = new Bootstrap();
- private ThreadFactory threadFactory = new DefaultThreadFactory("lisp-sb");
+ private final ConcurrentLispSouthboundStats statistics = new ConcurrentLispSouthboundStats();
+ private final Bootstrap bootstrap = new Bootstrap();
+ private final Bootstrap xtrBootstrap = new Bootstrap();
+ private final ThreadFactory threadFactory = new DefaultThreadFactory("lisp-sb");
private EventLoopGroup eventLoopGroup;
- private DataBroker dataBroker;
+ private final DataBroker dataBroker;
private AuthenticationKeyDataListener authenticationKeyDataListener;
private DataStoreBackEnd dsbe;
this.dataBroker = dataBroker;
this.notificationPublishService = notificationPublishService;
this.clusterSingletonService = clusterSingletonService;
- this.clusterSingletonService.registerClusterSingletonService(this);
if (Epoll.isAvailable()) {
// When lispflowmapping is under heavy load, there are usually two threads nearing 100% CPU core
// utilization. In order to have some headroom, we reserve 3 cores for "other" tasks, and allow the
public void init() {
LOG.info("LISP (RFC6830) Southbound Plugin is initializing...");
synchronized (startLock) {
- this.smc = new SimpleMapCache(new HashMapDb());
- this.authenticationKeyDataListener = new AuthenticationKeyDataListener(dataBroker, smc);
+ this.akdb = new AuthKeyDb(new HashMapDb());
+ this.authenticationKeyDataListener = new AuthenticationKeyDataListener(dataBroker, akdb);
this.dsbe = new DataStoreBackEnd(dataBroker);
+ restoreDaoFromDatastore();
- lispSouthboundHandler = new LispSouthboundHandler(this);
- lispSouthboundHandler.setDataBroker(dataBroker);
- lispSouthboundHandler.setNotificationProvider(notificationPublishService);
- lispSouthboundHandler.setSimpleMapCache(smc);
- lispSouthboundHandler.setMapRegisterCache(mapRegisterCache);
- lispSouthboundHandler.setMapRegisterCacheTimeout(mapRegisterCacheTimeout);
- lispSouthboundHandler.setAuthenticationKeyDataListener(authenticationKeyDataListener);
- lispSouthboundHandler.setDataStoreBackEnd(dsbe);
- lispSouthboundHandler.setStats(statistics);
- lispSouthboundHandler.restoreDaoFromDatastore();
+ LispSouthboundHandler lsbh = new LispSouthboundHandler(this);
+ this.lispSouthboundHandler = lsbh;
- lispXtrSouthboundHandler = new LispXtrSouthboundHandler();
- lispXtrSouthboundHandler.setNotificationProvider(notificationPublishService);
+ LispXtrSouthboundHandler lxsbh = new LispXtrSouthboundHandler(this);
+ this.lispXtrSouthboundHandler = lxsbh;
if (Epoll.isAvailable()) {
eventLoopGroup = new EpollEventLoopGroup(numChannels, threadFactory);
bootstrap.group(eventLoopGroup);
bootstrap.channel(channelType);
- bootstrap.handler(lispSouthboundHandler);
+ bootstrap.handler(lsbh);
xtrBootstrap.group(eventLoopGroup);
xtrBootstrap.channel(channelType);
- xtrBootstrap.handler(lispXtrSouthboundHandler);
+ xtrBootstrap.handler(lxsbh);
start();
startXtr();
+ clusterSingletonService.registerClusterSingletonService(this);
LOG.info("LISP (RFC6830) Southbound Plugin is up!");
}
}
LOG.info("LISP (RFC6830) Southbound Plugin is down!");
}
+ /**
+ * Restore all keys from MDSAL datastore.
+ */
+ public void restoreDaoFromDatastore() {
+ final List<AuthenticationKey> authKeys = dsbe.getAllAuthenticationKeys();
+ LOG.info("Restoring {} keys from datastore into southbound DAO", authKeys.size());
+
+ for (AuthenticationKey authKey : authKeys) {
+ final Eid key = authKey.getEid();
+ final MappingAuthkey mappingAuthkey = authKey.getMappingAuthkey();
+ LOG.debug("Adding authentication key '{}' with key-ID {} for {}", mappingAuthkey.getKeyString(),
+ mappingAuthkey.getKeyType(),
+ LispAddressStringifier.getString(key));
+ akdb.addAuthenticationKey(key, mappingAuthkey);
+ }
+ }
+
public void handleSerializedLispBuffer(TransportAddress address, ByteBuffer outBuffer,
final MessageType packetType) {
InetAddress ip = getInetAddress(address);
- handleSerializedLispBuffer(ip, outBuffer, packetType, address.getPort().getValue(), null);
+ handleSerializedLispBuffer(ip, outBuffer, packetType, address.getPort().getValue().toJava(), null);
}
public void handleSerializedLispBuffer(InetAddress address, ByteBuffer outBuffer,
if (LOG.isTraceEnabled()) {
LOG.trace("Buffer:\n{}", ByteBufUtil.prettyHexDump(data));
}
- senderChannel.write(packet).addListener(new ChannelFutureListener() {
- @Override
- public void operationComplete(ChannelFuture future) {
- if (future.isSuccess()) {
- LOG.trace("Success");
- statistics.incrementTx(packetType.getIntValue());
- } else {
- LOG.warn("Failed to send packet");
- statistics.incrementTxErrors();
- }
+ senderChannel.write(packet).addListener(future -> {
+ if (future.isSuccess()) {
+ LOG.trace("Success");
+ statistics.incrementTx(packetType.getIntValue());
+ } else {
+ LOG.warn("Failed to send packet");
+ statistics.incrementTxErrors();
}
});
senderChannel.flush();
return null;
}
- public ConcurrentLispSouthboundStats getStats() {
- return statistics;
- }
-
@Override
@SuppressWarnings("checkstyle:IllegalCatch")
public void setLispAddress(String address) {
lispSouthboundHandler.close();
unloadActions();
clusterSingletonService.close();
+ dsbe.closeTransactionChain();
}
@Override
public void instantiateServiceInstance() {
- if (lispSouthboundHandler != null) {
- lispSouthboundHandler.setNotificationProvider(notificationPublishService);
- lispSouthboundHandler.restoreDaoFromDatastore();
- lispSouthboundHandler.setIsMaster(true);
- }
- if (lispXtrSouthboundHandler != null) {
- lispXtrSouthboundHandler.setNotificationProvider(notificationPublishService);
- }
+ this.isMaster = true;
}
@Override
public ListenableFuture<Void> closeServiceInstance() {
- if (lispSouthboundHandler != null) {
- lispSouthboundHandler.setNotificationProvider(null);
- lispSouthboundHandler.setIsMaster(false);
- }
- if (lispXtrSouthboundHandler != null) {
- lispXtrSouthboundHandler.setNotificationProvider(null);
- }
+ this.isMaster = false;
return Futures.<Void>immediateFuture(null);
}
return SERVICE_GROUP_IDENTIFIER;
}
+ public synchronized void sendNotificationIfPossible(final Notification notification) throws InterruptedException {
+ if (isMaster && notificationPublishService != null) {
+ notificationPublishService.putNotification(notification);
+ LOG.trace("Publishing notification: {}", notification);
+ } else if (notificationPublishService == null) {
+ LOG.warn("Can't publish notification because no reference to publication service exists!");
+ }
+ }
+
+ public AuthKeyDb getAkdb() {
+ return akdb;
+ }
+
+ public ConcurrentLispSouthboundStats getStats() {
+ return statistics;
+ }
+
+ public DataBroker getDataBroker() {
+ return dataBroker;
+ }
+
+ public AuthenticationKeyDataListener getAuthenticationKeyDataListener() {
+ return authenticationKeyDataListener;
+ }
+
public MapRegisterCache getMapRegisterCache() {
return mapRegisterCache;
}