package org.opendaylight.netvirt.elan.l2gw.listeners;
+import static org.opendaylight.genius.infra.Datastore.CONFIGURATION;
+
import java.util.Collections;
import java.util.HashSet;
-import java.util.List;
+import java.util.Map;
import java.util.Objects;
+import java.util.Optional;
import java.util.Set;
-import java.util.concurrent.ExecutionException;
import java.util.function.BiPredicate;
import java.util.function.Predicate;
-import javax.annotation.PostConstruct;
+import javax.annotation.PreDestroy;
import javax.inject.Inject;
import javax.inject.Singleton;
-import org.opendaylight.controller.md.sal.binding.api.ClusteredDataTreeChangeListener;
-import org.opendaylight.controller.md.sal.binding.api.DataBroker;
-import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
-import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.eclipse.jdt.annotation.Nullable;
import org.opendaylight.genius.datastoreutils.hwvtep.HwvtepAbstractDataTreeChangeListener;
+import org.opendaylight.genius.infra.ManagedNewTransactionRunner;
+import org.opendaylight.genius.infra.ManagedNewTransactionRunnerImpl;
import org.opendaylight.genius.mdsalutil.MDSALUtil;
-import org.opendaylight.genius.utils.hwvtep.HwvtepHACache;
+import org.opendaylight.genius.utils.hwvtep.HwvtepNodeHACache;
import org.opendaylight.genius.utils.hwvtep.HwvtepSouthboundConstants;
import org.opendaylight.genius.utils.hwvtep.HwvtepSouthboundUtils;
+import org.opendaylight.infrautils.utils.concurrent.Executors;
+import org.opendaylight.infrautils.utils.concurrent.LoggingFutures;
+import org.opendaylight.mdsal.binding.api.ClusteredDataTreeChangeListener;
+import org.opendaylight.mdsal.binding.api.DataBroker;
+import org.opendaylight.mdsal.binding.api.DataTreeIdentifier;
+import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
import org.opendaylight.netvirt.elan.l2gw.ha.HwvtepHAUtil;
import org.opendaylight.netvirt.elan.l2gw.ha.listeners.HAOpClusteredListener;
+import org.opendaylight.netvirt.elan.l2gw.recovery.impl.L2GatewayServiceRecoveryHandler;
import org.opendaylight.netvirt.elan.l2gw.utils.ElanL2GatewayUtils;
import org.opendaylight.netvirt.elan.l2gw.utils.L2GatewayConnectionUtils;
import org.opendaylight.netvirt.elan.l2gw.utils.L2GatewayUtils;
import org.opendaylight.netvirt.elanmanager.utils.ElanL2GwCacheUtils;
import org.opendaylight.netvirt.neutronvpn.api.l2gw.L2GatewayCache;
import org.opendaylight.netvirt.neutronvpn.api.l2gw.L2GatewayDevice;
+import org.opendaylight.serviceutils.srm.RecoverableListener;
+import org.opendaylight.serviceutils.srm.ServiceRecoveryRegistry;
import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev130715.IpAddress;
import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.itm.rpcs.rev160406.ItmRpcService;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ovsdb.hwvtep.rev150901.PhysicalSwitchAugmentation;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ovsdb.hwvtep.rev150901.PhysicalSwitchAugmentationBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ovsdb.hwvtep.rev150901.hwvtep.physical._switch.attributes.TunnelIps;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ovsdb.hwvtep.rev150901.hwvtep.physical._switch.attributes.TunnelIpsKey;
import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NetworkTopology;
import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId;
import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.Topology;
@Singleton
public class HwvtepPhysicalSwitchListener
extends HwvtepAbstractDataTreeChangeListener<PhysicalSwitchAugmentation, HwvtepPhysicalSwitchListener>
- implements ClusteredDataTreeChangeListener<PhysicalSwitchAugmentation> {
+ implements ClusteredDataTreeChangeListener<PhysicalSwitchAugmentation>, RecoverableListener {
/** The Constant LOG. */
private static final Logger LOG = LoggerFactory.getLogger(HwvtepPhysicalSwitchListener.class);
private static final BiPredicate<L2GatewayDevice, InstanceIdentifier<Node>> DEVICE_NOT_CACHED_OR_PARENT_CONNECTED =
- (l2GatewayDevice, globalIid) -> {
- return l2GatewayDevice == null || l2GatewayDevice.getHwvtepNodeId() == null
- || !Objects.equals(l2GatewayDevice.getHwvtepNodeId(),
- globalIid.firstKeyOf(Node.class).getNodeId().getValue());
- };
+ (l2GatewayDevice, globalIid) -> l2GatewayDevice == null || l2GatewayDevice.getHwvtepNodeId() == null
+ || !Objects.equals(l2GatewayDevice.getHwvtepNodeId(),
+ globalIid.firstKeyOf(Node.class).getNodeId().getValue());
private static final Predicate<PhysicalSwitchAugmentation> TUNNEL_IP_AVAILABLE =
- phySwitch -> !HwvtepHAUtil.isEmpty(phySwitch.getTunnelIps());
+ phySwitch -> !HwvtepHAUtil.isEmpty(phySwitch.getTunnelIps().values());
private static final Predicate<PhysicalSwitchAugmentation> TUNNEL_IP_NOT_AVAILABLE = TUNNEL_IP_AVAILABLE.negate();
private static final BiPredicate<PhysicalSwitchAugmentation, L2GatewayDevice> TUNNEL_IP_CHANGED =
- (phySwitchAfter, existingDevice) -> {
- return TUNNEL_IP_AVAILABLE.test(phySwitchAfter)
- && !Objects.equals(
- existingDevice.getTunnelIp(), phySwitchAfter.getTunnelIps().get(0).getTunnelIpsKey());
- };
+ (phySwitchAfter, existingDevice) -> TUNNEL_IP_AVAILABLE.test(phySwitchAfter)
+ && !Objects.equals(
+ existingDevice.getTunnelIp(), phySwitchAfter.getTunnelIps().get(0).getTunnelIpsKey());
/** The data broker. */
private final DataBroker dataBroker;
+ private final ManagedNewTransactionRunner txRunner;
/** The itm rpc service. */
private final ItmRpcService itmRpcService;
private final ElanClusterUtils elanClusterUtils;
- private final HwvtepHACache hwvtepHACache = HwvtepHACache.getInstance();
+ private final HwvtepNodeHACache hwvtepNodeHACache;
private final L2gwServiceProvider l2gwServiceProvider;
- private final BiPredicate<L2GatewayDevice, InstanceIdentifier<Node>> childConnectedAfterParent =
- (l2GwDevice, globalIid) -> {
- return !hwvtepHACache.isHAParentNode(globalIid)
- && l2GwDevice != null;
- // FIXME: The following call to equals compares different types (String and InstanceIdentifier) and
- // thus will always return false. I don't know what the intention is here so commented out for now.
- //&& !Objects.equals(l2GwDevice.getHwvtepNodeId(), globalIid);
- };
+ private final BiPredicate<L2GatewayDevice, InstanceIdentifier<Node>> childConnectedAfterParent;
private final Predicate<L2GatewayDevice> alreadyHasL2Gwids =
- (l2GwDevice) -> {
- return l2GwDevice != null && HwvtepHAUtil.isEmpty(l2GwDevice.getL2GatewayIds());
- };
-
- private final BiPredicate<L2GatewayDevice, InstanceIdentifier<Node>> parentConnectedAfterChild =
- (l2GwDevice, globalIid) -> {
- InstanceIdentifier<Node> existingIid = globalIid;
- if (l2GwDevice != null && l2GwDevice.getHwvtepNodeId() != null) {
- existingIid = HwvtepHAUtil.convertToInstanceIdentifier(l2GwDevice.getHwvtepNodeId());
- }
- return hwvtepHACache.isHAParentNode(globalIid)
- && l2GwDevice != null
- // FIXME: The following call to equals compares different types (String and InstanceIdentifier) and
- // thus will always return false. I don't know what the intention is here so commented out for now.
- //&& !Objects.equals(l2GwDevice.getHwvtepNodeId(), globalIid)
- && Objects.equals(globalIid, hwvtepHACache.getParent(existingIid));
- };
-
+ (l2GwDevice) -> l2GwDevice != null && HwvtepHAUtil.isEmpty(l2GwDevice.getL2GatewayIds());
+ private final BiPredicate<L2GatewayDevice, InstanceIdentifier<Node>> parentConnectedAfterChild;
private final HAOpClusteredListener haOpClusteredListener;
private final L2GatewayCache l2GatewayCache;
* Instantiates a new hwvtep physical switch listener.
*/
@Inject
- public HwvtepPhysicalSwitchListener(final DataBroker dataBroker, ItmRpcService itmRpcService,
- ElanClusterUtils elanClusterUtils, L2gwServiceProvider l2gwServiceProvider,
- HAOpClusteredListener haListener, L2GatewayCache l2GatewayCache,
- StaleVlanBindingsCleaner staleVlanBindingsCleaner) {
- super(PhysicalSwitchAugmentation.class, HwvtepPhysicalSwitchListener.class);
+ public HwvtepPhysicalSwitchListener(final L2GatewayServiceRecoveryHandler l2GatewayServiceRecoveryHandler,
+ final ServiceRecoveryRegistry serviceRecoveryRegistry,
+ final DataBroker dataBroker, ItmRpcService itmRpcService,
+ ElanClusterUtils elanClusterUtils, L2gwServiceProvider l2gwServiceProvider,
+ HAOpClusteredListener haListener, L2GatewayCache l2GatewayCache,
+ StaleVlanBindingsCleaner staleVlanBindingsCleaner,
+ HwvtepNodeHACache hwvtepNodeHACache) {
+ super(dataBroker, DataTreeIdentifier.create(LogicalDatastoreType.OPERATIONAL,
+ InstanceIdentifier.create(NetworkTopology.class)
+ .child(Topology.class, new TopologyKey(HwvtepSouthboundConstants.HWVTEP_TOPOLOGY_ID)).child(Node.class)
+ .augmentation(PhysicalSwitchAugmentation.class)),
+ Executors.newListeningSingleThreadExecutor("HwvtepPhysicalSwitchListener", LOG),
+ hwvtepNodeHACache);
this.dataBroker = dataBroker;
+ this.txRunner = new ManagedNewTransactionRunnerImpl(dataBroker);
this.itmRpcService = itmRpcService;
this.elanClusterUtils = elanClusterUtils;
this.l2gwServiceProvider = l2gwServiceProvider;
this.staleVlanBindingsCleaner = staleVlanBindingsCleaner;
this.haOpClusteredListener = haListener;
this.l2GatewayCache = l2GatewayCache;
+ this.hwvtepNodeHACache = hwvtepNodeHACache;
+
+ childConnectedAfterParent = (l2GwDevice, globalIid) -> {
+ return !hwvtepNodeHACache.isHAParentNode(globalIid)
+ && l2GwDevice != null && l2GwDevice.getHwvtepNodeId() != null
+ && !Objects.equals(l2GwDevice.getHwvtepNodeId(), globalIid.firstKeyOf(Node.class)
+ .getNodeId().getValue());
+ };
+
+ parentConnectedAfterChild = (l2GwDevice, globalIid) -> {
+ InstanceIdentifier<Node> existingIid = globalIid;
+ if (l2GwDevice != null && l2GwDevice.getHwvtepNodeId() != null) {
+ existingIid = HwvtepHAUtil.convertToInstanceIdentifier(l2GwDevice.getHwvtepNodeId());
+ }
+ return hwvtepNodeHACache.isHAParentNode(globalIid)
+ && l2GwDevice != null && l2GwDevice.getHwvtepNodeId() != null
+ && !Objects.equals(l2GwDevice.getHwvtepNodeId(), globalIid.firstKeyOf(Node.class)
+ .getNodeId().getValue())
+ && Objects.equals(globalIid, hwvtepNodeHACache.getParent(existingIid));
+ };
+
+ serviceRecoveryRegistry.addRecoverableListener(l2GatewayServiceRecoveryHandler.buildServiceRegistryKey(),
+ this);
}
- @Override
- @PostConstruct
public void init() {
- registerListener(LogicalDatastoreType.OPERATIONAL, dataBroker);
+ registerListener();
}
@Override
- protected InstanceIdentifier<PhysicalSwitchAugmentation> getWildCardPath() {
- return InstanceIdentifier.create(NetworkTopology.class)
- .child(Topology.class, new TopologyKey(HwvtepSouthboundConstants.HWVTEP_TOPOLOGY_ID)).child(Node.class)
- .augmentation(PhysicalSwitchAugmentation.class);
+ public void registerListener() {
+ super.register();
+ LOG.info("Registering HwvtepPhysicalSwitchListener");
+ }
+
+ public void deregisterListener() {
+ super.close();
+ LOG.info("Deregistering HwvtepPhysicalSwitchListener");
}
@Override
- protected HwvtepPhysicalSwitchListener getDataTreeChangeListener() {
- return HwvtepPhysicalSwitchListener.this;
+ @PreDestroy
+ public void close() {
+ super.close();
+ Executors.shutdownAndAwaitTermination(getExecutorService());
}
@Override
existingDevice.getDeviceName(), existingDevice.getTunnelIp());
Thread.sleep(10000L);//TODO remove these sleeps
LOG.info("Creating itm tunnels for device {}", existingDevice.getDeviceName());
- ElanL2GatewayUtils.createItmTunnels(itmRpcService, hwvtepId, psName,
+ ElanL2GatewayUtils.createItmTunnels(dataBroker, itmRpcService, hwvtepId, psName,
phySwitchAfter.getTunnelIps().get(0).getTunnelIpsKey());
return Collections.emptyList();
}
LOG.error("Global node is absent {}", globalNodeId);
return;
}
- HAOpClusteredListener.addToCacheIfHAChildNode(globalNodeIid, node.get());
- if (hwvtepHACache.isHAEnabledDevice(globalNodeIid)) {
+ HwvtepHAUtil.addToCacheIfHAChildNode(globalNodeIid, node.get(), hwvtepNodeHACache);
+ if (hwvtepNodeHACache.isHAEnabledDevice(globalNodeIid)) {
LOG.trace("Ha enabled device {}", globalNodeIid);
return;
}
LOG.trace("Updating cache for node {}", globalNodeIid);
L2GatewayDevice l2GwDevice = l2GatewayCache.get(psName);
if (childConnectedAfterParent.test(l2GwDevice, globalNodeIid)) {
- LOG.trace("Device {} {} is already Connected by ",
+ LOG.trace("Device {} {} is already Connected by {}",
psName, globalNodeId, l2GwDevice.getHwvtepNodeId());
return;
}
l2GwDevice.setConnected(true);
l2GwDevice.setHwvtepNodeId(globalNodeId);
- List<TunnelIps> tunnelIps = phySwitchAdded.getTunnelIps();
+ Map<TunnelIpsKey, TunnelIps> tunnelIps = phySwitchAdded.getTunnelIps();
if (tunnelIps != null) {
- for (TunnelIps tunnelIp : tunnelIps) {
+ for (TunnelIps tunnelIp : tunnelIps.values()) {
IpAddress tunnelIpAddr = tunnelIp.getTunnelIpsKey();
l2GwDevice.addTunnelIp(tunnelIpAddr);
}
}
handleAdd(l2GwDevice);
- return;
+ elanClusterUtils.runOnlyInOwnerNode("Update config tunnels IP ",
+ () -> updateConfigTunnelIp(identifier, phySwitchAdded));
});
}
- boolean updateHACacheIfHANode(DataBroker broker, InstanceIdentifier<Node> globalNodeId)
- throws ExecutionException, InterruptedException {
- ReadWriteTransaction transaction = broker.newReadWriteTransaction();
- Node node = transaction.read(LogicalDatastoreType.OPERATIONAL, globalNodeId).get().get();
- HAOpClusteredListener.addToCacheIfHAChildNode(globalNodeId, node);
- return hwvtepHACache.isHAEnabledDevice(globalNodeId);
- }
-
/**
* Handle add.
*
l2GwDevice.getL2GatewayIds(), psName, tunnelIpAddr);
l2gwServiceProvider.provisionItmAndL2gwConnection(l2GwDevice, psName, hwvtepNodeId, tunnelIpAddr);
} else {
- LOG.info("l2gw.provision.skip {}", hwvtepNodeId, psName);
+ LOG.info("l2gw.provision.skip {}:{}", hwvtepNodeId, psName);
}
}
elanClusterUtils.runOnlyInOwnerNode("Stale entry cleanup", () -> {
* the identifier
* @return the node id
*/
- private NodeId getNodeId(InstanceIdentifier<PhysicalSwitchAugmentation> identifier) {
+ private static NodeId getNodeId(InstanceIdentifier<PhysicalSwitchAugmentation> identifier) {
return identifier.firstKeyOf(Node.class).getNodeId();
}
- private String getManagedByNodeId(InstanceIdentifier<PhysicalSwitchAugmentation> identifier) {
+ private static String getManagedByNodeId(InstanceIdentifier<PhysicalSwitchAugmentation> identifier) {
String psNodeId = identifier.firstKeyOf(Node.class).getNodeId().getValue();
if (psNodeId.contains(HwvtepHAUtil.PHYSICALSWITCH)) {
return psNodeId.substring(0, psNodeId.indexOf(HwvtepHAUtil.PHYSICALSWITCH));
return psNodeId;
}
- private InstanceIdentifier<Node> getManagedByNodeIid(InstanceIdentifier<PhysicalSwitchAugmentation> identifier) {
+ @Nullable
+ private static InstanceIdentifier<Node> getManagedByNodeIid(
+ InstanceIdentifier<PhysicalSwitchAugmentation> identifier) {
String psNodeId = identifier.firstKeyOf(Node.class).getNodeId().getValue();
if (psNodeId.contains(HwvtepHAUtil.PHYSICALSWITCH)) {
psNodeId = psNodeId.substring(0, psNodeId.indexOf(HwvtepHAUtil.PHYSICALSWITCH));
return null;
}
- private String getPsName(InstanceIdentifier<PhysicalSwitchAugmentation> identifier) {
+ @Nullable
+ private static String getPsName(InstanceIdentifier<PhysicalSwitchAugmentation> identifier) {
String psNodeId = identifier.firstKeyOf(Node.class).getNodeId().getValue();
if (psNodeId.contains(HwvtepHAUtil.PHYSICALSWITCH)) {
return psNodeId.substring(psNodeId.indexOf(HwvtepHAUtil.PHYSICALSWITCH) + HwvtepHAUtil.PHYSICALSWITCH
}
return null;
}
+
+ private void updateConfigTunnelIp(InstanceIdentifier<PhysicalSwitchAugmentation> identifier,
+ PhysicalSwitchAugmentation phySwitchAdded) {
+ if (phySwitchAdded.getTunnelIps() != null) {
+ LoggingFutures.addErrorLogging(
+ txRunner.callWithNewReadWriteTransactionAndSubmit(CONFIGURATION, tx -> {
+ Optional<PhysicalSwitchAugmentation> existingSwitch = tx.read(identifier).get();
+ PhysicalSwitchAugmentationBuilder psBuilder = new PhysicalSwitchAugmentationBuilder();
+ if (existingSwitch.isPresent()) {
+ psBuilder = new PhysicalSwitchAugmentationBuilder(existingSwitch.get());
+ }
+ psBuilder.setTunnelIps(phySwitchAdded.getTunnelIps());
+ tx.mergeParentStructurePut(identifier, psBuilder.build());
+ LOG.trace("Updating config tunnel ips {}", identifier);
+ }), LOG, "Failed to update the config tunnel ips {}", identifier);
+ }
+ }
}