* terms of the Eclipse Public License v1.0 which accompanies this distribution,
* and is available at http://www.eclipse.org/legal/epl-v10.html
*/
-
package org.opendaylight.lispflowmapping.southbound;
import static io.netty.buffer.Unpooled.wrappedBuffer;
+import static java.util.Objects.requireNonNull;
-import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBufUtil;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.channel.Channel;
-import io.netty.channel.ChannelFuture;
-import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.epoll.Epoll;
import io.netty.channel.epoll.EpollDatagramChannel;
import io.netty.channel.epoll.EpollEventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.DatagramChannel;
import io.netty.channel.socket.DatagramPacket;
import io.netty.channel.socket.nio.NioDatagramChannel;
import io.netty.util.concurrent.DefaultThreadFactory;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.concurrent.ThreadFactory;
-import org.opendaylight.controller.md.sal.binding.api.DataBroker;
-import org.opendaylight.controller.md.sal.binding.api.NotificationPublishService;
+import javax.annotation.PostConstruct;
+import javax.annotation.PreDestroy;
+import javax.inject.Inject;
+import javax.inject.Singleton;
import org.opendaylight.lispflowmapping.dsbackend.DataStoreBackEnd;
import org.opendaylight.lispflowmapping.inmemorydb.HashMapDb;
import org.opendaylight.lispflowmapping.lisp.type.LispMessage;
import org.opendaylight.lispflowmapping.southbound.lisp.LispXtrSouthboundHandler;
import org.opendaylight.lispflowmapping.southbound.lisp.cache.MapRegisterCache;
import org.opendaylight.lispflowmapping.type.sbplugin.IConfigLispSouthboundPlugin;
-import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonService;
-import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceProvider;
-import org.opendaylight.mdsal.singleton.common.api.ServiceGroupIdentifier;
+import org.opendaylight.mdsal.binding.api.DataBroker;
+import org.opendaylight.mdsal.binding.api.NotificationPublishService;
+import org.opendaylight.mdsal.singleton.api.ClusterSingletonService;
+import org.opendaylight.mdsal.singleton.api.ClusterSingletonServiceProvider;
+import org.opendaylight.mdsal.singleton.api.ServiceGroupIdentifier;
import org.opendaylight.yang.gen.v1.urn.opendaylight.lfm.inet.binary.types.rev160303.IpAddressBinary;
import org.opendaylight.yang.gen.v1.urn.opendaylight.lfm.lisp.proto.rev151105.MessageType;
import org.opendaylight.yang.gen.v1.urn.opendaylight.lfm.lisp.proto.rev151105.eid.container.Eid;
import org.opendaylight.yang.gen.v1.urn.opendaylight.lfm.lisp.proto.rev151105.mapping.authkey.container.MappingAuthkey;
import org.opendaylight.yang.gen.v1.urn.opendaylight.lfm.lisp.proto.rev151105.transport.address.TransportAddress;
import org.opendaylight.yang.gen.v1.urn.opendaylight.lfm.mappingservice.rev150906.db.instance.AuthenticationKey;
+import org.opendaylight.yangtools.concepts.Registration;
import org.opendaylight.yangtools.yang.binding.Notification;
+import org.osgi.service.component.annotations.Activate;
+import org.osgi.service.component.annotations.Component;
+import org.osgi.service.component.annotations.Deactivate;
+import org.osgi.service.component.annotations.Reference;
+import org.osgi.service.metatype.annotations.AttributeDefinition;
+import org.osgi.service.metatype.annotations.Designate;
+import org.osgi.service.metatype.annotations.ObjectClassDefinition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+@Singleton
+@Component(immediate = true, property = "type=default", configurationPid = "org.opendaylight.lispflowmapping",
+ service = { IConfigLispSouthboundPlugin.class, LispSouthboundPlugin.class })
+@Designate(ocd = LispSouthboundPlugin.Configuration.class)
public class LispSouthboundPlugin implements IConfigLispSouthboundPlugin, AutoCloseable, ClusterSingletonService {
+ @ObjectClassDefinition
+ public @interface Configuration {
+ @AttributeDefinition()
+ String bindingAddress() default DEFAULT_BINDING_ADDRESS;
+
+ @AttributeDefinition()
+ boolean mapRegisterCacheEnabled() default true;
+
+ @AttributeDefinition()
+ long mapRegisterCacheTimeout() default DEFAULT_MAP_REGISTER_CACHE_TIMEOUT;
+ }
+
protected static final Logger LOG = LoggerFactory.getLogger(LispSouthboundPlugin.class);
public static final String LISPFLOWMAPPING_ENTITY_NAME = "lispflowmapping";
- public static final ServiceGroupIdentifier SERVICE_GROUP_IDENTIFIER = ServiceGroupIdentifier.create(
- LISPFLOWMAPPING_ENTITY_NAME);
+ public static final ServiceGroupIdentifier SERVICE_GROUP_IDENTIFIER =
+ new ServiceGroupIdentifier(LISPFLOWMAPPING_ENTITY_NAME);
+
+ private static final String DEFAULT_BINDING_ADDRESS = "0.0.0.0";
+ private static final long DEFAULT_MAP_REGISTER_CACHE_TIMEOUT = 90000;
private volatile boolean isMaster = false;
private volatile String bindingAddress;
private AuthKeyDb akdb;
- private MapRegisterCache mapRegisterCache = new MapRegisterCache();
- private boolean mapRegisterCacheEnabled;
- private long mapRegisterCacheTimeout;
+ private final MapRegisterCache mapRegisterCache = new MapRegisterCache();
+ private final boolean mapRegisterCacheEnabled;
+ private final long mapRegisterCacheTimeout;
private static Object startLock = new Object();
+
+ private final DataBroker dataBroker;
+ private final NotificationPublishService notificationPublishService;
private final ClusterSingletonServiceProvider clusterSingletonService;
+
private LispSouthboundHandler lispSouthboundHandler;
- private LispXtrSouthboundHandler lispXtrSouthboundHandler;
- private NotificationPublishService notificationPublishService;
private int numChannels = 1;
- private Channel[] channel;
+ private final Channel[] channel;
private Channel xtrChannel;
- private Class channelType;
private volatile int xtrPort = LispMessage.XTR_PORT_NUM;
private volatile boolean listenOnXtrPort = false;
- private ConcurrentLispSouthboundStats statistics = new ConcurrentLispSouthboundStats();
- private Bootstrap bootstrap = new Bootstrap();
- private Bootstrap xtrBootstrap = new Bootstrap();
- private ThreadFactory threadFactory = new DefaultThreadFactory("lisp-sb");
+ private final ConcurrentLispSouthboundStats statistics = new ConcurrentLispSouthboundStats();
+ private final Bootstrap bootstrap = new Bootstrap();
+ private final Bootstrap xtrBootstrap = new Bootstrap();
+ private final ThreadFactory threadFactory = new DefaultThreadFactory("lisp-sb");
private EventLoopGroup eventLoopGroup;
- private DataBroker dataBroker;
private AuthenticationKeyDataListener authenticationKeyDataListener;
private DataStoreBackEnd dsbe;
+ private Registration cssReg;
+ @Inject
public LispSouthboundPlugin(final DataBroker dataBroker,
final NotificationPublishService notificationPublishService,
final ClusterSingletonServiceProvider clusterSingletonService) {
+ this(dataBroker, notificationPublishService, clusterSingletonService, DEFAULT_BINDING_ADDRESS, true,
+ DEFAULT_MAP_REGISTER_CACHE_TIMEOUT);
+ }
+
+ @Activate
+ public LispSouthboundPlugin(@Reference final DataBroker dataBroker,
+ @Reference final NotificationPublishService notificationPublishService,
+ @Reference final ClusterSingletonServiceProvider clusterSingletonService,
+ final Configuration configuration) {
+ this(dataBroker, notificationPublishService, clusterSingletonService, configuration.bindingAddress(),
+ configuration.mapRegisterCacheEnabled(), configuration.mapRegisterCacheTimeout());
+ init();
+ }
+
+ public LispSouthboundPlugin(final DataBroker dataBroker,
+ final NotificationPublishService notificationPublishService,
+ final ClusterSingletonServiceProvider clusterSingletonService,
+ final String bindingAddress, final boolean mapRegisterCacheEnabled, final long mapRegisterCacheTimeout) {
+ LOG.info("LISP (RFC6830) Southbound Plugin is initializing...");
this.dataBroker = dataBroker;
this.notificationPublishService = notificationPublishService;
this.clusterSingletonService = clusterSingletonService;
+ this.bindingAddress = bindingAddress;
+ this.mapRegisterCacheEnabled = mapRegisterCacheEnabled;
+ this.mapRegisterCacheTimeout = mapRegisterCacheTimeout;
+
if (Epoll.isAvailable()) {
// When lispflowmapping is under heavy load, there are usually two threads nearing 100% CPU core
// utilization. In order to have some headroom, we reserve 3 cores for "other" tasks, and allow the
channel = new Channel[numChannels];
}
+ @PostConstruct
public void init() {
- LOG.info("LISP (RFC6830) Southbound Plugin is initializing...");
synchronized (startLock) {
- this.akdb = new AuthKeyDb(new HashMapDb());
- this.authenticationKeyDataListener = new AuthenticationKeyDataListener(dataBroker, akdb);
- this.dsbe = new DataStoreBackEnd(dataBroker);
+ akdb = new AuthKeyDb(new HashMapDb());
+ authenticationKeyDataListener = new AuthenticationKeyDataListener(dataBroker, akdb);
+ dsbe = new DataStoreBackEnd(dataBroker);
restoreDaoFromDatastore();
- LispSouthboundHandler lispSouthboundHandler = new LispSouthboundHandler(this);
- this.lispSouthboundHandler = lispSouthboundHandler;
-
- LispXtrSouthboundHandler lispXtrSouthboundHandler = new LispXtrSouthboundHandler(this);
- this.lispXtrSouthboundHandler = lispXtrSouthboundHandler;
-
+ final Class<? extends DatagramChannel> channelType;
if (Epoll.isAvailable()) {
eventLoopGroup = new EpollEventLoopGroup(numChannels, threadFactory);
channelType = EpollDatagramChannel.class;
bootstrap.group(eventLoopGroup);
bootstrap.channel(channelType);
+ lispSouthboundHandler = new LispSouthboundHandler(this);
bootstrap.handler(lispSouthboundHandler);
xtrBootstrap.group(eventLoopGroup);
xtrBootstrap.channel(channelType);
- xtrBootstrap.handler(lispXtrSouthboundHandler);
+ xtrBootstrap.handler(new LispXtrSouthboundHandler(this));
start();
startXtr();
- clusterSingletonService.registerClusterSingletonService(this);
- LOG.info("LISP (RFC6830) Southbound Plugin is up!");
+ cssReg = clusterSingletonService.registerClusterSingletonService(this);
}
+
+ LOG.info("LISP (RFC6830) Southbound Plugin is up!");
}
@SuppressWarnings("checkstyle:IllegalCatch")
private void unloadActions() {
lispSouthboundHandler = null;
- lispXtrSouthboundHandler = null;
stop();
stopXtr();
}
}
- public void handleSerializedLispBuffer(TransportAddress address, ByteBuffer outBuffer,
+ public void handleSerializedLispBuffer(final TransportAddress address, final ByteBuffer outBuffer,
final MessageType packetType) {
InetAddress ip = getInetAddress(address);
- handleSerializedLispBuffer(ip, outBuffer, packetType, address.getPort().getValue(), null);
+ handleSerializedLispBuffer(ip, outBuffer, packetType, address.getPort().getValue().toJava(), null);
}
- public void handleSerializedLispBuffer(InetAddress address, ByteBuffer outBuffer,
+ public void handleSerializedLispBuffer(final InetAddress address, final ByteBuffer outBuffer,
final MessageType packetType, final int portNumber, Channel senderChannel) {
if (senderChannel == null) {
- senderChannel = this.channel[0];
+ senderChannel = channel[0];
}
InetSocketAddress recipient = new InetSocketAddress(address, portNumber);
outBuffer.position(0);
if (LOG.isTraceEnabled()) {
LOG.trace("Buffer:\n{}", ByteBufUtil.prettyHexDump(data));
}
- senderChannel.write(packet).addListener(new ChannelFutureListener() {
- @Override
- public void operationComplete(ChannelFuture future) {
- if (future.isSuccess()) {
- LOG.trace("Success");
- statistics.incrementTx(packetType.getIntValue());
- } else {
- LOG.warn("Failed to send packet");
- statistics.incrementTxErrors();
- }
+ senderChannel.write(packet).addListener(future -> {
+ if (future.isSuccess()) {
+ LOG.trace("Success");
+ statistics.incrementTx(packetType.getIntValue());
+ } else {
+ LOG.warn("Failed to send packet");
+ statistics.incrementTxErrors();
}
});
senderChannel.flush();
}
- private InetAddress getInetAddress(TransportAddress address) {
- Preconditions.checkNotNull(address, "TransportAddress must not be null");
+ private static InetAddress getInetAddress(final TransportAddress address) {
+ requireNonNull(address, "TransportAddress must not be null");
IpAddressBinary ip = address.getIpAddress();
try {
if (ip.getIpv4AddressBinary() != null) {
@Override
@SuppressWarnings("checkstyle:IllegalCatch")
- public void setLispAddress(String address) {
+ public void setLispAddress(final String address) {
synchronized (startLock) {
if (bindingAddress.equals(address)) {
LOG.debug("Configured LISP binding address didn't change.");
}
@Override
- public void shouldListenOnXtrPort(boolean shouldListenOnXtrPort) {
+ public void shouldListenOnXtrPort(final boolean shouldListenOnXtrPort) {
listenOnXtrPort = shouldListenOnXtrPort;
if (listenOnXtrPort) {
restartXtr();
}
@Override
- public void setXtrPort(int port) {
- this.xtrPort = port;
+ public void setXtrPort(final int port) {
+ xtrPort = port;
if (listenOnXtrPort) {
restartXtr();
}
}
- public void setMapRegisterCacheEnabled(final boolean mapRegisterCacheEnabled) {
- this.mapRegisterCacheEnabled = mapRegisterCacheEnabled;
- if (mapRegisterCacheEnabled) {
- LOG.info("Enabling Map-Register cache");
- } else {
- LOG.info("Disabling Map-Register cache");
- }
- }
-
- public void setMapRegisterCacheTimeout(long mapRegisterCacheTimeout) {
- this.mapRegisterCacheTimeout = mapRegisterCacheTimeout;
- }
-
- public void setBindingAddress(String bindingAddress) {
- this.bindingAddress = bindingAddress;
- }
-
+ @Deactivate
+ @PreDestroy
@Override
public void close() throws Exception {
eventLoopGroup.shutdownGracefully();
lispSouthboundHandler.close();
unloadActions();
- clusterSingletonService.close();
+ if (cssReg != null) {
+ cssReg.close();
+ }
dsbe.closeTransactionChain();
}
@Override
public void instantiateServiceInstance() {
- this.isMaster = true;
+ isMaster = true;
}
@Override
public ListenableFuture<Void> closeServiceInstance() {
- this.isMaster = false;
+ isMaster = false;
return Futures.<Void>immediateFuture(null);
}