import static java.util.stream.Collectors.groupingBy;
import static java.util.stream.Collectors.toList;
import static java.util.stream.Collectors.toMap;
-import static org.opendaylight.mdsal.common.api.LogicalDatastoreType.CONFIGURATION;
+import static org.opendaylight.mdsal.binding.util.Datastore.CONFIGURATION;
-import com.google.common.util.concurrent.FutureCallback;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.MoreExecutors;
-import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.BiPredicate;
import java.util.function.Function;
import java.util.function.Predicate;
+import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import javax.inject.Inject;
import javax.inject.Singleton;
-import org.opendaylight.genius.datastoreutils.SingleTransactionDataBroker;
+import org.opendaylight.genius.utils.batching.ResourceBatchingManager;
+import org.opendaylight.genius.utils.hwvtep.HwvtepHACache;
import org.opendaylight.genius.utils.hwvtep.HwvtepSouthboundUtils;
-import org.opendaylight.infrautils.metrics.Counter;
-import org.opendaylight.infrautils.metrics.Labeled;
-import org.opendaylight.infrautils.metrics.MetricDescriptor;
-import org.opendaylight.infrautils.metrics.MetricProvider;
import org.opendaylight.infrautils.utils.concurrent.Executors;
import org.opendaylight.mdsal.binding.api.DataBroker;
-import org.opendaylight.mdsal.binding.api.DataObjectModification;
-import org.opendaylight.mdsal.binding.api.ReadTransaction;
+import org.opendaylight.mdsal.binding.util.Datastore.Configuration;
+import org.opendaylight.mdsal.binding.util.ManagedNewTransactionRunner;
+import org.opendaylight.mdsal.binding.util.ManagedNewTransactionRunnerImpl;
+import org.opendaylight.mdsal.binding.util.TypedReadTransaction;
import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
+import org.opendaylight.netvirt.elan.cache.ConfigMcastCache;
+import org.opendaylight.netvirt.elan.cache.ItmExternalTunnelCache;
import org.opendaylight.netvirt.elan.l2gw.ha.HwvtepHAUtil;
import org.opendaylight.netvirt.elan.l2gw.recovery.impl.L2GatewayConnectionInstanceRecoveryHandler;
import org.opendaylight.netvirt.elan.l2gw.recovery.impl.L2GatewayServiceRecoveryHandler;
import org.opendaylight.netvirt.elan.l2gw.utils.L2GatewayConnectionUtils;
import org.opendaylight.netvirt.elan.utils.Scheduler;
import org.opendaylight.netvirt.neutronvpn.api.l2gw.L2GatewayCache;
-import org.opendaylight.netvirt.neutronvpn.api.l2gw.L2GatewayDevice;
import org.opendaylight.serviceutils.srm.RecoverableListener;
import org.opendaylight.serviceutils.srm.ServiceRecoveryRegistry;
import org.opendaylight.serviceutils.tools.listener.AbstractClusteredAsyncDataTreeChangeListener;
-import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev130715.IpAddress;
import org.opendaylight.yang.gen.v1.urn.opendaylight.neutron.l2gateways.rev150712.l2gateway.connections.attributes.L2gatewayConnections;
import org.opendaylight.yang.gen.v1.urn.opendaylight.neutron.l2gateways.rev150712.l2gateway.connections.attributes.l2gatewayconnections.L2gatewayConnection;
import org.opendaylight.yang.gen.v1.urn.opendaylight.neutron.rev150712.Neutron;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ovsdb.hwvtep.rev150901.HwvtepGlobalAugmentation;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ovsdb.hwvtep.rev150901.PhysicalSwitchAugmentation;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ovsdb.hwvtep.rev150901.hwvtep.global.attributes.RemoteMcastMacs;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ovsdb.hwvtep.rev150901.hwvtep.physical._switch.attributes.TunnelIps;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId;
import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.Topology;
import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
@Singleton
public class L2GatewayConnectionListener extends AbstractClusteredAsyncDataTreeChangeListener<L2gatewayConnection>
implements RecoverableListener {
+
private static final Logger LOG = LoggerFactory.getLogger(L2GatewayConnectionListener.class);
private static final int MAX_READ_TRIALS = 120;
private static final Function<Node, InstanceIdentifier<Node>> TO_GLOBAL_PATH =
- HwvtepHAUtil::getGlobalNodePathFromPSNode;
+ HwvtepHAUtil::getGlobalNodePathFromPSNode;
private static final Function<Node, InstanceIdentifier<Node>> TO_NODE_PATH =
(node) -> HwvtepSouthboundUtils.createInstanceIdentifier(node.getNodeId());
private static final Function<InstanceIdentifier<Node>, String> GET_DEVICE_NAME = HwvtepHAUtil::getPsName;
- private static final Predicate<InstanceIdentifier<Node>> IS_PS_NODE = (psIid) ->
- HwvtepHAUtil.getPsName(psIid) != null;
+ private static final Predicate<InstanceIdentifier<Node>> IS_PS_NODE = (psIid) -> {
+ return HwvtepHAUtil.getPsName(psIid) != null;
+ };
private static final Predicate<Node> IS_HA_PARENT_NODE = (node) -> {
HwvtepGlobalAugmentation augmentation = node.augmentation(HwvtepGlobalAugmentation.class);
private final L2GatewayConnectionUtils l2GatewayConnectionUtils;
private final Scheduler scheduler;
private final L2GatewayCache l2GatewayCache;
- private final Labeled<Labeled<Counter>> elanConnectionsCounter;
+ private final ConfigMcastCache configMcastCache;
+ private final L2GatewayListener l2GatewayListener;
+ private final ItmExternalTunnelCache itmExternalTunnelCache;
+ private final HwvtepPhysicalSwitchListener hwvtepPhysicalSwitchListener;
+ private final ManagedNewTransactionRunner txRunner;
+
+ Map<InstanceIdentifier<Node>, Node> allNodes = null;
@Inject
public L2GatewayConnectionListener(final DataBroker db, L2GatewayConnectionUtils l2GatewayConnectionUtils,
Scheduler scheduler, L2GatewayCache l2GatewayCache,
- MetricProvider metricProvider,
final L2GatewayServiceRecoveryHandler l2GatewayServiceRecoveryHandler,
final L2GatewayConnectionInstanceRecoveryHandler l2InstanceRecoveryHandler,
- final ServiceRecoveryRegistry serviceRecoveryRegistry) {
+ final ServiceRecoveryRegistry serviceRecoveryRegistry,
+ ConfigMcastCache configMcastCache,
+ L2GatewayListener l2GatewayListener,
+ ItmExternalTunnelCache itmExternalTunnelCache,
+ HwvtepPhysicalSwitchListener hwvtepPhysicalSwitchListener) {
super(db, LogicalDatastoreType.CONFIGURATION, InstanceIdentifier.create(Neutron.class)
.child(L2gatewayConnections.class).child(L2gatewayConnection.class),
- Executors.newListeningSingleThreadExecutor("L2GatewayConnectionListener", LOG));
+ Executors.newListeningSingleThreadExecutor("L2GatewayConnectionListener", LOG));
+ this.txRunner = new ManagedNewTransactionRunnerImpl(db);
this.broker = db;
this.l2GatewayConnectionUtils = l2GatewayConnectionUtils;
this.scheduler = scheduler;
this.l2GatewayCache = l2GatewayCache;
- this.elanConnectionsCounter = metricProvider.newCounter(MetricDescriptor.builder()
- .anchor(this).project("netvirt").module("l2gw").id("connections").build(), "modification", "elan");
+ this.configMcastCache = configMcastCache;
+ this.l2GatewayListener = l2GatewayListener;
+ this.itmExternalTunnelCache = itmExternalTunnelCache;
+ this.hwvtepPhysicalSwitchListener = hwvtepPhysicalSwitchListener;
serviceRecoveryRegistry.addRecoverableListener(l2GatewayServiceRecoveryHandler.buildServiceRegistryKey(),
this);
serviceRecoveryRegistry.addRecoverableListener(l2InstanceRecoveryHandler.buildServiceRegistryKey(),
this);
- init();
}
+ @PostConstruct
+ @SuppressWarnings("illegalcatch")
public void init() {
- loadL2GwDeviceCache(1);
- LOG.trace("Loading l2gw connection cache");
- loadL2GwConnectionCache();
+ ResourceBatchingManager.getInstance().registerDefaultBatchHandlers(this.broker);
+ scheduler.getScheduledExecutorService().schedule(() -> {
+ txRunner.callWithNewReadOnlyTransactionAndClose(CONFIGURATION, tx -> {
+ try {
+ LOG.trace("Loading l2gw device cache");
+ loadL2GwDeviceCache(tx);
+ LOG.trace("Loading l2gw Mcast cache");
+ fillConfigMcastCache();
+ LOG.trace("Loading l2gw connection cache");
+ loadL2GwConnectionCache(tx);
+ } catch (Exception e) {
+ LOG.error("Failed to load cache", e);
+ } finally {
+ allNodes.clear();
+ l2GatewayListener.registerListener();
+ ///configMcastCache.registerListener(CONFIGURATION, broker);
+ //itmExternalTunnelCache.registerListener(CONFIGURATION, broker);
+ registerListener();
+ hwvtepPhysicalSwitchListener.registerListener();
+ }
+ });
+ }, 1, TimeUnit.SECONDS);
+ }
+
+ @Override
+ public void register() {
+ LOG.info("Registering L2GatewayConnectionListener Override Method");
+ super.register();
}
@Override
@Override
public void add(final InstanceIdentifier<L2gatewayConnection> identifier, final L2gatewayConnection input) {
LOG.trace("Adding L2gatewayConnection: {}", input);
- elanConnectionsCounter
- .label(DataObjectModification.ModificationType.WRITE.name())
- .label(input.getNetworkId().getValue()).increment();
+
// Get associated L2GwId from 'input'
// Create logical switch in each of the L2GwDevices part of L2Gw
// Logical switch name is network UUID
@Override
public void remove(InstanceIdentifier<L2gatewayConnection> identifier, L2gatewayConnection input) {
LOG.trace("Removing L2gatewayConnection: {}", input);
- elanConnectionsCounter
- .label(DataObjectModification.ModificationType.DELETE.name())
- .label(input.getNetworkId().getValue()).increment();
+
l2GatewayConnectionUtils.deleteL2GatewayConnection(input);
}
LOG.trace("Updating L2gatewayConnection : original value={}, updated value={}", original, update);
}
- @SuppressFBWarnings(value = "UPM_UNCALLED_PRIVATE_METHOD",
- justification = "https://github.com/spotbugs/spotbugs/issues/811")
- private void loadL2GwDeviceCache(final int trialNo) {
- scheduler.getScheduledExecutorService().schedule(() -> {
- if (trialNo == MAX_READ_TRIALS) {
- LOG.error("Failed to read config topology");
- return;
- }
- ReadTransaction tx = broker.newReadOnlyTransaction();
- InstanceIdentifier<Topology> topoIid = HwvtepSouthboundUtils.createHwvtepTopologyInstanceIdentifier();
- Futures.addCallback(tx.read(CONFIGURATION, topoIid), new FutureCallback<Optional<Topology>>() {
- @Override
- public void onSuccess(Optional<Topology> topologyOptional) {
- if (topologyOptional != null && topologyOptional.isPresent()) {
- loadL2GwDeviceCache(new ArrayList<Node>(topologyOptional.get().nonnullNode().values()));
- }
- registerListener();
- }
-
- @Override
- public void onFailure(Throwable throwable) {
- loadL2GwDeviceCache(trialNo + 1);
- }
- }, MoreExecutors.directExecutor());
- tx.close();
- }, 1, TimeUnit.SECONDS);
+ private void addL2DeviceToCache(InstanceIdentifier<Node> psIid, Node globalNode, Node psNode) {
+ LOG.trace("L2GatewayConnectionListener Adding device to cache {}", psNode.getNodeId().getValue());
+ String deviceName = HwvtepHAUtil.getPsName(psIid);
+ List<TunnelIps> tunnelIps = new ArrayList<>(getTunnelIps(psNode));
+ if (tunnelIps != null) {
+ l2GatewayCache.updateL2GatewayCache(deviceName, globalNode.getNodeId().getValue(), tunnelIps);
+ LOG.info("L2GatewayConnectionListener Added device to cache {} {}",
+ psNode.getNodeId().getValue(), tunnelIps);
+ } else {
+ LOG.error("L2GatewayConnectionListener Could not add device to l2gw cache no tunnel ip found {}",
+ psNode.getNodeId().getValue());
+ }
}
- @SuppressFBWarnings(value = "UPM_UNCALLED_PRIVATE_METHOD",
- justification = "https://github.com/spotbugs/spotbugs/issues/811")
- private void loadL2GwDeviceCache(List<Node> nodes) {
- if (nodes == null) {
- LOG.debug("No config topology nodes are present");
+ private void fillConfigMcastCache() {
+ if (allNodes == null) {
return;
}
- Map<InstanceIdentifier<Node>, Node> allNodes = nodes
+ //allNodes.entrySet().stream().map(entry -> entry);
+ allNodes.entrySet().stream()
+ .filter(entry -> entry.getValue().augmentation(HwvtepGlobalAugmentation.class) != null)
+ .filter(entry ->
+ entry.getValue().augmentation(HwvtepGlobalAugmentation.class).getRemoteMcastMacs() != null)
+ .forEach(entry -> {
+ entry.getValue().augmentation(HwvtepGlobalAugmentation.class).getRemoteMcastMacs().values().stream()
+ .forEach(mac -> {
+ configMcastCache.added(getMacIid(entry.getKey(), mac), mac);
+ });
+ });
+ }
+
+ private InstanceIdentifier<RemoteMcastMacs> getMacIid(InstanceIdentifier<Node> nodeIid, RemoteMcastMacs mac) {
+ return nodeIid.augmentation(HwvtepGlobalAugmentation.class).child(RemoteMcastMacs.class, mac.key());
+ }
+
+ public void loadL2GwConnectionCache(TypedReadTransaction<Configuration> tx) {
+ InstanceIdentifier<L2gatewayConnections> parentIid = InstanceIdentifier
+ .create(Neutron.class)
+ .child(L2gatewayConnections.class);
+ Optional<L2gatewayConnections> optional = Optional.empty();
+ try {
+ optional = tx.read(parentIid).get();
+ } catch (ExecutionException | InterruptedException e) {
+ LOG.error("Exception while reading l2gwconnecton for populating Cache");
+ }
+ if (optional.isPresent() && optional.get().getL2gatewayConnection() != null) {
+ LOG.trace("Found some connections to fill in l2gw connection cache");
+ optional.get().getL2gatewayConnection().values()
+ .forEach(connection -> {
+ add(parentIid.child(L2gatewayConnection.class, connection.key()), connection);
+ });
+ }
+ }
+
+ private void loadL2GwDeviceCache(TypedReadTransaction tx) {
+ allNodes = (Map<InstanceIdentifier<Node>, Node>) readAllConfigNodes(tx)
.stream()
.collect(toMap(TO_NODE_PATH, Function.identity()));
.collect(groupingBy(GET_DEVICE_NAME, toList()));
//Process HA nodes
- allNodes.values().stream()
- .filter(IS_HA_PARENT_NODE)
- .forEach(parentNode -> allIids.stream()
- .filter(IS_PS_NODE)
- .filter(psIid -> PS_NODE_OF_PARENT_NODE.test(psIid, parentNode))
- .forEach(psIid -> addL2DeviceToCache(psIid, parentNode, allNodes.get(psIid))));
+ createHANodes(allIids);
//Process non HA nodes there will be only one ps node iid for each device for non ha nodes
psNodesByDeviceName.values().stream()
});
}
- public void loadL2GwConnectionCache() {
- InstanceIdentifier<L2gatewayConnections> parentIid = InstanceIdentifier
- .create(Neutron.class)
- .child(L2gatewayConnections.class);
+ private void createHANodes(Set<InstanceIdentifier<Node>> allIids) {
+ allNodes.values().stream()
+ .filter(IS_HA_PARENT_NODE)
+ .forEach(parentNode -> {
+ fillHACache(parentNode);
+ allIids.stream()
+ .filter(IS_PS_NODE)
+ .filter(psIid -> PS_NODE_OF_PARENT_NODE.test(psIid, parentNode))
+ .forEach(psIid -> {
+ addL2DeviceToCache(psIid, parentNode, allNodes.get(psIid));
+ });
+ });
+ }
- Optional<L2gatewayConnections> optional = Optional.empty();
- try {
- optional = SingleTransactionDataBroker.syncReadOptional(broker, CONFIGURATION,
- parentIid);
- } catch (ExecutionException | InterruptedException e) {
- LOG.error("loadL2GwConnectionCache: Exception while reading L2gatewayConnections DS", e);
+ private static void fillHACache(Node parentNode) {
+ InstanceIdentifier<Node> parentIid
+ = HwvtepHAUtil.convertToInstanceIdentifier(parentNode.getNodeId().getValue());
+ List<NodeId> childIids
+ = HwvtepHAUtil.getChildNodeIdsFromManagerOtherConfig(Optional.of(parentNode));
+ if (childIids != null) {
+ for (NodeId childid : childIids) {
+ InstanceIdentifier<Node> childIid
+ = HwvtepHAUtil.convertToInstanceIdentifier(childid.getValue());
+ HwvtepHACache.getInstance().addChild(parentIid, childIid);
+ }
}
- if (optional.isPresent() && optional.get().getL2gatewayConnection() != null) {
- LOG.trace("Found some connections to fill in l2gw connection cache");
- new ArrayList<>(optional.get().nonnullL2gatewayConnection().values())
- .forEach(connection -> {
- add(parentIid.child(L2gatewayConnection.class, connection.key()), connection);
- });
+ }
+
+ private Collection<TunnelIps> getTunnelIps(Node psNode) {
+ if (psNode.augmentation(PhysicalSwitchAugmentation.class) != null) {
+ return psNode.augmentation(PhysicalSwitchAugmentation.class).nonnullTunnelIps().values();
}
+ return Collections.EMPTY_LIST;
}
- void addL2DeviceToCache(InstanceIdentifier<Node> psIid, Node globalNode, Node psNode) {
- LOG.trace("Adding device to cache {}", psNode.getNodeId().getValue());
- String deviceName = HwvtepHAUtil.getPsName(psIid);
- L2GatewayDevice l2GwDevice = l2GatewayCache.addOrGet(deviceName);
- l2GwDevice.setConnected(true);
- l2GwDevice.setHwvtepNodeId(globalNode.getNodeId().getValue());
+ private List<Node> readAllConfigNodes(TypedReadTransaction<Configuration> tx) {
- List<TunnelIps> tunnelIps = psNode.augmentation(PhysicalSwitchAugmentation.class) != null
- ? new ArrayList<>(psNode.augmentation(PhysicalSwitchAugmentation.class)
- .nonnullTunnelIps().values()) : null;
- if (tunnelIps != null) {
- for (TunnelIps tunnelIp : tunnelIps) {
- IpAddress tunnelIpAddr = tunnelIp.getTunnelIpsKey();
- l2GwDevice.addTunnelIp(tunnelIpAddr);
+
+ int trialNo = 1;
+ Optional<Topology> topologyOptional = Optional.empty();
+ do {
+ try {
+ topologyOptional = tx.read(HwvtepSouthboundUtils.createHwvtepTopologyInstanceIdentifier()).get();
+ break;
+ } catch (ExecutionException | InterruptedException e) {
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException e1) {
+ LOG.trace("Sleep interrupted");
+ }
}
+ } while (trialNo++ < MAX_READ_TRIALS);
+ if (topologyOptional != null && topologyOptional.isPresent() && topologyOptional.get().getNode() != null) {
+ return new ArrayList<>(topologyOptional.get().nonnullNode().values());
}
+ return Collections.EMPTY_LIST;
}
}